I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Unverified Commit 631a6dd2 authored by zzz's avatar zzz
Browse files

I2CP: Synch fixes

Synch fields in the data messages both directions,
after one report of zero session ID in MessagePayloadMessage
to an external client on fast hardware with Java 18.
Add new constructors for efficiency.
Deprecated setters, unit tests not changed, TODO.
Should fix all the other messages also, TODO.
parent 0154a87c
No related branches found
No related tags found
No related merge requests found
......@@ -211,23 +211,20 @@ class I2CPMessageProducer {
// drop the message... send fail notification?
return;
SendMessageMessage msg;
if (expires > 0 || flags > 0) {
SendMessageExpiresMessage smsg = new SendMessageExpiresMessage();
smsg.setExpiration(expires);
smsg.setFlags(flags);
msg = smsg;
} else
msg = new SendMessageMessage();
msg.setDestination(dest);
SessionId sid = session.getSessionId();
if (sid == null) {
_log.error(session.toString() + " cannot send message, session closed", new Exception());
return;
}
msg.setSessionId(sid);
msg.setNonce(nonce);
Payload data = createPayload(payload);
msg.setPayload(data);
if (expires > 0 || flags > 0) {
SendMessageExpiresMessage smsg = new SendMessageExpiresMessage(sid, dest, data, nonce);
smsg.setExpiration(expires);
smsg.setFlags(flags);
msg = smsg;
} else {
msg = new SendMessageMessage(sid, dest, data, nonce);
}
session.sendMessage(msg);
}
......@@ -244,17 +241,13 @@ class I2CPMessageProducer {
if (!updateBps(payload.length, expires))
// drop the message... send fail notification?
return;
SendMessageMessage msg = new SendMessageExpiresMessage(options);
msg.setDestination(dest);
SessionId sid = session.getSessionId();
if (sid == null) {
_log.error(session.toString() + " cannot send message, session closed", new Exception());
return;
}
msg.setSessionId(sid);
msg.setNonce(nonce);
Payload data = createPayload(payload);
msg.setPayload(data);
SendMessageMessage msg = new SendMessageExpiresMessage(sid, dest, data, nonce, options);
session.sendMessage(msg);
}
......
......@@ -28,12 +28,29 @@ public class MessagePayloadMessage extends I2CPMessageImpl {
private long _messageId;
private Payload _payload;
/**
* For reading.
* Deprecated for writing, use 3-arg constructor
*/
public MessagePayloadMessage() {
_sessionId = -1;
_messageId = -1;
}
public long getSessionId() {
/**
* For writing
*
* @since 0.9.54
*/
public MessagePayloadMessage(long sessID, long msgID, Payload payload) {
synchronized(this) {
_sessionId = (int) sessID;
_messageId = msgID;
_payload = payload;
}
}
public synchronized long getSessionId() {
return _sessionId;
}
......@@ -43,33 +60,45 @@ public class MessagePayloadMessage extends I2CPMessageImpl {
* @since 0.9.21
*/
@Override
public SessionId sessionId() {
public synchronized SessionId sessionId() {
return _sessionId >= 0 ? new SessionId(_sessionId) : null;
}
/** @param id 0-65535 */
public void setSessionId(long id) {
/**
* @param id 0-65535
* @deprecated use 3-arg constructor
*/
@Deprecated
public synchronized void setSessionId(long id) {
_sessionId = (int) id;
}
public long getMessageId() {
public synchronized long getMessageId() {
return _messageId;
}
public void setMessageId(long id) {
/**
* @deprecated use 3-arg constructor
*/
@Deprecated
public synchronized void setMessageId(long id) {
_messageId = id;
}
public Payload getPayload() {
public synchronized Payload getPayload() {
return _payload;
}
public void setPayload(Payload payload) {
/**
* @deprecated use 3-arg constructor
*/
@Deprecated
public synchronized void setPayload(Payload payload) {
_payload = payload;
}
@Override
protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException {
protected synchronized void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException {
try {
_sessionId = (int) DataHelper.readLong(in, 2);
_messageId = DataHelper.readLong(in, 4);
......@@ -95,7 +124,7 @@ public class MessagePayloadMessage extends I2CPMessageImpl {
* @throws IOException
*/
@Override
public void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
public synchronized void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
if (_sessionId <= 0)
throw new I2CPMessageException("Unable to write out the message, as the session ID has not been defined");
if (_messageId < 0)
......
......@@ -17,6 +17,8 @@ import java.util.Date;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.DateAndFlags;
import net.i2p.data.Destination;
import net.i2p.data.Payload;
/**
* Same as SendMessageMessage, but with an expiration to be passed to the router
......@@ -30,16 +32,47 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
public final static int MESSAGE_TYPE = 36;
private final DateAndFlags _daf;
/**
* For reading.
* Deprecated for writing, use 4-arg constructor
*/
public SendMessageExpiresMessage() {
this(new DateAndFlags());
super();
_daf = new DateAndFlags();
}
/** @since 0.9.2 */
/**
* For writing
*
* @deprecated use 5-arg constructor
* @since 0.9.2
*/
@Deprecated
public SendMessageExpiresMessage(DateAndFlags options) {
super();
_daf = options;
}
/**
* For writing
*
* @since 0.9.54
*/
public SendMessageExpiresMessage(SessionId sessID, Destination dest, Payload payload, long nonce) {
super(sessID, dest, payload, nonce);
_daf = new DateAndFlags();
}
/**
* For writing
*
* @since 0.9.54
*/
public SendMessageExpiresMessage(SessionId sessID, Destination dest, Payload payload, long nonce, DateAndFlags options) {
super(sessID, dest, payload, nonce);
_daf = options;
}
/**
* The Date object is created here, it is not cached.
* Use getExpirationTime() if you only need the long value.
......@@ -87,7 +120,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
* @throws IOException
*/
@Override
public void readMessage(InputStream in, int length, int type) throws I2CPMessageException, IOException {
public synchronized void readMessage(InputStream in, int length, int type) throws I2CPMessageException, IOException {
super.readMessage(in, length, type);
try {
......@@ -104,7 +137,7 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
* @throws IOException
*/
@Override
public void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
public synchronized void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
if (_sessionId == null)
throw new I2CPMessageException("No session ID");
if (_destination == null)
......
......@@ -31,10 +31,28 @@ public class SendMessageMessage extends I2CPMessageImpl {
protected Payload _payload;
protected long _nonce;
/**
* For reading.
* Deprecated for writing, use 4-arg constructor
*/
public SendMessageMessage() {
}
public SessionId getSessionId() {
/**
* For writing
*
* @since 0.9.54
*/
public SendMessageMessage(SessionId sessID, Destination dest, Payload payload, long nonce) {
synchronized(this) {
_sessionId = sessID;
_destination = dest;
_payload = payload;
_nonce = nonce;
}
}
public synchronized SessionId getSessionId() {
return _sessionId;
}
......@@ -44,41 +62,55 @@ public class SendMessageMessage extends I2CPMessageImpl {
* @since 0.9.21
*/
@Override
public SessionId sessionId() {
public synchronized SessionId sessionId() {
return _sessionId;
}
public void setSessionId(SessionId id) {
/**
* @deprecated use 4-arg constructor
*/
@Deprecated
public synchronized void setSessionId(SessionId id) {
_sessionId = id;
}
public Destination getDestination() {
public synchronized Destination getDestination() {
return _destination;
}
public void setDestination(Destination destination) {
/**
* @deprecated use 4-arg constructor
*/
@Deprecated
public synchronized void setDestination(Destination destination) {
_destination = destination;
}
public Payload getPayload() {
public synchronized Payload getPayload() {
return _payload;
}
public void setPayload(Payload payload) {
/**
* @deprecated use 4-arg constructor
*/
@Deprecated
public synchronized void setPayload(Payload payload) {
_payload = payload;
}
/**
* @return 0 to 0xffffffff
*/
public long getNonce() {
public synchronized long getNonce() {
return _nonce;
}
/**
* @param nonce 0 to 0xffffffff
* @param nonce 0 to 0xffffffff
* @deprecated use 4-arg constructor
*/
public void setNonce(long nonce) {
@Deprecated
public synchronized void setNonce(long nonce) {
_nonce = nonce;
}
......@@ -93,7 +125,7 @@ public class SendMessageMessage extends I2CPMessageImpl {
* @throws IOException
*/
@Override
public void readMessage(InputStream in, int length, int type) throws I2CPMessageException, IOException {
public synchronized void readMessage(InputStream in, int length, int type) throws I2CPMessageException, IOException {
if (type != getType())
throw new I2CPMessageException("Invalid message type (found: " + type + " supported: " + getType()
+ " class: " + getClass().getName() + ")");
......@@ -126,7 +158,7 @@ public class SendMessageMessage extends I2CPMessageImpl {
* @throws IOException
*/
@Override
public void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
public synchronized void writeMessage(OutputStream out) throws I2CPMessageException, IOException {
if (_sessionId == null)
throw new I2CPMessageException("No session ID");
if (_destination == null)
......
......@@ -512,10 +512,6 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
if (_runner.isDead()) return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling receive begin: id = " + message.getMessageId());
MessagePayloadMessage msg = new MessagePayloadMessage();
msg.setMessageId(message.getMessageId());
// TODO validate session id
msg.setSessionId(message.getSessionId());
Payload payload = _runner.getPayload(new MessageId(message.getMessageId()));
if (payload == null) {
if (_log.shouldLog(Log.WARN))
......@@ -523,7 +519,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
+ "] is null! Dropped or Unknown message id");
return;
}
msg.setPayload(payload);
// TODO validate session id
MessagePayloadMessage msg = new MessagePayloadMessage(message.getSessionId(), message.getMessageId(), payload);
try {
_runner.doSend(msg);
} catch (I2CPMessageException ime) {
......
......@@ -110,16 +110,13 @@ class MessageReceivedJob extends JobImpl {
* @since 0.9.4
*/
private void sendMessage(long id) throws I2CPMessageException {
MessagePayloadMessage msg = new MessagePayloadMessage();
msg.setMessageId(id);
SessionId sid = _runner.getSessionId(_toDest.calculateHash());
if (sid == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No session for " + _toDest.toBase32());
return;
}
msg.setSessionId(sid.getSessionId());
msg.setPayload(_payload);
MessagePayloadMessage msg = new MessagePayloadMessage(sid.getSessionId(), id, _payload);
_runner.doSend(msg);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment