From 0f54ba59fb7e72f2278f7392e188b2204ac3433e Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Sun, 26 Sep 2004 15:32:24 +0000 Subject: [PATCH] die phttp die --- .../router/transport/phttp/PHTTPPoller.java | 240 -------------- .../router/transport/phttp/PHTTPSender.java | 285 ----------------- .../transport/phttp/PHTTPTransport.java | 300 ------------------ 3 files changed, 825 deletions(-) delete mode 100644 router/java/src/net/i2p/router/transport/phttp/PHTTPPoller.java delete mode 100644 router/java/src/net/i2p/router/transport/phttp/PHTTPSender.java delete mode 100644 router/java/src/net/i2p/router/transport/phttp/PHTTPTransport.java diff --git a/router/java/src/net/i2p/router/transport/phttp/PHTTPPoller.java b/router/java/src/net/i2p/router/transport/phttp/PHTTPPoller.java deleted file mode 100644 index 51f4d84ab7..0000000000 --- a/router/java/src/net/i2p/router/transport/phttp/PHTTPPoller.java +++ /dev/null @@ -1,240 +0,0 @@ -package net.i2p.router.transport.phttp; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Date; - -import net.i2p.data.DataFormatException; -import net.i2p.data.DataHelper; -import net.i2p.data.Signature; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.data.i2np.I2NPMessageException; -import net.i2p.data.i2np.I2NPMessageHandler; -import net.i2p.router.RouterContext; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; - -class PHTTPPoller { - private Log _log; - private PHTTPTransport _transport; - private URL _pollURL; - private Poller _poller; - private RouterContext _context; - private boolean _polling; - - public PHTTPPoller(RouterContext context, PHTTPTransport transport) { - _context = context; - _log = context.logManager().getLog(PHTTPPoller.class); - _transport = transport; - _pollURL = null; - _poller = new Poller(); - _polling = false; - } - - public synchronized void startPolling() { - if (_polling) return; - _polling = true; - - try { - _pollURL = new URL(_transport.getMyPollURL()); - } catch (MalformedURLException mue) { - _log.error("Invalid polling URL [" + _transport.getMyPollURL() + "]", mue); - return; - } - Thread t = new I2PThread(_poller); - t.setName("HTTP Poller"); - t.setDaemon(true); - t.setPriority(I2PThread.MIN_PRIORITY); - t.start(); - } - - public void stopPolling() { - _poller.stopPolling(); - } - - private byte[] getAuthData() { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(4); - long nonce = _context.random().nextInt(Integer.MAX_VALUE); - _log.debug("Creating nonce with value [" + nonce + "]"); - DataHelper.writeLong(baos, 4, nonce); - byte nonceData[] = baos.toByteArray(); - Signature sig = _context.dsa().sign(nonceData, _transport.getMySigningKey()); - baos = new ByteArrayOutputStream(512); - DataHelper.writeLong(baos, 4, nonce); - sig.writeBytes(baos); - byte data[] = baos.toByteArray(); - return data; - } catch (NumberFormatException nfe) { - _log.error("Error writing the authentication data", nfe); - return null; - } catch (DataFormatException dfe) { - _log.error("Error formatting the authentication data", dfe); - return null; - } catch (IOException ioe) { - _log.error("Error writing the authentication data", ioe); - return null; - } - } - - public final static String CONFIG_POLL = "i2np.phttp.shouldPoll"; - public final static boolean DEFAULT_POLL = false; - - boolean shouldRejectMessages() { - String val = _context.router().getConfigSetting(CONFIG_POLL); - if (null == val) { - return !DEFAULT_POLL; - } else { - return !("true".equals(val)); - } - } - - class Poller implements Runnable { - private boolean _running; - private I2NPMessageHandler _handler = new I2NPMessageHandler(_context); - public void run() { - _running = true; - // wait 5 seconds before starting to poll so we don't drop too many messages - try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} - - _log.debug("Poller running with delay [" + _transport.getPollFrequencyMs() + "]"); - try { - while (_running) { - int numRead = getMessages(); - if (numRead > 0) - _log.info("# messages found: " + numRead); - try { Thread.sleep(_transport.getPollFrequencyMs()); } catch (InterruptedException ie) {} - } - } catch (Throwable t) { - _log.info("Error while polling", t); - } - } - - private int getMessages() { - // open the _pollURL, authenticate ourselves, and get any messages available - byte authData[] = getAuthData(); - if (authData == null) return 0; - - //_context.bandwidthLimiter().delayOutbound(null, authData.length + 512, false); // HTTP overhead - - try { - _log.debug("Before opening " + _pollURL.toExternalForm()); - HttpURLConnection con = (HttpURLConnection)_pollURL.openConnection(); - // send the info - con.setRequestMethod("POST"); - con.setUseCaches(false); - con.setDoOutput(true); - con.setDoInput(true); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(authData.length + 64); - String target = _transport.getMyIdentity().getHash().toBase64(); - baos.write("target=".getBytes()); - baos.write(target.getBytes()); - baos.write("&".getBytes()); - baos.write(authData); - byte data[] = baos.toByteArray(); - //_log.debug("Data to be sent: " + Base64.encode(data)); - - con.setRequestProperty("Content-length", ""+data.length); - con.getOutputStream().write(data); - _log.debug("Data sent, before reading results of poll for [" + target + "]"); - - con.connect(); - - // fetch the results - int rc = con.getResponseCode(); - _log.debug("Response code: " + rc); - switch (rc) { - case 200: // ok - _log.debug("Polling can progress"); - break; - case 401: // signature failed - _log.error("Signature failed during polling???"); - return 0; - case 404: // not yet registered - _log.error("Not registered with the relay - reregistering (in case they failed)"); - _transport.registerWithRelay(); - return 0; - default: // unknown - _log.error("Invalid error code returned: " + rc); - return 0; - } - - InputStream in = con.getInputStream(); - Date peerTime = DataHelper.readDate(in); - long offset = peerTime.getTime() - System.currentTimeMillis(); - if (_transport.getTrustTime()) { - _log.info("Updating time offset to " + offset + " (old offset: " + _context.clock().getOffset() + ")"); - _context.clock().setOffset(offset); - } - - boolean shouldReject = shouldRejectMessages(); - if (shouldReject) { - _log.debug("Rejecting any messages [we just checked in so we could get the time]"); - return 0; - } - - int numMessages = (int)DataHelper.readLong(in, 2); - if ( (numMessages > 100) || (numMessages < 0) ) { - _log.error("Invalid # messages specified [" + numMessages + "], skipping"); - return 0; - } - - int bytesRead = 512; // HTTP overhead - - int numSuccessful = 0; - for (int i = 0; i < numMessages; i++) { - _log.debug("Receiving message " + (i+1) + " of "+ numMessages + " pending"); - long len = DataHelper.readLong(in, 4); - byte msgBuf[] = new byte[(int)len]; - int read = DataHelper.read(in, msgBuf); - if (read == -1) { - _log.error("Unable to read the message as we encountered an EOF"); - return i - 1; - } else if (read != len) { - _log.error("Unable to read the message fully [" + read + " read, " + len + " expected]"); - return i - 1; - } else { - bytesRead += 4 + read; - try { - I2NPMessage msg = _handler.readMessage(new ByteArrayInputStream(msgBuf)); - if (msg == null) { - _log.warn("PHTTP couldn't read a message from the peer out of a " + len + " byte buffer"); - } else { - _log.info("Receive message " + (i+1) + " of " + numMessages + ": " + msg.getClass().getName()); - _transport.messageReceived(msg, null, null, _handler.getLastReadTime(), (int)len); - numSuccessful++; - } - } catch (IOException ioe) { - _log.warn("Unable to read the message fully", ioe); - } catch (I2NPMessageException ime) { - _log.warn("Poorly formatted message", ime); - } - } - } - - //_context.bandwidthLimiter().delayInbound(null, bytesRead); - - return numSuccessful; - } catch (Throwable t) { - _log.debug("Error polling", t); - return 0; - } - } - - public void stopPolling() { _running = false; } - } -} diff --git a/router/java/src/net/i2p/router/transport/phttp/PHTTPSender.java b/router/java/src/net/i2p/router/transport/phttp/PHTTPSender.java deleted file mode 100644 index 1bd372e0d3..0000000000 --- a/router/java/src/net/i2p/router/transport/phttp/PHTTPSender.java +++ /dev/null @@ -1,285 +0,0 @@ -package net.i2p.router.transport.phttp; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Date; -import java.util.Iterator; - -import net.i2p.data.RouterAddress; -import net.i2p.router.OutNetMessage; -import net.i2p.router.RouterContext; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; - -class PHTTPSender { - private Log _log; - private RouterContext _context; - private PHTTPTransport _transport; - private volatile long _sendId = 0; - - public final static long RECHECK_DELAY = 1000; // 1 sec - public final static long HARD_TIMEOUT = 30*1000; // no timeouts > 30 seconds - - /** H(routerIdent).toBase64() of the target to receive the message */ - public final static String PARAM_SEND_TARGET = "target"; - /** # ms to wait for the message to be delivered before failing it */ - public final static String PARAM_SEND_TIMEOUTMS = "timeoutMs"; - /** # bytes to be sent in the message */ - public final static String PARAM_SEND_DATA_LENGTH = "dataLength"; - /** local time in ms */ - public final static String PARAM_SEND_TIME = "localTime"; - - private final static String PROP_STATUS = "status"; - private final static String STATUS_OK = "accepted"; - private final static String STATUS_PENDING = "pending"; - private final static String STATUS_CLOCKSKEW = "clockSkew_"; /** prefix for (remote-local) */ - - /** HTTP error code if the target is known and accepting messages */ - public final static int CODE_OK = 201; // created - /** HTTP error code if the target is not known or is not accepting messages */ - public final static int CODE_FAIL = 410; // gone - - /* the URL to check to see when the message is delivered */ - public final static String PROP_CHECK_URL = "statusCheckURL"; - - /** HTTP error code if the message was sent completely */ - public final static int CODE_NOT_PENDING = 410; // gone - /** HTTP error code if the message is still pending */ - public final static int CODE_PENDING = 204; // ok, but no content - - public PHTTPSender(RouterContext context, PHTTPTransport transport) { - _context = context; - _log = context.logManager().getLog(PHTTPSender.class); - _transport = transport; - } - - public void send(OutNetMessage msg) { - _log.debug("Sending message " + msg.getMessage().getClass().getName() + " to " + msg.getTarget().getIdentity().getHash().toBase64()); - Thread t = new I2PThread(new Send(msg)); - t.setName("PHTTP Sender " + (_sendId++)); - t.setDaemon(true); - t.start(); - } - - class Send implements Runnable { - private OutNetMessage _msg; - public Send(OutNetMessage msg) { - _msg = msg; - } - public void run() { - boolean ok = false; - try { - ok = doSend(_msg); - } catch (IOException ioe) { - _log.error("Error sending the message", ioe); - } - _transport.afterSend(_msg, ok); - } - } - - private boolean doSend(OutNetMessage msg) throws IOException { - long delay = 0; // _context.bandwidthLimiter().calculateDelayOutbound(msg.getTarget().getIdentity(), (int)msg.getMessageSize()); - _log.debug("Delaying [" + delay + "ms]"); - try { Thread.sleep(delay); } catch (InterruptedException ie) {} - _log.debug("Continuing with sending"); - // now send - URL sendURL = getURL(msg); - if (sendURL == null) { - _log.debug("No URL to send"); - return false; - } else { - _log.debug("Sending to " + sendURL.toExternalForm()); - HttpURLConnection con = (HttpURLConnection)sendURL.openConnection(); - // send the info - con.setRequestMethod("POST"); - con.setUseCaches(false); - con.setDoOutput(true); - con.setDoInput(true); - - byte data[] = getData(msg); - if (data == null) return false; - - //_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), data.length+512, false); // HTTP overhead - - con.setRequestProperty("Content-length", ""+data.length); - OutputStream out = con.getOutputStream(); - out.write(data); - out.flush(); - _log.debug("Data sent, before reading"); - - // fetch the results - String checkURL = getCheckURL(con); - if (checkURL != null) { - _log.debug("Message sent"); - return checkDelivery(checkURL, msg); - } else { - _log.warn("Target not known or unable to send to " + msg.getTarget().getIdentity().getHash().toBase64()); - return false; - } - } - } - - private String getCheckURL(HttpURLConnection con) throws IOException { - BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream())); - String statusLine = reader.readLine(); - if (statusLine == null) { - _log.error("Null response line when checking URL"); - return null; - } - boolean statusOk = false; - if (!statusLine.startsWith(PROP_STATUS)) { - _log.warn("Response does not begin with status [" + statusLine + "]"); - return null; - } else { - String statVal = statusLine.substring(PROP_STATUS.length() + 1); - statusOk = STATUS_OK.equals(statVal); - - if (!statusOk) { - _log.info("Status was not ok for sending [" + statVal + "]"); - return null; - } - } - - String checkURL = reader.readLine(); - if (!checkURL.startsWith(PROP_CHECK_URL)) { - _log.warn("Incorrect OK response: " + checkURL); - return null; - } else { - String checkURLStr = checkURL.substring(PROP_CHECK_URL.length()+1); - _log.debug("Check URL = [" + checkURLStr + "]"); - return checkURLStr; - } - } - - private boolean checkDelivery(String checkURLStr, OutNetMessage msg) { - long now = _context.clock().now(); - long expiration = msg.getExpiration(); - if (expiration <= now) - expiration = now + HARD_TIMEOUT; - - _log.debug("Check delivery [expiration = " + new Date(expiration) + "]"); - try { - URL checkStatusURL = new URL(checkURLStr); - long delay = RECHECK_DELAY; - do { - //_context.bandwidthLimiter().delayOutbound(msg.getTarget().getIdentity(), 512, false); // HTTP overhead - //_context.bandwidthLimiter().delayInbound(msg.getTarget().getIdentity(), 512); // HTTP overhead - - _log.debug("Checking delivery at " + checkURLStr); - HttpURLConnection con = (HttpURLConnection)checkStatusURL.openConnection(); - con.setRequestMethod("GET"); - //con.setInstanceFollowRedirects(false); // kaffe doesn't support this (yet) - con.setDoInput(true); - con.setDoOutput(false); - con.setUseCaches(false); - con.connect(); - - boolean isPending = getIsPending(con); - if (!isPending) { - _log.info("Check delivery successful for message " + msg.getMessage().getClass().getName()); - return true; - } - - if (now + delay > expiration) - delay = expiration - now - 30; // 30 = kludgy # for the next 4 statements - _log.debug("Still pending (wait " + delay + "ms)"); - Thread.sleep(delay); - //delay += RECHECK_DELAY; - - now = _context.clock().now(); - } while (now < expiration); - _log.warn("Timeout for checking delivery to " + checkURLStr + " for message " + msg.getMessage().getClass().getName()); - } catch (Throwable t) { - _log.debug("Error checking for delivery", t); - } - return false; - } - - private boolean getIsPending(HttpURLConnection con) throws IOException { - int len = con.getContentLength(); - int rc = con.getResponseCode(); - BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream())); - String statusLine = reader.readLine(); - if (statusLine == null) { - _log.warn("Server didn't send back a status line [len = " + len + ", rc = " + rc + "]"); - return false; - } - boolean statusPending = false; - if (!statusLine.startsWith(PROP_STATUS)) { - _log.warn("Response does not begin with status [" + statusLine + "]"); - return false; - } else { - String statVal = statusLine.substring(PROP_STATUS.length() + 1); - statusPending = STATUS_PENDING.equals(statVal); - if (statVal.startsWith(STATUS_CLOCKSKEW)) { - long skew = Long.MAX_VALUE; - String skewStr = statVal.substring(STATUS_CLOCKSKEW.length()+1); - try { - skew = Long.parseLong(skewStr); - } catch (Throwable t) { - _log.error("Unable to decode the clock skew [" + skewStr + "]"); - skew = Long.MAX_VALUE; - } - _log.error("Clock skew talking with phttp relay: " + skew + "ms (remote-local)"); - } - return statusPending; - } - } - - private byte[] getData(OutNetMessage msg) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream((int)(msg.getMessageSize() + 64)); - String target = msg.getTarget().getIdentity().getHash().toBase64(); - StringBuffer buf = new StringBuffer(); - buf.append(PARAM_SEND_TARGET).append('=').append(target).append('&'); - buf.append(PARAM_SEND_TIMEOUTMS).append('=').append(msg.getExpiration() - _context.clock().now()).append('&'); - buf.append(PARAM_SEND_DATA_LENGTH).append('=').append(msg.getMessageSize()).append('&'); - buf.append(PARAM_SEND_TIME).append('=').append(_context.clock().now()).append('&').append('\n'); - baos.write(buf.toString().getBytes()); - baos.write(msg.getMessageData()); - byte data[] = baos.toByteArray(); - _log.debug("Data to be sent: " + data.length); - return data; - } catch (Throwable t) { - _log.error("Error preparing the data", t); - return null; - } - } - - private URL getURL(OutNetMessage msg) { - for (Iterator iter = msg.getTarget().getAddresses().iterator(); iter.hasNext(); ) { - RouterAddress addr = (RouterAddress)iter.next(); - URL url = getURL(addr); - if (url != null) return url; - } - _log.warn("No URLs could be constructed to send to " + msg.getTarget().getIdentity().getHash().toBase64()); - return null; - } - - private URL getURL(RouterAddress addr) { - if (PHTTPTransport.STYLE.equals(addr.getTransportStyle())) { - String url = addr.getOptions().getProperty(PHTTPTransport.PROP_TO_SEND_URL); - if (url == null) return null; - try { - return new URL(url); - } catch (MalformedURLException mue) { - _log.info("Address has a bad url [" + url + "]", mue); - } - } - return null; - } -} diff --git a/router/java/src/net/i2p/router/transport/phttp/PHTTPTransport.java b/router/java/src/net/i2p/router/transport/phttp/PHTTPTransport.java deleted file mode 100644 index 6b500eb93f..0000000000 --- a/router/java/src/net/i2p/router/transport/phttp/PHTTPTransport.java +++ /dev/null @@ -1,300 +0,0 @@ -package net.i2p.router.transport.phttp; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Date; -import java.util.Properties; - -import net.i2p.data.DataHelper; -import net.i2p.data.RouterAddress; -import net.i2p.data.RouterIdentity; -import net.i2p.data.RouterInfo; -import net.i2p.data.SigningPrivateKey; -import net.i2p.router.JobImpl; -import net.i2p.router.OutNetMessage; -import net.i2p.router.RouterContext; -import net.i2p.router.transport.TransportBid; -import net.i2p.router.transport.TransportImpl; -import net.i2p.util.Log; - -/** - * - * - */ -public class PHTTPTransport extends TransportImpl { - private Log _log; - public final static String STYLE = "PHTTP"; - private RouterAddress _myAddress; - private String _mySendURL; - private String _myPollURL; - private String _myRegisterURL; - private long _timeOffset; - private long _pollFrequencyMs; - private int _transportCost; - private PHTTPPoller _poller; - private PHTTPSender _sender; - private boolean _trustTime; - - /** how long after a registration failure should we delay? this gets doubled each time */ - private long _nextRegisterDelay = 1000; - - /** if the phttp relay is down, check it up to once every 5 minutes */ - private final static long MAX_REGISTER_DELAY = 5*60*1000; - - /** URL to which registration with the server can occur */ - public final static String PROP_TO_REGISTER_URL = "registerURL"; - /** URL to which messages destined for this address can be sent */ - public final static String PROP_TO_SEND_URL = "sendURL"; - - public final static String PROP_LOCALTIME = "localtime"; - - /* key=val keys sent back on registration */ - public final static String PROP_STATUS = "status"; - public final static String PROP_POLL_URL = "pollURL"; - public final static String PROP_SEND_URL = "sendURL"; - public final static String PROP_TIME_OFFSET = "timeOffset"; // ms (remote-local) - - /* values for the PROP_STATUS */ - public final static String STATUS_FAILED = "failed"; - public final static String STATUS_REGISTERED = "registered"; - - public final static String CONFIG_POLL_FREQUENCY = "i2np.phttp.pollFrequencySeconds"; - public final static long DEFAULT_POLL_FREQUENCY = 60*1000; // every 60 seconds - - /** - * do we want to assume that the relay's clock is sync'ed with NTP and update - * our offset according to what they say? - */ - public final static String CONFIG_TRUST_TIME = "i2np.phttp.trustRelayTime"; - public final static boolean DEFAULT_TRUST_TIME = true; - - public PHTTPTransport(RouterContext ctx, RouterAddress myAddress) { - super(ctx); - _log = ctx.logManager().getLog(PHTTPTransport.class); - _myAddress = myAddress; - - if (myAddress != null) { - Properties opts = myAddress.getOptions(); - _myRegisterURL = opts.getProperty(PROP_TO_REGISTER_URL); - _mySendURL = opts.getProperty(PROP_TO_SEND_URL); - _pollFrequencyMs = DEFAULT_POLL_FREQUENCY; - String pollFreq = _context.router().getConfigSetting(CONFIG_POLL_FREQUENCY); - if (pollFreq != null) { - try { - long val = Long.parseLong(pollFreq); - _pollFrequencyMs = val*1000; - _log.info("PHTTP Polling Frequency specified as once every " + val + " seconds"); - } catch (NumberFormatException nfe) { - _log.error("Poll frequency is not valid (" + pollFreq + ")", nfe); - } - } else { - _log.info("PHTTP Polling Frequency not specified via (" + CONFIG_POLL_FREQUENCY + "), defaulting to once every " + (DEFAULT_POLL_FREQUENCY/1000) + " seconds"); - } - - String trustTime = _context.router().getConfigSetting(CONFIG_TRUST_TIME); - if (trustTime != null) { - _trustTime = Boolean.TRUE.toString().equalsIgnoreCase(trustTime); - } else { - _trustTime = DEFAULT_TRUST_TIME; - } - - _context.jobQueue().addJob(new RegisterJob()); - } - _sender = new PHTTPSender(_context, this); - _poller = new PHTTPPoller(_context, this); - } - - public String getMySendURL() { return _mySendURL; } - SigningPrivateKey getMySigningKey() { return _context.keyManager().getSigningPrivateKey(); } - RouterIdentity getMyIdentity() { return _context.router().getRouterInfo().getIdentity(); } - String getMyPollURL() { return _myPollURL; } - long getPollFrequencyMs() { return _pollFrequencyMs; } - - private class RegisterJob extends JobImpl { - public RegisterJob() { - super(PHTTPTransport.this._context); - } - public String getName() { return "Register with PHTTP relay"; } - public void runJob() { - boolean ok = doRegisterWithRelay(); - if (ok) { - _log.debug("Registration successful with the last registration delay of " + _nextRegisterDelay + "ms"); - _poller.startPolling(); - } else { - _nextRegisterDelay = _nextRegisterDelay * 2; - if (_nextRegisterDelay > MAX_REGISTER_DELAY) - _nextRegisterDelay = MAX_REGISTER_DELAY; - long nextRegister = _context.clock().now() + _nextRegisterDelay; - _log.debug("Registration failed, next registration attempt in " + _nextRegisterDelay + "ms"); - requeue(nextRegister); - } - } - } - - boolean registerWithRelay() { - boolean ok = doRegisterWithRelay(); - if (ok) { - _log.info("Registered with PHTTP relay"); - return ok; - } - _log.error("Unable to register with relay"); - return false; - } - - synchronized boolean doRegisterWithRelay() { - _log.debug("Beginning registration"); - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - try { - DataHelper.writeDate(baos, new Date(_context.clock().now())); - _context.router().getRouterInfo().getIdentity().writeBytes(baos); - int postLength = baos.size(); - - //_context.bandwidthLimiter().delayOutbound(null, postLength+512, false); // HTTP overhead - //_context.bandwidthLimiter().delayInbound(null, 2048+512); // HTTP overhead - - long now = _context.clock().now(); - _log.debug("Before opening " + _myRegisterURL); - URL url = new URL(_myRegisterURL); - HttpURLConnection con = (HttpURLConnection)url.openConnection(); - // send the info - con.setRequestMethod("POST"); - con.setUseCaches(false); - con.setDoOutput(true); - con.setDoInput(true); - con.setRequestProperty("Content-length", ""+postLength); - baos.writeTo(con.getOutputStream()); - _log.debug("Data sent, before reading"); - con.connect(); - // fetch the results - BufferedReader reader = new BufferedReader(new InputStreamReader(con.getInputStream())); - String line = null; - String stat = null; - boolean ok = false; - while ( (line = reader.readLine()) != null) { - if (line.startsWith(PROP_SEND_URL)) { - _mySendURL = line.substring(PROP_SEND_URL.length()+1).trim(); - } else if (line.startsWith(PROP_POLL_URL)) { - _myPollURL = line.substring(PROP_POLL_URL.length()+1).trim(); - } else if (line.startsWith(PROP_STATUS)) { - stat = line.substring(PROP_STATUS.length()+1).trim(); - if (STATUS_REGISTERED.equals(stat.toLowerCase())) - ok = true; - } else if (line.startsWith(PROP_TIME_OFFSET)) { - String offset = line.substring(PROP_TIME_OFFSET.length()+1).trim(); - try { - _timeOffset = Long.parseLong(offset); - } catch (Throwable t) { - _log.warn("Unable to parse time offset [" + offset + "] - treating as MAX"); - _timeOffset = Long.MAX_VALUE; - } - } - if ( (_myPollURL != null) && (_mySendURL != null) && (stat != null) ) - break; - } - - if (_trustTime) { - _log.info("Setting time offset to " + _timeOffset + " (old offset: " + _context.clock().getOffset() + ")"); - _context.clock().setOffset(_timeOffset); - } - //if ( (_timeOffset > Router.CLOCK_FUDGE_FACTOR) || (_timeOffset < 0 - Router.CLOCK_FUDGE_FACTOR) ) { - // _log.error("Unable to register with PHTTP relay, as there is too much clock skew! " + _timeOffset + "ms difference (them-us)", new Exception("Too much clock skew with phttp relay!")); - // return false; - //} - - if (ok) { - _log.info("Registered with the PHTTP relay [" + _myRegisterURL + "]"); - _log.info("Registered sending url: [" + _mySendURL + "]"); - _log.info("Registered polling url: [" + _myPollURL + "]"); - return true; - } else { - _log.warn("PHTTP relay [" + _myRegisterURL + "] rejected registration"); - } - } catch (Throwable t) { - _log.warn("Error registering", t); - } - - return false; - } - - protected void outboundMessageReady() { - OutNetMessage msg = getNextMessage(); - if (msg != null) { - _context.jobQueue().addJob(new PushNewMessageJob(msg)); - } else { - _log.debug("OutboundMessageReady called, but none were available"); - } - } - - public TransportBid bid(RouterInfo toAddress, long dataSize) { - if (_poller.shouldRejectMessages()) - return null; // we're not using phttp - - long latencyStartup = 0; //_context.bandwidthLimiter().calculateDelayOutbound(toAddress.getIdentity(), (int)dataSize); - latencyStartup += _pollFrequencyMs / 2; // average distance until the next poll - long sendTime = (int)((dataSize)/(16*1024)); // 16K/sec ARBITRARY - int bytes = (int)dataSize+1024; - - // lets seriously penalize phttp to heavily prefer TCP - bytes += 1024*100; - latencyStartup += 1000*600; - - TransportBid bid = new TransportBid(); - bid.setBandwidthBytes(bytes); - bid.setExpiration(new Date(_context.clock().now()+1000*60)); // 1 minute, since the bwlimiter goes per minute - bid.setLatencyMs((int) (latencyStartup + sendTime)); - bid.setMessageSize((int)dataSize); - bid.setRouter(toAddress); - bid.setTransport(this); - - RouterAddress addr = getTargetAddress(toAddress); - if (addr == null) - return null; - - return bid; - } - - public RouterAddress startListening() { - _log.debug("Start listening"); - return _myAddress; - } - public void stopListening() { - if (_poller != null) - _poller.stopPolling(); - } - - - public void rotateAddresses() {} - public void addAddressInfo(Properties infoForNewAddress) {} - public String getStyle() { return STYLE; } - - boolean getTrustTime() { return _trustTime; } - - private class PushNewMessageJob extends JobImpl { - private OutNetMessage _msg; - public PushNewMessageJob(OutNetMessage msg) { - super(PHTTPTransport.this._context); - _msg = msg; - } - public String getName() { return "Push New PHTTP Message"; } - public void runJob() { - long delay = 0; // _context.bandwidthLimiter().calculateDelayOutbound(_msg.getTarget().getIdentity(), (int)_msg.getMessageSize()); - if (delay > 0) { - getTiming().setStartAfter(delay + _context.clock().now()); - PHTTPTransport.this._context.jobQueue().addJob(this); - } else { - _sender.send(_msg); - } - } - } -} -- GitLab