I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 64840055 authored by zzz's avatar zzz
Browse files

I2PTunnel: First cut at SOCKS UDP (untested); also some streamr and UDP tweaks

parent 559653f0
No related branches found
No related tags found
No related merge requests found
Showing
with 549 additions and 21 deletions
package net.i2p.i2ptunnel.socks;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/**
* Sends to one of many Sinks
* @author zzz modded from streamr/MultiSource
*/
public class MultiSink implements Source, Sink {
private static final Log _log = new Log(MultiSink.class);
public MultiSink(Map cache) {
this.cache = cache;
}
/** Don't use this - put sinks in the cache */
public void setSink(Sink sink) {}
public void start() {}
public void send(Destination from, byte[] data) {
Sink s = this.cache.get(from);
if (s == null) {
_log.error("No where to go for " + from.calculateHash().toBase64().substring(0, 6));
return;
}
s.send(from, data);
}
private Map<Destination, Sink> cache;
}
package net.i2p.i2ptunnel.socks;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/**
* Track who the reply goes to
* @author zzz
*/
public class ReplyTracker implements Source, Sink {
private static final Log _log = new Log(MultiSink.class);
public ReplyTracker(Sink reply, Map cache) {
this.reply = reply;
this.cache = cache;
}
public void setSink(Sink sink) {
this.sink = sink;
}
public void start() {}
public void send(Destination to, byte[] data) {
this.cache.put(to, this.reply);
this.sink.send(to, data);
}
private Sink reply;
private Map<Destination, Sink> cache;
private Sink sink;
}
...@@ -13,12 +13,15 @@ import java.io.IOException; ...@@ -13,12 +13,15 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.I2PTunnel; import net.i2p.i2ptunnel.I2PTunnel;
import net.i2p.util.HexDump; import net.i2p.util.HexDump;
import net.i2p.util.Log; import net.i2p.util.Log;
...@@ -67,7 +70,8 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -67,7 +70,8 @@ public class SOCKS5Server extends SOCKSServer {
out = new DataOutputStream(clientSock.getOutputStream()); out = new DataOutputStream(clientSock.getOutputStream());
init(in, out); init(in, out);
manageRequest(in, out); if (manageRequest(in, out) == Command.UDP_ASSOCIATE)
handleUDP(in, out);
} catch (IOException e) { } catch (IOException e) {
throw new SOCKSException("Connection error (" + e.getMessage() + ")"); throw new SOCKSException("Connection error (" + e.getMessage() + ")");
} }
...@@ -111,7 +115,7 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -111,7 +115,7 @@ public class SOCKS5Server extends SOCKSServer {
* initialization, integrity/confidentiality encapsulations, etc) * initialization, integrity/confidentiality encapsulations, etc)
* has been stripped out of the input/output streams. * has been stripped out of the input/output streams.
*/ */
private void manageRequest(DataInputStream in, DataOutputStream out) throws IOException, SOCKSException { private int manageRequest(DataInputStream in, DataOutputStream out) throws IOException, SOCKSException {
int socksVer = in.readByte() & 0xff; int socksVer = in.readByte() & 0xff;
if (socksVer != SOCKS_VERSION_5) { if (socksVer != SOCKS_VERSION_5) {
_log.debug("error in SOCKS5 request (protocol != 5? wtf?)"); _log.debug("error in SOCKS5 request (protocol != 5? wtf?)");
...@@ -127,9 +131,12 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -127,9 +131,12 @@ public class SOCKS5Server extends SOCKSServer {
sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out); sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
throw new SOCKSException("BIND command not supported"); throw new SOCKSException("BIND command not supported");
case Command.UDP_ASSOCIATE: case Command.UDP_ASSOCIATE:
/*** if(!Boolean.valueOf(tunnel.getOptions().getProperty("i2ptunnel.socks.allowUDP")).booleanValue()) {
_log.debug("UDP ASSOCIATE command is not supported!"); _log.debug("UDP ASSOCIATE command is not supported!");
sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out); sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
throw new SOCKSException("UDP ASSOCIATE command not supported"); throw new SOCKSException("UDP ASSOCIATE command not supported");
***/
break;
default: default:
_log.debug("unknown command in request (" + Integer.toHexString(command) + ")"); _log.debug("unknown command in request (" + Integer.toHexString(command) + ")");
sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out); sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
...@@ -152,7 +159,8 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -152,7 +159,8 @@ public class SOCKS5Server extends SOCKSServer {
connHostName += "."; connHostName += ".";
} }
} }
_log.warn("IPV4 address type in request: " + connHostName + ". Is your client secure?"); if (command != Command.UDP_ASSOCIATE)
_log.warn("IPV4 address type in request: " + connHostName + ". Is your client secure?");
break; break;
case AddressType.DOMAINNAME: case AddressType.DOMAINNAME:
{ {
...@@ -168,9 +176,12 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -168,9 +176,12 @@ public class SOCKS5Server extends SOCKSServer {
_log.debug("DOMAINNAME address type in request: " + connHostName); _log.debug("DOMAINNAME address type in request: " + connHostName);
break; break;
case AddressType.IPV6: case AddressType.IPV6:
_log.warn("IP V6 address type in request! Is your client secure?" + " (IPv6 is not supported, anyway :-)"); if (command != Command.UDP_ASSOCIATE) {
sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out); _log.warn("IP V6 address type in request! Is your client secure?" + " (IPv6 is not supported, anyway :-)");
throw new SOCKSException("IPV6 addresses not supported"); sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
throw new SOCKSException("IPV6 addresses not supported");
}
break;
default: default:
_log.debug("unknown address type in request (" + Integer.toHexString(command) + ")"); _log.debug("unknown address type in request (" + Integer.toHexString(command) + ")");
sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out); sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
...@@ -183,6 +194,7 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -183,6 +194,7 @@ public class SOCKS5Server extends SOCKSServer {
sendRequestReply(Reply.CONNECTION_NOT_ALLOWED_BY_RULESET, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out); sendRequestReply(Reply.CONNECTION_NOT_ALLOWED_BY_RULESET, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
throw new SOCKSException("Invalid port number in request"); throw new SOCKSException("Invalid port number in request");
} }
return command;
} }
protected void confirmConnection() throws SOCKSException { protected void confirmConnection() throws SOCKSException {
...@@ -293,6 +305,13 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -293,6 +305,13 @@ public class SOCKS5Server extends SOCKSServer {
// Let's not due a new Dest for every request, huh? // Let's not due a new Dest for every request, huh?
//I2PSocketManager sm = I2PSocketManagerFactory.createManager(); //I2PSocketManager sm = I2PSocketManagerFactory.createManager();
//destSock = sm.connect(I2PTunnel.destFromName(connHostName), null); //destSock = sm.connect(I2PTunnel.destFromName(connHostName), null);
Destination dest = I2PTunnel.destFromName(connHostName);
if (dest == null) {
try {
sendRequestReply(Reply.HOST_UNREACHABLE, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
} catch (IOException ioe) {}
throw new SOCKSException("Host not found");
}
destSock = t.createI2PSocket(I2PTunnel.destFromName(connHostName)); destSock = t.createI2PSocket(I2PTunnel.destFromName(connHostName));
} else if ("localhost".equals(connHostName) || "127.0.0.1".equals(connHostName)) { } else if ("localhost".equals(connHostName) || "127.0.0.1".equals(connHostName)) {
String err = "No localhost accesses allowed through the Socks Proxy"; String err = "No localhost accesses allowed through the Socks Proxy";
...@@ -358,6 +377,59 @@ public class SOCKS5Server extends SOCKSServer { ...@@ -358,6 +377,59 @@ public class SOCKS5Server extends SOCKSServer {
return destSock; return destSock;
} }
// This isn't really the right place for this, we can't stop the tunnel once it starts.
static SOCKSUDPTunnel _tunnel;
static Object _startLock = new Object();
static byte[] dummyIP = new byte[4];
/**
* We got a UDP associate command.
* Loop here looking for more, never return normally,
* or else I2PSocksTunnel will create a streaming lib connection.
*
* Do UDP Socks clients actually send more than one Associate request?
* RFC 1928 isn't clear... maybe not.
*/
private void handleUDP(DataInputStream in, DataOutputStream out) throws SOCKSException {
List<Integer> ports = new ArrayList(1);
synchronized (_startLock) {
if (_tunnel == null) {
// tunnel options?
_tunnel = new SOCKSUDPTunnel(new I2PTunnel());
_tunnel.startRunning();
}
}
while (true) {
// Set it up. connHostName and connPort are the client's info.
InetAddress ia = null;
try {
ia = InetAddress.getByAddress(connHostName, dummyIP);
} catch (UnknownHostException uhe) {} // won't happen, no resolving done here
int myPort = _tunnel.add(ia, connPort);
ports.add(Integer.valueOf(myPort));
try {
sendRequestReply(Reply.SUCCEEDED, AddressType.IPV4, InetAddress.getByName("127.0.0.1"), null, myPort, out);
} catch (IOException ioe) { break; }
// wait for more ???
try {
int command = manageRequest(in, out);
// don't do this...
if (command != Command.UDP_ASSOCIATE)
break;
} catch (IOException ioe) { break; }
catch (SOCKSException ioe) { break; }
}
for (Integer i : ports)
_tunnel.remove(i);
// Prevent I2PSocksTunnel from calling getDestinationI2PSocket() above
// to create a streaming lib connection...
// This isn't very elegant...
//
throw new SOCKSException("End of UDP Processing");
}
/* /*
* Some namespaces to enclose SOCKS protocol codes * Some namespaces to enclose SOCKS protocol codes
*/ */
......
package net.i2p.i2ptunnel.socks;
import net.i2p.data.Base32;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.I2PTunnel;
/**
* Save the SOCKS header from a datagram
* Ref: RFC 1928
*
* @author zzz
*/
public class SOCKSHeader {
/**
* @param data the whole packet
*/
public SOCKSHeader(byte[] data) {
if (data.length <= 8)
throw new IllegalArgumentException("Header too short: " + data.length);
if (data[0] != 0 || data[1] != 0)
throw new IllegalArgumentException("Not a SOCKS datagram?");
if (data[2] != 0)
throw new IllegalArgumentException("We can't handle fragments!");
int headerlen = 0;
int addressType = data[3];
if (addressType == 1) {
// this will fail in getDestination()
headerlen = 6 + 4;
} else if (addressType == 3) {
headerlen = 6 + 1 + (data[4] & 0xff);
} else if (addressType == 4) {
// this will fail in getDestination()
// but future garlicat partial hash lookup possible?
headerlen = 6 + 16;
} else {
throw new IllegalArgumentException("Unknown address type: " + addressType);
}
if (data.length < headerlen)
throw new IllegalArgumentException("Header too short: " + data.length);
this.header = new byte[headerlen];
System.arraycopy(this.header, 0, data, 0, headerlen);
}
private static final byte[] beg = {0,0,0,3,60};
private static final byte[] end = {'.','b','3','2','.','i','2','p',0,0};
/**
* Make a dummy header from a dest,
* for those cases where we want to receive unsolicited datagrams.
* Unused for now.
*/
public SOCKSHeader(Destination dest) {
this.header = new byte[beg.length + 52 + end.length];
System.arraycopy(this.header, 0, beg, 0, beg.length);
String b32 = Base32.encode(dest.calculateHash().getData());
System.arraycopy(this.header, beg.length, b32.getBytes(), 0, 52);
System.arraycopy(this.header, beg.length + 52, end, 0, end.length);
}
public String getHost() {
int addressType = this.header[3];
if (addressType != 3)
return null;
int namelen = (this.header[4] & 0xff);
byte[] nameBytes = new byte[namelen];
System.arraycopy(nameBytes, 0, this.header, 5, namelen);
return new String(nameBytes);
}
public Destination getDestination() {
String name = getHost();
if (name == null)
return null;
try {
// the naming service does caching (thankfully)
return I2PTunnel.destFromName(name);
} catch (DataFormatException dfe) {}
return null;
}
public byte[] getBytes() {
return header;
}
private byte[] header;
}
package net.i2p.i2ptunnel.socks;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
/**
* Implements a UDP port and Socks encapsulation / decapsulation.
* This is for a single port. If there is demuxing for multiple
* ports, it happens outside of here.
*
* TX:
* UDPSource -> SOCKSUDPUnwrapper -> ReplyTracker ( -> I2PSink in SOCKSUDPTunnel)
*
* RX:
* UDPSink <- SOCKSUDPWrapper ( <- MultiSink <- I2PSource in SOCKSUDPTunnel)
*
* The Unwrapper passes headers to the Wrapper through a cache.
* The ReplyTracker passes sinks to MultiSink through a cache.
*
* @author zzz
*/
public class SOCKSUDPPort implements Source, Sink {
public SOCKSUDPPort(InetAddress host, int port, Map replyMap) {
// this passes the host and port from UDPUnwrapper to UDPWrapper
Map cache = new ConcurrentHashMap(4);
// rcv from I2P and send to a port
this.wrapper = new SOCKSUDPWrapper(cache);
this.udpsink = new UDPSink(host, port);
this.wrapper.setSink(this.udpsink);
// rcv from the same port and send to I2P
DatagramSocket sock = this.udpsink.getSocket();
this.udpsource = new UDPSource(sock);
this.unwrapper = new SOCKSUDPUnwrapper(cache);
this.udpsource.setSink(this.unwrapper);
this.udptracker = new ReplyTracker(this, replyMap);
this.unwrapper.setSink(this.udptracker);
}
/** Socks passes this back to the client on the TCP connection */
public int getPort() {
return this.udpsink.getPort();
}
public void setSink(Sink sink) {
this.udptracker.setSink(sink);
}
public void start() {
// the other Sources don't use start
this.udpsource.start();
}
public void stop() {
this.udpsink.stop();
this.udpsource.stop();
}
public void send(Destination from, byte[] data) {
this.wrapper.send(from, data);
}
private UDPSink udpsink;
private UDPSource udpsource;
private SOCKSUDPWrapper wrapper;
private SOCKSUDPUnwrapper unwrapper;
private ReplyTracker udptracker;
}
package net.i2p.i2ptunnel.socks;
import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.I2PTunnel;
import net.i2p.i2ptunnel.Logging;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase;
import net.i2p.util.EventDispatcher;
/**
* A Datagram Tunnel that can have multiple bidirectional ports on the UDP side.
*
* TX:
* (ReplyTracker in multiple SOCKSUDPPorts -> ) I2PSink
*
* RX:
* (SOCKSUDPWrapper in multiple SOCKSUDPPorts <- ) MultiSink <- I2PSource
*
* The reply from a dest goes to the last SOCKSUDPPort that sent to that dest.
* If multiple ports are talking to a dest at the same time, this isn't
* going to work very well.
*
* @author zzz modded from streamr/StreamrConsumer
*/
public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase {
/**
* Set up a tunnel with no UDP side yet.
* Use add() for each port.
*/
public SOCKSUDPTunnel(I2PTunnel tunnel) {
super(null, tunnel, tunnel, tunnel);
this.ports = new ConcurrentHashMap(1);
this.cache = new ConcurrentHashMap(1);
this.demuxer = new MultiSink(this.cache);
setSink(this.demuxer);
}
/** @return the UDP port number */
public int add(InetAddress host, int port) {
SOCKSUDPPort sup = new SOCKSUDPPort(host, port, this.cache);
this.ports.put(Integer.valueOf(sup.getPort()), sup);
sup.setSink(this);
sup.start();
return sup.getPort();
}
public void remove(Integer port) {
SOCKSUDPPort sup = this.ports.remove(port);
if (sup != null)
sup.stop();
for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
Map.Entry<Destination, SOCKSUDPPort> e = (Map.Entry) iter.next();
if (e.getValue() == sup)
iter.remove();
}
}
public final void startRunning() {
super.startRunning();
// demuxer start() doesn't do anything
startall();
}
public boolean close(boolean forced) {
stopall();
return super.close(forced);
}
/** you should really add() after startRunning() */
private void startall() {
}
private void stopall() {
for (SOCKSUDPPort sup : this.ports.values()) {
sup.stop();
}
this.ports.clear();
this.cache.clear();
}
private Map<Integer, SOCKSUDPPort> ports;
private Map<Destination, SOCKSUDPPort> cache;
private MultiSink demuxer;
}
package net.i2p.i2ptunnel.socks;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/**
* Strip a SOCKS header off a datagram, convert it to a Destination
* Ref: RFC 1928
*
* @author zzz
*/
public class SOCKSUDPUnwrapper implements Source, Sink {
private static final Log _log = new Log(SOCKSUDPUnwrapper.class);
/**
* @param cache put headers here to pass to SOCKSUDPWrapper
*/
public SOCKSUDPUnwrapper(Map<Destination, SOCKSHeader> cache) {
this.cache = cache;
}
public void setSink(Sink sink) {
this.sink = sink;
}
public void start() {}
/**
*
*/
public void send(Destination ignored_from, byte[] data) {
SOCKSHeader h;
try {
h = new SOCKSHeader(data);
} catch (IllegalArgumentException iae) {
_log.error(iae.toString());
return;
}
Destination dest = h.getDestination();
if (dest == null) {
// no, we aren't going to send non-i2p traffic to a UDP outproxy :)
_log.error("Destination not found: " + h.getHost());
return;
}
cache.put(dest, h);
int headerlen = h.getBytes().length;
byte unwrapped[] = new byte[data.length - headerlen];
System.arraycopy(unwrapped, 0, data, headerlen, unwrapped.length);
this.sink.send(dest, unwrapped);
}
private Sink sink;
private Map<Destination, SOCKSHeader> cache;
}
package net.i2p.i2ptunnel.socks;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
/**
* Put a SOCKS header on a datagram
* Ref: RFC 1928
*
* @author zzz
*/
public class SOCKSUDPWrapper implements Source, Sink {
public SOCKSUDPWrapper(Map<Destination, SOCKSHeader> cache) {
this.cache = cache;
}
public void setSink(Sink sink) {
this.sink = sink;
}
public void start() {}
/**
* Use the cached header, which should have the host string and port
*
*/
public void send(Destination from, byte[] data) {
if (this.sink == null)
return;
SOCKSHeader h = cache.get(from);
if (h == null) {
// RFC 1928 says drop
// h = new SOCKSHeader(from);
return;
}
byte[] header = h.getBytes();
byte wrapped[] = new byte[header.length + data.length];
System.arraycopy(wrapped, 0, header, 0, header.length);
System.arraycopy(wrapped, header.length, data, 0, data.length);
this.sink.send(from, wrapped);
}
private Sink sink;
private Map<Destination, SOCKSHeader> cache;
}
...@@ -27,6 +27,10 @@ public class MultiSource implements Source, Sink { ...@@ -27,6 +27,10 @@ public class MultiSource implements Source, Sink {
public void start() {} public void start() {}
public void stop() {
this.sinks.clear();
}
public void send(Destination ignored_from, byte[] data) { public void send(Destination ignored_from, byte[] data) {
for(Destination dest : this.sinks) { for(Destination dest : this.sinks) {
this.sink.send(dest, data); this.sink.send(dest, data);
......
...@@ -47,6 +47,7 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase { ...@@ -47,6 +47,7 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase {
public boolean close(boolean forced) { public boolean close(boolean forced) {
// send unsubscribe-message // send unsubscribe-message
this.pinger.stop(); this.pinger.stop();
this.sink.stop();
return super.close(forced); return super.close(forced);
} }
...@@ -59,6 +60,6 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase { ...@@ -59,6 +60,6 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase {
private Sink sink; private UDPSink sink;
private Pinger pinger; private Pinger pinger;
} }
...@@ -51,7 +51,8 @@ public class StreamrProducer extends I2PTunnelUDPServerBase { ...@@ -51,7 +51,8 @@ public class StreamrProducer extends I2PTunnelUDPServerBase {
} }
public boolean close(boolean forced) { public boolean close(boolean forced) {
// need some stop() methods in UDPSource and MultiSource this.server.stop();
this.multi.stop();
return super.close(forced); return super.close(forced);
} }
...@@ -65,6 +66,6 @@ public class StreamrProducer extends I2PTunnelUDPServerBase { ...@@ -65,6 +66,6 @@ public class StreamrProducer extends I2PTunnelUDPServerBase {
private MultiSource multi; private MultiSource multi;
private Source server; private UDPSource server;
private Sink subscriber; private Sink subscriber;
} }
...@@ -34,6 +34,8 @@ public class UDPSink implements Sink { ...@@ -34,6 +34,8 @@ public class UDPSink implements Sink {
} }
public void send(Destination src, byte[] data) { public void send(Destination src, byte[] data) {
// if data.length > this.sock.getSendBufferSize() ...
// create packet // create packet
DatagramPacket packet = new DatagramPacket(data, data.length, this.remoteHost, this.remotePort); DatagramPacket packet = new DatagramPacket(data, data.length, this.remoteHost, this.remotePort);
...@@ -46,17 +48,18 @@ public class UDPSink implements Sink { ...@@ -46,17 +48,18 @@ public class UDPSink implements Sink {
} }
} }
public int getPort() {
return this.sock.getLocalPort();
}
/** to pass to UDPSource constructor */
public DatagramSocket getSocket() {
return this.sock;
}
public void stop() {
this.sock.close();
}
......
...@@ -28,6 +28,13 @@ public class UDPSource implements Source, Runnable { ...@@ -28,6 +28,13 @@ public class UDPSource implements Source, Runnable {
// create thread // create thread
this.thread = new Thread(this); this.thread = new Thread(this);
} }
/** use socket from UDPSink */
public UDPSource(DatagramSocket sock) {
this.sink = null;
this.sock = sock;
this.thread = new Thread(this);
}
public void setSink(Sink sink) { public void setSink(Sink sink) {
this.sink = sink; this.sink = sink;
...@@ -57,13 +64,14 @@ public class UDPSource implements Source, Runnable { ...@@ -57,13 +64,14 @@ public class UDPSource implements Source, Runnable {
//System.out.print("i"); //System.out.print("i");
} catch(Exception e) { } catch(Exception e) {
e.printStackTrace(); e.printStackTrace();
break;
} }
} }
} }
public void stop() {
this.sock.close();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment