From 7f30f481b23f91e0d1d5c4baa938ee5a578f97c3 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 9 May 2015 22:22:13 +0000 Subject: [PATCH] i2ptunnel: Improve error handling for UDP tunnels, Sink.send() may now throw RuntimeException, converted from IOException or I2PSessionException; interrupt runner threads on error; ignore I2PSessionException in Pinger.close(); logging and javadoc improvements; untested --- .../net/i2p/i2ptunnel/socks/MultiSink.java | 4 ++++ .../net/i2p/i2ptunnel/socks/ReplyTracker.java | 4 ++++ .../net/i2p/i2ptunnel/socks/SOCKSUDPPort.java | 4 ++++ .../i2ptunnel/socks/SOCKSUDPUnwrapper.java | 2 ++ .../i2p/i2ptunnel/socks/SOCKSUDPWrapper.java | 2 ++ .../i2p/i2ptunnel/streamr/MultiSource.java | 4 ++++ .../src/net/i2p/i2ptunnel/streamr/Pinger.java | 19 ++++++++++++++++--- .../net/i2p/i2ptunnel/streamr/Subscriber.java | 16 ++++++++++++++-- .../src/net/i2p/i2ptunnel/udp/I2PSink.java | 10 ++++++---- .../i2p/i2ptunnel/udp/I2PSinkAnywhere.java | 10 ++++++---- .../src/net/i2p/i2ptunnel/udp/I2PSource.java | 19 +++++++++++++++---- .../java/src/net/i2p/i2ptunnel/udp/Sink.java | 4 ++++ .../src/net/i2p/i2ptunnel/udp/UDPSink.java | 19 +++++++++++++------ .../src/net/i2p/i2ptunnel/udp/UDPSource.java | 12 ++++++++++-- .../udpTunnel/I2PTunnelUDPClientBase.java | 1 + .../udpTunnel/I2PTunnelUDPServerBase.java | 1 + 16 files changed, 106 insertions(+), 25 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java index c34616a031..bc2af15143 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java @@ -22,6 +22,10 @@ public class MultiSink<S extends Sink> implements Source, Sink { public void start() {} + /** + * May throw RuntimeException from underlying sinks + * @throws RuntimeException + */ public void send(Destination from, byte[] data) { Sink s = this.cache.get(from); if (s == null) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java index 7f7e0a030f..7e99efa420 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java @@ -23,6 +23,10 @@ public class ReplyTracker<S extends Sink> implements Source, Sink { public void start() {} + /** + * May throw RuntimeException from underlying sink + * @throws RuntimeException + */ public void send(Destination to, byte[] data) { this.cache.put(to, this.reply); this.sink.send(to, data); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java index 1f414f1343..2064d698b4 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java @@ -64,6 +64,10 @@ public class SOCKSUDPPort implements Source, Sink { this.udpsource.stop(); } + /** + * May throw RuntimeException from underlying sink + * @throws RuntimeException + */ public void send(Destination from, byte[] data) { this.wrapper.send(from, data); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java index e446398e79..89a27cde1a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java @@ -30,6 +30,8 @@ public class SOCKSUDPUnwrapper implements Source, Sink { /** * + * May throw RuntimeException from underlying sink + * @throws RuntimeException */ public void send(Destination ignored_from, byte[] data) { SOCKSHeader h; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java index eab1568411..daad929f4a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java @@ -25,6 +25,8 @@ public class SOCKSUDPWrapper implements Source, Sink { /** * Use the cached header, which should have the host string and port * + * May throw RuntimeException from underlying sink + * @throws RuntimeException */ public void send(Destination from, byte[] data) { if (this.sink == null) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java index 0bb6400ec8..7de086e78c 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java @@ -27,6 +27,10 @@ public class MultiSource implements Source, Sink { this.sinks.clear(); } + /** + * May throw RuntimeException from underlying sinks + * @throws RuntimeException + */ public void send(Destination ignored_from, byte[] data) { for(Destination dest : this.sinks) { this.sink.send(dest, data); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java index b467652876..dc4a621ea6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java @@ -1,7 +1,9 @@ package net.i2p.i2ptunnel.streamr; import net.i2p.i2ptunnel.udp.*; +import net.i2p.I2PAppContext; import net.i2p.util.I2PAppThread; +import net.i2p.util.Log; /** * @@ -31,7 +33,9 @@ public class Pinger implements Source, Runnable { // send unsubscribe-message byte[] data = new byte[1]; data[0] = 1; - this.sink.send(null, data); + try { + this.sink.send(null, data); + } catch (RuntimeException re) {} } public void run() { @@ -41,7 +45,14 @@ public class Pinger implements Source, Runnable { int i = 0; while(this.running) { //System.out.print("p"); - this.sink.send(null, data); + try { + this.sink.send(null, data); + } catch (RuntimeException re) { + Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (log.shouldWarn()) + log.warn("error sending", re); + break; + } synchronized(this.waitlock) { int delay = 10000; if (i < 5) { @@ -50,7 +61,9 @@ public class Pinger implements Source, Runnable { } try { this.waitlock.wait(delay); - } catch(InterruptedException ie) {} + } catch(InterruptedException ie) { + break; + } } } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java index 300832a2f5..5bf89c945d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java @@ -2,9 +2,11 @@ package net.i2p.i2ptunnel.streamr; import java.util.Set; +import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.i2ptunnel.udp.*; import net.i2p.util.ConcurrentHashSet; +import net.i2p.util.Log; /** * server-mode @@ -19,10 +21,18 @@ public class Subscriber implements Sink { this.subscriptions = new ConcurrentHashSet<Destination>(); } + /** + * Doesn't really "send" anywhere, just subscribes or unsubscribes the destination + * + * @param dest to subscribe or unsubscribe + * @param data must be a single byte, 0 to subscribe, 1 to unsubscribe + */ public void send(Destination dest, byte[] data) { if(dest == null || data.length < 1) { // invalid packet - // TODO: write to log + Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (log.shouldWarn()) + log.warn("bad subscription from " + dest); } else { byte ctrl = data[0]; if(ctrl == 0) { @@ -40,7 +50,9 @@ public class Subscriber implements Sink { multi.remove(dest); } else { // invalid packet - // TODO: write to log + Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (log.shouldWarn()) + log.warn("bad subscription from " + dest); } } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java index e4f47cdafb..7052009269 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java @@ -32,7 +32,10 @@ public class I2PSink implements Sink { } } - /** @param src ignored */ + /** + * @param src ignored + * @throws RuntimeException if session is closed + */ public synchronized void send(Destination src, byte[] data) { //System.out.print("w"); // create payload @@ -49,9 +52,8 @@ public class I2PSink implements Sink { this.sess.sendMessage(this.dest, payload, (this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM), I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); - } catch(I2PSessionException exc) { - // TODO: handle better - exc.printStackTrace(); + } catch (I2PSessionException ise) { + throw new RuntimeException("failed to send data", ise); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java index e33d93d107..9308bd379e 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java @@ -31,7 +31,10 @@ public class I2PSinkAnywhere implements Sink { } } - /** @param to - where it's going */ + /** + * @param to - where it's going + * @throws RuntimeException if session is closed + */ public synchronized void send(Destination to, byte[] data) { // create payload byte[] payload; @@ -47,9 +50,8 @@ public class I2PSinkAnywhere implements Sink { this.sess.sendMessage(to, payload, (this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM), I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); - } catch(I2PSessionException exc) { - // TODO: handle better - exc.printStackTrace(); + } catch (I2PSessionException ise) { + throw new RuntimeException("failed to send data", ise); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java index 2f59418a52..e50e1a051f 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java @@ -3,10 +3,12 @@ package net.i2p.i2ptunnel.udp; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionListener; import net.i2p.client.datagram.I2PDatagramDissector; import net.i2p.util.I2PAppThread; +import net.i2p.util.Log; /** * @@ -48,7 +50,8 @@ public class I2PSource implements Source, Runnable { public void run() { // create dissector I2PDatagramDissector diss = new I2PDatagramDissector(); - while(true) { + _running = true; + while (_running) { try { // get id int id = this.queue.take(); @@ -71,7 +74,10 @@ public class I2PSource implements Source, Runnable { } //System.out.print("r"); } catch(Exception e) { - e.printStackTrace(); + Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (log.shouldWarn()) + log.warn("error sending", e); + break; } } } @@ -91,11 +97,15 @@ public class I2PSource implements Source, Runnable { } public void disconnected(I2PSession arg0) { - // ignore + _running = false; + thread.interrupt(); } public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) { - // ignore + Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + log.error(arg1, arg2); + _running = false; + thread.interrupt(); } } @@ -106,4 +116,5 @@ public class I2PSource implements Source, Runnable { protected final Thread thread; protected final boolean verify; protected final boolean raw; + private volatile boolean _running; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java index a3ddc6b84e..b125391605 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java @@ -7,5 +7,9 @@ import net.i2p.data.Destination; * @author welterde */ public interface Sink { + /** + * @param src some implementations may ignore + * @throws RuntimeException in some implementations + */ public void send(Destination src, byte[] data); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java index 0b5905ad7e..c9a9ab3f35 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java @@ -1,5 +1,6 @@ package net.i2p.i2ptunnel.udp; +import java.io.IOException; import java.net.DatagramSocket; import java.net.DatagramPacket; import java.net.InetAddress; @@ -12,13 +13,16 @@ import net.i2p.data.Destination; */ public class UDPSink implements Sink { + /** + * @param src ignored + * @throws IllegalArgumentException on DatagramSocket IOException + */ public UDPSink(InetAddress host, int port) { // create socket try { this.sock = new DatagramSocket(); - } catch(Exception e) { - // TODO: fail better - throw new RuntimeException("failed to open udp-socket", e); + } catch (IOException e) { + throw new IllegalArgumentException("failed to open udp-socket", e); } this.remoteHost = host; @@ -27,6 +31,10 @@ public class UDPSink implements Sink { this.remotePort = port; } + /** + * @param src ignored + * @throws RuntimeException on DatagramSocket IOException + */ public void send(Destination src, byte[] data) { // if data.length > this.sock.getSendBufferSize() ... @@ -36,9 +44,8 @@ public class UDPSink implements Sink { // send packet try { this.sock.send(packet); - } catch(Exception e) { - // TODO: fail a bit better - e.printStackTrace(); + } catch (IOException ioe) { + throw new RuntimeException("failed to send data", ioe); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java index 23d4b8048b..e543941076 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java @@ -1,9 +1,12 @@ package net.i2p.i2ptunnel.udp; +import java.io.IOException; import java.net.DatagramSocket; import java.net.DatagramPacket; +import net.i2p.I2PAppContext; import net.i2p.util.I2PAppThread; +import net.i2p.util.Log; /** * @@ -12,11 +15,14 @@ import net.i2p.util.I2PAppThread; public class UDPSource implements Source, Runnable { public static final int MAX_SIZE = 15360; + /** + * @throws RuntimeException on DatagramSocket IOException + */ public UDPSource(int port) { // create udp-socket try { this.sock = new DatagramSocket(port); - } catch(Exception e) { + } catch (IOException e) { throw new RuntimeException("failed to listen...", e); } @@ -57,7 +63,9 @@ public class UDPSource implements Source, Runnable { this.sink.send(null, nbuf); //System.out.print("i"); } catch(Exception e) { - e.printStackTrace(); + Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (log.shouldWarn()) + log.warn("error sending", e); break; } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java index f1e8abaabe..a89e348986 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java @@ -180,6 +180,7 @@ import net.i2p.util.EventDispatcher; * * @param to - ignored if configured for a single destination * (we use the dest specified in the constructor) + * @throws RuntimeException if session is closed */ public void send(Destination to, byte[] data) { _i2pSink.send(to, data); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java index dc17522b88..b5fc94b2eb 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java @@ -195,6 +195,7 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin * Sink Methods * * @param to + * @throws RuntimeException if session is closed * */ public void send(Destination to, byte[] data) { -- GitLab