From 1249ad29e016e133d249645a0ef2c3f52627081f Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 04:42:02 +0100 Subject: [PATCH 01/27] claim pieces from list of available pieces --- .../groovy/com/muwire/core/download/Pieces.groovy | 12 ++++++++++++ .../com/muwire/core/download/PiecesTest.groovy | 15 +++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy index 4028a6f7..1bcac9f3 100644 --- a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy @@ -38,6 +38,18 @@ class Pieces { } } + synchronized int claim(Set available) { + for (int i = claimed.nextSetBit(0); i >= 0; i = claimed.nextSetBit(i+1)) + available.remove(i) + if (available.isEmpty()) + return -1 + List toList = available.toList() + Collections.shuffle(toList) + int rv = toList[0] + claimed.set(rv) + rv + } + synchronized def getDownloaded() { def rv = [] for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1)) { diff --git a/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy b/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy index 09082103..8fa7bfd4 100644 --- a/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy @@ -34,4 +34,19 @@ class PiecesTest { pieces.markDownloaded(piece2) assert pieces.isComplete() } + + @Test + public void testClaimAvailable() { + pieces = new Pieces(2) + int claimed = pieces.claim([0].toSet()) + assert claimed == 0 + assert -1 == pieces.claim([0].toSet()) + } + + @Test + public void testClaimNoneAvailable() { + pieces = new Pieces(20) + int claimed = pieces.claim() + assert -1 == pieces.claim([claimed].toSet()) + } } From b23226e8c67d60488c5ec391a131e179eb5577b5 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 05:30:56 +0100 Subject: [PATCH 02/27] wip on parsing X-Have from uploader --- .../core/download/DownloadSession.groovy | 82 +++++++++++++------ 1 file changed, 58 insertions(+), 24 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index a6a5172d..d476829a 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -30,6 +30,7 @@ class DownloadSession { private final File file private final int pieceSize private final long fileLength + private final Set available private final MessageDigest digest private final LinkedList timestamps = new LinkedList<>() @@ -38,7 +39,7 @@ class DownloadSession { private ByteBuffer mapped DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, - int pieceSize, long fileLength) { + int pieceSize, long fileLength, Set available) { this.meB64 = meB64 this.pieces = pieces this.endpoint = endpoint @@ -46,6 +47,7 @@ class DownloadSession { this.file = file this.pieceSize = pieceSize this.fileLength = fileLength + this.available = available try { digest = MessageDigest.getInstance("SHA-256") } catch (NoSuchAlgorithmException impossible) { @@ -63,7 +65,11 @@ class DownloadSession { OutputStream os = endpoint.getOutputStream() InputStream is = endpoint.getInputStream() - int piece = pieces.claim() + int piece + if (available.isEmpty()) + piece = pieces.claim() + else + piece = pieces.claim(available) if (piece == -1) return false boolean unclaim = true @@ -81,43 +87,60 @@ class DownloadSession { os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() - String code = readTillRN(is) - if (code.startsWith("404 ")) { + String codeString = readTillRN(is) + codeString = codeString.substring(codeString.indexOf(' ')) + + int code = Integer.parseInt(codeString.trim()) + + if (code == 404) { log.warning("file not found") endpoint.close() return false } - if (code.startsWith("416 ")) { - log.warning("range $start-$end cannot be satisfied") - return // leave endpoint open - } - - if (!code.startsWith("200 ")) { + if (!(code == 200 || code == 416)) { log.warning("unknown code $code") endpoint.close() return false } // parse all headers - Set headers = new HashSet<>() + Map headers = new HashMap<>() String header - while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) - headers.add(header) + while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) { + int colon = header.indexOf(':') + if (colon == -1 || colon == header.length() - 1) + throw new IOException("invalid header $header") + String key = header.substring(0, colon) + String value = header.substring(colon + 1) + headers[key] = value + } - long receivedStart = -1 - long receivedEnd = -1 - for (String receivedHeader : headers) { - def group = (receivedHeader =~ /^Content-Range: (\d+)-(\d+)$/) - if (group.size() != 1) { - log.info("ignoring header $receivedHeader") - continue - } - - receivedStart = Long.parseLong(group[0][1]) - receivedEnd = Long.parseLong(group[0][2]) + // parse X-Have if present + if (headers.containsKey("X-Have")) { + updateAvailablePieces(headers["X-Have"]) + if (!available.contains(piece)) + return true // try again next time + } else { + if (code != 200) + throw new IOException("Code $code but no X-Have") + available.clear() } + if (code != 200) + return true + + String range = headers["Content-Range"] + if (range == null) + throw new IOException("Code 200 but no Content-Range") + + def group = (range =~ /^ (\d+)-(\d+)$/) + if (group.size() != 1) + throw new IOException("invalid Content-Range header $range") + + long receivedStart = Long.parseLong(group[0][1]) + long receivedEnd = Long.parseLong(group[0][2]) + if (receivedStart != start || receivedEnd != end) { log.warning("We don't support mismatching ranges yet") endpoint.close() @@ -196,4 +219,15 @@ class DownloadSession { totalRead += reads[idx] (int)(totalRead * 1000.0 / interval) } + + private void updateAvailablePieces(String xHave) { + byte [] availablePieces = Base64.decode(xHave) + availablePieces.eachWithIndex {b, i -> + for (int j = 0; j < 8 ; j++) { + int mask = 0x80 >> j + if ((b & mask) == mask) + available.add(i * 8 + j) + } + } + } } From f9777d29f4e2c645ccb7d99af4edfe0ef3bcaed5 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 05:41:49 +0100 Subject: [PATCH 03/27] get existing tests to pass --- .../com/muwire/core/download/DownloadSession.groovy | 3 ++- .../muwire/core/download/DownloadSessionTest.groovy | 10 ++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index d476829a..678b7e09 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -88,7 +88,8 @@ class DownloadSession { os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() String codeString = readTillRN(is) - codeString = codeString.substring(codeString.indexOf(' ')) + int space = codeString.indexOf(' ') + codeString = codeString.substring(0, space) int code = Integer.parseInt(codeString.trim()) diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index 9480658b..ccead90e 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -9,6 +9,7 @@ import com.muwire.core.files.FileHasher import static com.muwire.core.util.DataUtil.readTillRN import net.i2p.data.Base64 +import net.i2p.util.ConcurrentHashSet class DownloadSessionTest { @@ -24,6 +25,9 @@ class DownloadSessionTest { private InputStream fromDownloader, fromUploader private OutputStream toDownloader, toUploader + private volatile boolean performed + private Set available = new ConcurrentHashSet<>() + private void initSession(int size, def claimedPieces = []) { Random r = new Random() byte [] content = new byte[size] @@ -56,8 +60,8 @@ class DownloadSessionTest { toUploader = new PipedOutputStream(fromDownloader) endpoint = new Endpoint(null, fromUploader, toUploader, null) - session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size) - downloadThread = new Thread( { session.request() } as Runnable) + session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size, available) + downloadThread = new Thread( { performed = session.request() } as Runnable) downloadThread.setDaemon(true) downloadThread.start() } @@ -87,6 +91,8 @@ class DownloadSessionTest { assert pieces.isComplete() assert target.bytes == source.bytes + assert performed + assert available.isEmpty() } @Test From 5504dd22518a253539e864a2b1a009089e5106cc Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 05:45:11 +0100 Subject: [PATCH 04/27] tighten conditions --- .../com/muwire/core/download/DownloadSessionTest.groovy | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index ccead90e..f1e0ecac 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -114,6 +114,8 @@ class DownloadSessionTest { Thread.sleep(150) assert pieces.isComplete() assert target.bytes == source.bytes + assert performed + assert available.isEmpty() } @Test @@ -144,6 +146,8 @@ class DownloadSessionTest { Thread.sleep(150) assert !pieces.isComplete() assert 1 == pieces.donePieces() + assert performed + assert available.isEmpty() } @Test @@ -151,7 +155,9 @@ class DownloadSessionTest { initSession(20, [0]) long now = System.currentTimeMillis() downloadThread.join(100) - assert 100 > (System.currentTimeMillis() - now) + assert 100 >= (System.currentTimeMillis() - now) + assert !performed + assert available.isEmpty() } @Test From a26ad229ee8720b3a185f49fde30017a569f41a8 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 05:56:42 +0100 Subject: [PATCH 05/27] more tests --- .../core/download/DownloadSessionTest.groovy | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index f1e0ecac..60b9c356 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -27,6 +27,7 @@ class DownloadSessionTest { private volatile boolean performed private Set available = new ConcurrentHashSet<>() + private volatile IOException thrown private void initSession(int size, def claimedPieces = []) { Random r = new Random() @@ -61,7 +62,13 @@ class DownloadSessionTest { endpoint = new Endpoint(null, fromUploader, toUploader, null) session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size, available) - downloadThread = new Thread( { performed = session.request() } as Runnable) + downloadThread = new Thread( { + try { + performed = session.request() + } catch (IOException e) { + thrown = e + } + } as Runnable) downloadThread.setDaemon(true) downloadThread.start() } @@ -93,6 +100,7 @@ class DownloadSessionTest { assert target.bytes == source.bytes assert performed assert available.isEmpty() + assert thrown == null } @Test @@ -116,6 +124,7 @@ class DownloadSessionTest { assert target.bytes == source.bytes assert performed assert available.isEmpty() + assert thrown == null } @Test @@ -148,6 +157,7 @@ class DownloadSessionTest { assert 1 == pieces.donePieces() assert performed assert available.isEmpty() + assert thrown == null } @Test @@ -158,6 +168,7 @@ class DownloadSessionTest { assert 100 >= (System.currentTimeMillis() - now) assert !performed assert available.isEmpty() + assert thrown == null } @Test @@ -176,4 +187,25 @@ class DownloadSessionTest { assert pieces.claimed.get(0) assert start == 0 && end == (1 << pieceSize) - 1 } + + @Test + public void test416NoHave() { + initSession(20) + readAllHeaders(fromDownloader) + + toDownloader.write("416 don't have it\r\n\r\n".bytes) + toDownloader.flush() + Thread.sleep(150) + assert !performed + assert available.isEmpty() + assert thrown != null + } + + private static Set readAllHeaders(InputStream is) { + Set rv = new HashSet<>() + String header + while((header = readTillRN(is)) != "") + rv.add(header) + rv + } } From 807ab22f8ef0565cde6a78b8d35f54073268766d Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 06:43:48 +0100 Subject: [PATCH 06/27] test parsing of X-Have --- .../core/download/DownloadSession.groovy | 9 ++-- .../core/download/DownloadSessionTest.groovy | 46 +++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 678b7e09..edd5603d 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -114,7 +114,7 @@ class DownloadSession { throw new IOException("invalid header $header") String key = header.substring(0, colon) String value = header.substring(colon + 1) - headers[key] = value + headers[key] = value.trim() } // parse X-Have if present @@ -135,7 +135,7 @@ class DownloadSession { if (range == null) throw new IOException("Code 200 but no Content-Range") - def group = (range =~ /^ (\d+)-(\d+)$/) + def group = (range =~ /^(\d+)-(\d+)$/) if (group.size() != 1) throw new IOException("invalid Content-Range header $range") @@ -225,9 +225,10 @@ class DownloadSession { byte [] availablePieces = Base64.decode(xHave) availablePieces.eachWithIndex {b, i -> for (int j = 0; j < 8 ; j++) { - int mask = 0x80 >> j - if ((b & mask) == mask) + byte mask = 0x80 >>> j + if ((b & mask) == mask) { available.add(i * 8 + j) + } } } } diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index 60b9c356..432fedd3 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -201,6 +201,38 @@ class DownloadSessionTest { assert thrown != null } + @Test + public void test416Have() { + initSession(20) + readAllHeaders(fromDownloader) + + toDownloader.write("416 don't have it\r\n".bytes) + toDownloader.write("X-Have: ${encodeXHave([0], 1)}\r\n\r\n".bytes) + toDownloader.flush() + + Thread.sleep(150) + assert performed + assert available.contains(0) + assert thrown == null + } + + @Test + public void test416Have2Pieces() { + int pieceSize = FileHasher.getPieceSize(1) + int size = (1 << pieceSize) + 1 + initSession(size) + readAllHeaders(fromDownloader) + + toDownloader.write("416 don't have it\r\n".bytes) + toDownloader.write("X-Have: ${encodeXHave([1], 2)}\r\n\r\n".bytes) + toDownloader.flush() + + Thread.sleep(150) + assert performed + assert available.contains(1) + assert thrown == null + } + private static Set readAllHeaders(InputStream is) { Set rv = new HashSet<>() String header @@ -208,4 +240,18 @@ class DownloadSessionTest { rv.add(header) rv } + + private static String encodeXHave(List pieces, int totalPieces) { + int bytes = totalPieces / 8 + if (totalPieces % 8 != 0) + bytes++ + byte[] raw = new byte[bytes] + pieces.each { + int byteIdx = it / 8 + int offset = it % 8 + int mask = 0x80 >>> offset + raw[byteIdx] |= mask + } + Base64.encode(raw) + } } From dd655ed60fc4eb5f9f599d1ce4eb2c867a0ba522 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 09:12:42 +0100 Subject: [PATCH 07/27] test for re-requesting available pieces --- .../core/download/DownloadSessionTest.groovy | 66 +++++++++++++++++-- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index 432fedd3..16f31369 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -1,5 +1,7 @@ package com.muwire.core.download +import static org.junit.Assert.fail + import org.junit.After import org.junit.Test @@ -62,17 +64,19 @@ class DownloadSessionTest { endpoint = new Endpoint(null, fromUploader, toUploader, null) session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size, available) - downloadThread = new Thread( { - try { - performed = session.request() - } catch (IOException e) { - thrown = e - } - } as Runnable) + downloadThread = new Thread( { perform() } as Runnable) downloadThread.setDaemon(true) downloadThread.start() } + private void perform() { + try { + performed = session.request() + } catch (IOException e) { + thrown = e + } + } + @After public void teardown() { source?.delete() @@ -233,6 +237,54 @@ class DownloadSessionTest { assert thrown == null } + @Test + public void test200TwoPieces1Available() { + int pieceSize = FileHasher.getPieceSize(1) + int size = (1 << pieceSize) * 9 + 1 + initSession(size) + + Set headers = readAllHeaders(fromDownloader) + def matcher = null + headers.each { + if (it.startsWith("Range")) + matcher = (it =~ /^Range: (\d+)-(\d+)$/) + } + assert matcher.groupCount() > 0 + int start = Integer.parseInt(matcher[0][1]) + int end = Integer.parseInt(matcher[0][2]) + + if (start == 0) + fail("inconlcusive") + + toDownloader.write("416 don't have it \r\n".bytes) + toDownloader.write("X-Have: ${encodeXHave([0],2)}\r\n\r\n".bytes) + toDownloader.flush() + downloadThread.join() + + assert performed + performed = false + assert available.contains(0) + assert thrown == null + + // request same session + downloadThread = new Thread( { perform() } as Runnable) + downloadThread.setDaemon(true) + downloadThread.start() + + Thread.sleep(150) + + headers = readAllHeaders(fromDownloader) + matcher = null + headers.each { + if (it.startsWith("Range")) + matcher = (it =~ /^Range: (\d+)-(\d+)$/) + } + assert matcher.groupCount() > 0 + start = Integer.parseInt(matcher[0][1]) + end = Integer.parseInt(matcher[0][2]) + assert start == 0 + } + private static Set readAllHeaders(InputStream is) { Set rv = new HashSet<>() String header From 8f9996848b40847b26476b6f9dd5cac71c99cae9 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 09:25:28 +0100 Subject: [PATCH 08/27] send X-Have from downloader too --- .../core/download/DownloadSession.groovy | 6 +++++- .../com/muwire/core/util/DataUtil.groovy | 16 ++++++++++++++++ .../core/download/DownloadSessionTest.groovy | 18 ++++-------------- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index edd5603d..3a8cc586 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -5,6 +5,8 @@ import net.i2p.data.Base64 import com.muwire.core.Constants import com.muwire.core.InfoHash import com.muwire.core.connection.Endpoint +import com.muwire.core.util.DataUtil + import static com.muwire.core.util.DataUtil.readTillRN import groovy.util.logging.Log @@ -85,7 +87,9 @@ class DownloadSession { try { os.write("GET $root\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII)) - os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("X-Persona: $meB64\r\n".getBytes(StandardCharsets.US_ASCII)) + String xHave = DataUtil.encodeXHave(pieces.getDownloaded(), pieces.nPieces) + os.write("X-Have: $xHave\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() String codeString = readTillRN(is) int space = codeString.indexOf(' ') diff --git a/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy b/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy index 25bec593..26bd19d7 100644 --- a/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy +++ b/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy @@ -4,6 +4,8 @@ import java.nio.charset.StandardCharsets import com.muwire.core.Constants +import net.i2p.data.Base64 + class DataUtil { private final static int MAX_SHORT = (0x1 << 16) - 1 @@ -79,4 +81,18 @@ class DataUtil { } new String(baos.toByteArray(), StandardCharsets.US_ASCII) } + + public static String encodeXHave(List pieces, int totalPieces) { + int bytes = totalPieces / 8 + if (totalPieces % 8 != 0) + bytes++ + byte[] raw = new byte[bytes] + pieces.each { + int byteIdx = it / 8 + int offset = it % 8 + int mask = 0x80 >>> offset + raw[byteIdx] |= mask + } + Base64.encode(raw) + } } diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index 16f31369..1a3327f4 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -9,6 +9,7 @@ import com.muwire.core.InfoHash import com.muwire.core.connection.Endpoint import com.muwire.core.files.FileHasher import static com.muwire.core.util.DataUtil.readTillRN +import static com.muwire.core.util.DataUtil.encodeXHave import net.i2p.data.Base64 import net.i2p.util.ConcurrentHashSet @@ -91,6 +92,7 @@ class DownloadSessionTest { assert "GET $rootBase64" == readTillRN(fromDownloader) assert "Range: 0-19" == readTillRN(fromDownloader) readTillRN(fromDownloader) + readTillRN(fromDownloader) assert "" == readTillRN(fromDownloader) toDownloader.write("200 OK\r\n".bytes) @@ -116,6 +118,7 @@ class DownloadSessionTest { assert "GET $rootBase64" == readTillRN(fromDownloader) readTillRN(fromDownloader) readTillRN(fromDownloader) + readTillRN(fromDownloader) assert "" == readTillRN(fromDownloader) toDownloader.write("200 OK\r\n".bytes) @@ -146,6 +149,7 @@ class DownloadSessionTest { assert (start == 0 && end == ((1 << pieceSize) - 1)) || (start == (1 << pieceSize) && end == (1 << pieceSize)) + readTillRN(fromDownloader) readTillRN(fromDownloader) assert "" == readTillRN(fromDownloader) @@ -292,18 +296,4 @@ class DownloadSessionTest { rv.add(header) rv } - - private static String encodeXHave(List pieces, int totalPieces) { - int bytes = totalPieces / 8 - if (totalPieces % 8 != 0) - bytes++ - byte[] raw = new byte[bytes] - pieces.each { - int byteIdx = it / 8 - int offset = it % 8 - int mask = 0x80 >>> offset - raw[byteIdx] |= mask - } - Base64.encode(raw) - } } From 56125f6df8f51259922f36e4b4b72d3f8f19f3b5 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 09:32:10 +0100 Subject: [PATCH 09/27] refactor X-Have decoding logic --- .../muwire/core/download/DownloadSession.groovy | 16 +++------------- .../groovy/com/muwire/core/util/DataUtil.groovy | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 3a8cc586..7d8f9777 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -123,7 +123,9 @@ class DownloadSession { // parse X-Have if present if (headers.containsKey("X-Have")) { - updateAvailablePieces(headers["X-Have"]) + DataUtil.decodeXHave(headers["X-Have"]).each { + available.add(it) + } if (!available.contains(piece)) return true // try again next time } else { @@ -224,16 +226,4 @@ class DownloadSession { totalRead += reads[idx] (int)(totalRead * 1000.0 / interval) } - - private void updateAvailablePieces(String xHave) { - byte [] availablePieces = Base64.decode(xHave) - availablePieces.eachWithIndex {b, i -> - for (int j = 0; j < 8 ; j++) { - byte mask = 0x80 >>> j - if ((b & mask) == mask) { - available.add(i * 8 + j) - } - } - } - } } diff --git a/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy b/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy index 26bd19d7..238fa871 100644 --- a/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy +++ b/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy @@ -95,4 +95,18 @@ class DataUtil { } Base64.encode(raw) } + + public static List decodeXHave(String xHave) { + byte [] availablePieces = Base64.decode(xHave) + List available = new ArrayList<>() + availablePieces.eachWithIndex {b, i -> + for (int j = 0; j < 8 ; j++) { + byte mask = 0x80 >>> j + if ((b & mask) == mask) { + available.add(i * 8 + j) + } + } + } + available + } } From 05b02834af4b5f977266ebfd6467cb15ebbd119a Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 12:25:04 +0100 Subject: [PATCH 10/27] parse X-Alt --- .../core/download/DownloadSession.groovy | 15 +++++++- .../muwire/core/download/Downloader.groovy | 2 +- .../download/SourceDiscoveredEvent.groovy | 10 ++++++ .../core/download/DownloadSessionTest.groovy | 36 ++++++++++++++++++- 4 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 7d8f9777..8744f681 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -3,7 +3,9 @@ package com.muwire.core.download; import net.i2p.data.Base64 import com.muwire.core.Constants +import com.muwire.core.EventBus import com.muwire.core.InfoHash +import com.muwire.core.Persona import com.muwire.core.connection.Endpoint import com.muwire.core.util.DataUtil @@ -25,6 +27,7 @@ class DownloadSession { private static int SAMPLES = 10 + private final EventBus eventBus private final String meB64 private final Pieces pieces private final InfoHash infoHash @@ -40,8 +43,9 @@ class DownloadSession { private ByteBuffer mapped - DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, + DownloadSession(EventBus eventBus, String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, int pieceSize, long fileLength, Set available) { + this.eventBus = eventBus this.meB64 = meB64 this.pieces = pieces this.endpoint = endpoint @@ -120,6 +124,15 @@ class DownloadSession { String value = header.substring(colon + 1) headers[key] = value.trim() } + + // prase X-Alt if present + if (headers.containsKey("X-Alt")) { + headers["X-Alt"].split(",").each { + byte [] raw = Base64.decode(it) + Persona source = new Persona(new ByteArrayInputStream(raw)) + eventBus.publish(new SourceDiscoveredEvent(infoHash : infoHash, source : source)) + } + } // parse X-Have if present if (headers.containsKey("X-Have")) { diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 4458b67b..e2c2d2d2 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -247,7 +247,7 @@ public class Downloader { currentState = WorkerState.DOWNLOADING boolean requestPerformed while(!pieces.isComplete()) { - currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length) + currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length) requestPerformed = currentSession.request() if (!requestPerformed) break diff --git a/core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy b/core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy new file mode 100644 index 00000000..47bf88ec --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy @@ -0,0 +1,10 @@ +package com.muwire.core.download + +import com.muwire.core.Event +import com.muwire.core.InfoHash +import com.muwire.core.Persona + +class SourceDiscoveredEvent extends Event { + InfoHash infoHash + Persona source +} diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index 1a3327f4..aad57ebf 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -3,9 +3,13 @@ package com.muwire.core.download import static org.junit.Assert.fail import org.junit.After +import org.junit.Before import org.junit.Test +import com.muwire.core.EventBus import com.muwire.core.InfoHash +import com.muwire.core.Persona +import com.muwire.core.Personas import com.muwire.core.connection.Endpoint import com.muwire.core.files.FileHasher import static com.muwire.core.util.DataUtil.readTillRN @@ -16,6 +20,7 @@ import net.i2p.util.ConcurrentHashSet class DownloadSessionTest { + private EventBus eventBus private File source, target private InfoHash infoHash private Endpoint endpoint @@ -32,6 +37,12 @@ class DownloadSessionTest { private Set available = new ConcurrentHashSet<>() private volatile IOException thrown + + @Before + public void setUp() { + eventBus = new EventBus() + } + private void initSession(int size, def claimedPieces = []) { Random r = new Random() byte [] content = new byte[size] @@ -64,7 +75,7 @@ class DownloadSessionTest { toUploader = new PipedOutputStream(fromDownloader) endpoint = new Endpoint(null, fromUploader, toUploader, null) - session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size, available) + session = new DownloadSession(eventBus, "",pieces, infoHash, endpoint, target, pieceSize, size, available) downloadThread = new Thread( { perform() } as Runnable) downloadThread.setDaemon(true) downloadThread.start() @@ -289,6 +300,29 @@ class DownloadSessionTest { assert start == 0 } + @Test + public void testXAlt() throws Exception { + Personas personas = new Personas() + def sources = [] + def listener = new Object() { + public void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { + sources << e.source + } + } + eventBus.register(SourceDiscoveredEvent.class, listener) + + initSession(20) + readAllHeaders(fromDownloader) + toDownloader.write("416 don't have it\r\n".bytes) + toDownloader.write("X-Alt: ${personas.persona1.toBase64()},${personas.persona2.toBase64()}\r\n\r\n".bytes) + toDownloader.flush() + + Thread.sleep(150) + assert sources.contains(personas.persona1) + assert sources.contains(personas.persona2) + assert 2 == sources.size() + } + private static Set readAllHeaders(InputStream is) { Set rv = new HashSet<>() String header From 6eb1aa07f5af6f0c76d7d8dd71f9fc8863c0210f Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 12:29:32 +0100 Subject: [PATCH 11/27] key downloaders by infohash --- .../muwire/core/download/DownloadManager.groovy | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index 5106e536..8687eed1 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -17,6 +17,7 @@ import com.muwire.core.InfoHash import com.muwire.core.Persona import com.muwire.core.UILoadedEvent +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executor import java.util.concurrent.Executors @@ -28,7 +29,7 @@ public class DownloadManager { private final File incompletes, home private final Persona me - private final Set downloaders = new ConcurrentHashSet<>() + private final Map downloaders = new ConcurrentHashMap<>() public DownloadManager(EventBus eventBus, I2PConnector connector, File home, Persona me) { this.eventBus = eventBus @@ -64,14 +65,14 @@ public class DownloadManager { def downloader = new Downloader(eventBus, this, me, e.target, size, infohash, pieceSize, connector, destinations, incompletes) - downloaders.add(downloader) + downloaders.put(infohash, downloader) persistDownloaders() executor.execute({downloader.download()} as Runnable) eventBus.publish(new DownloadStartedEvent(downloader : downloader)) } public void onUIDownloadCancelledEvent(UIDownloadCancelledEvent e) { - downloaders.remove(e.downloader) + downloaders.remove(e.downloader.infoHash) persistDownloaders() } @@ -101,21 +102,21 @@ public class DownloadManager { } def downloader = new Downloader(eventBus, this, me, file, (long)json.length, infoHash, json.pieceSizePow2, connector, destinations, incompletes) - downloaders.add(downloader) + downloaders.put(infoHash, downloader) downloader.download() eventBus.publish(new DownloadStartedEvent(downloader : downloader)) } } void onFileDownloadedEvent(FileDownloadedEvent e) { - downloaders.remove(e.downloader) + downloaders.remove(e.downloader.infoHash) persistDownloaders() } private void persistDownloaders() { File downloadsFile = new File(home,"downloads.json") downloadsFile.withPrintWriter { writer -> - downloaders.each { downloader -> + downloaders.values().each { downloader -> if (!downloader.cancelled) { def json = [:] json.file = Base64.encode(DataUtil.encodei18nString(downloader.file.getAbsolutePath())) @@ -139,7 +140,7 @@ public class DownloadManager { } public void shutdown() { - downloaders.each { it.stop() } + downloaders.values().each { it.stop() } Downloader.executorService.shutdownNow() } } From 0f07562de3150099106c2404eec36de3cf5f1a3b Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 12:39:16 +0100 Subject: [PATCH 12/27] pass new sources to active downloaders --- .../main/groovy/com/muwire/core/Core.groovy | 4 ++- .../core/download/DownloadManager.groovy | 25 ++++++++++++++++++- .../muwire/core/download/Downloader.groovy | 8 ++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 0f3787b7..17b7eda0 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -13,6 +13,7 @@ import com.muwire.core.connection.I2PConnector import com.muwire.core.connection.LeafConnectionManager import com.muwire.core.connection.UltrapeerConnectionManager import com.muwire.core.download.DownloadManager +import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.download.UIDownloadCancelledEvent import com.muwire.core.download.UIDownloadEvent import com.muwire.core.files.FileDownloadedEvent @@ -203,11 +204,12 @@ public class Core { eventBus.register(ResultsEvent.class, searchManager) log.info("initializing download manager") - downloadManager = new DownloadManager(eventBus, i2pConnector, home, me) + downloadManager = new DownloadManager(eventBus, trustService, props, i2pConnector, home, me) eventBus.register(UIDownloadEvent.class, downloadManager) eventBus.register(UILoadedEvent.class, downloadManager) eventBus.register(FileDownloadedEvent.class, downloadManager) eventBus.register(UIDownloadCancelledEvent.class, downloadManager) + eventBus.register(SourceDiscoveredEvent.class, downloadManager) log.info("initializing upload manager") UploadManager uploadManager = new UploadManager(eventBus, fileManager) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index 8687eed1..d782aa27 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -3,6 +3,8 @@ package com.muwire.core.download import com.muwire.core.connection.I2PConnector import com.muwire.core.files.FileDownloadedEvent import com.muwire.core.files.FileHasher +import com.muwire.core.trust.TrustLevel +import com.muwire.core.trust.TrustService import com.muwire.core.util.DataUtil import groovy.json.JsonBuilder @@ -14,6 +16,7 @@ import net.i2p.util.ConcurrentHashSet import com.muwire.core.EventBus import com.muwire.core.InfoHash +import com.muwire.core.MuWireSettings import com.muwire.core.Persona import com.muwire.core.UILoadedEvent @@ -24,6 +27,8 @@ import java.util.concurrent.Executors public class DownloadManager { private final EventBus eventBus + private final TrustService trustService + private final MuWireSettings muSettings private final I2PConnector connector private final Executor executor private final File incompletes, home @@ -31,8 +36,11 @@ public class DownloadManager { private final Map downloaders = new ConcurrentHashMap<>() - public DownloadManager(EventBus eventBus, I2PConnector connector, File home, Persona me) { + public DownloadManager(EventBus eventBus, TrustService trustService, MuWireSettings muSettings, + I2PConnector connector, File home, Persona me) { this.eventBus = eventBus + this.trustService = trustService + this.muSettings = muSettings this.connector = connector this.incompletes = new File(home,"incompletes") this.home = home @@ -108,6 +116,21 @@ public class DownloadManager { } } + void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { + Downloader downloader = downloaders.get(e.infoHash) + if (downloader == null) + return + boolean ok = false + switch(trustService.getLevel(e.source.destination)) { + case TrustLevel.TRUSTED: ok = true; break + case TrustLevel.NEUTRAL: ok = muSettings.allowUntrusted; break + case TrustLevel.DISTRUSTED: ok = false; break + } + + if (ok) + downloader.addSource(e.source.destination) + } + void onFileDownloadedEvent(FileDownloadedEvent e) { downloaders.remove(e.downloader.infoHash) persistDownloaders() diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index e2c2d2d2..b802c710 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -221,6 +221,14 @@ public class Downloader { } } + void addSource(Destination d) { + if (activeWorkers.containsKey(d)) + return + DownloadWorker newWorker = new DownloadWorker(d) + activeWorkers.put(d, newWorker) + executorService.submit(newWorker) + } + class DownloadWorker implements Runnable { private final Destination destination private volatile WorkerState currentState From 1ee9ccf098eb4ce1c1619ed099ab15cb15cf080c Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 12:55:25 +0100 Subject: [PATCH 13/27] parse X-Have on uploader side --- .../com/muwire/core/upload/ContentRequest.groovy | 1 + .../main/groovy/com/muwire/core/upload/Request.groovy | 9 ++++++++- .../com/muwire/core/upload/UploadManager.groovy | 11 ++++++++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy index 45f03336..fa500f13 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy @@ -2,4 +2,5 @@ package com.muwire.core.upload class ContentRequest extends Request { Range range + boolean have } diff --git a/core/src/main/groovy/com/muwire/core/upload/Request.groovy b/core/src/main/groovy/com/muwire/core/upload/Request.groovy index a38b7d9d..a27acd59 100644 --- a/core/src/main/groovy/com/muwire/core/upload/Request.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/Request.groovy @@ -5,6 +5,7 @@ import java.nio.charset.StandardCharsets import com.muwire.core.Constants import com.muwire.core.InfoHash import com.muwire.core.Persona +import com.muwire.core.util.DataUtil import groovy.util.logging.Log import net.i2p.data.Base64 @@ -48,8 +49,14 @@ class Request { def decoded = Base64.decode(encoded) downloader = new Persona(new ByteArrayInputStream(decoded)) } + + boolean have = false + if (headers.containsKey("X-Have")) { + def encoded = headers["X-Have"].trim() + have = DataUtil.decodeXHave(encoded).size() > 0 + } new ContentRequest( infoHash : infoHash, range : new Range(start, end), - headers : headers, downloader : downloader) + headers : headers, downloader : downloader, have : have) } static Request parseHashListRequest(InfoHash infoHash, InputStream is) throws IOException { diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index 6b7b76ba..3045ce08 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -6,6 +6,7 @@ import com.muwire.core.EventBus import com.muwire.core.InfoHash import com.muwire.core.SharedFile import com.muwire.core.connection.Endpoint +import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.files.FileManager import groovy.util.logging.Log @@ -61,12 +62,16 @@ public class UploadManager { return } - Request request = Request.parseContentRequest(new InfoHash(infoHashRoot), e.getInputStream()) + ContentRequest request = Request.parseContentRequest(new InfoHash(infoHashRoot), e.getInputStream()) if (request.downloader != null && request.downloader.destination != e.destination) { log.info("Downloader persona doesn't match their destination") e.close() return } + + if (request.have) + eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) + Uploader uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) eventBus.publish(new UploadEvent(uploader : uploader)) try { @@ -153,6 +158,10 @@ public class UploadManager { e.close() return } + + if (request.have) + eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) + uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) eventBus.publish(new UploadEvent(uploader : uploader)) try { From 1b6eda5a403c4b4a397511d39ba3d5259e294a46 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 13:34:00 +0100 Subject: [PATCH 14/27] skeleton of mesh manager --- .../main/groovy/com/muwire/core/Core.groovy | 5 +++ .../groovy/com/muwire/core/mesh/Mesh.groovy | 19 ++++++++ .../com/muwire/core/mesh/MeshManager.groovy | 43 +++++++++++++++++++ 3 files changed, 67 insertions(+) create mode 100644 core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy create mode 100644 core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 17b7eda0..73509f47 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -29,6 +29,7 @@ import com.muwire.core.files.DirectoryWatcher import com.muwire.core.hostcache.CacheClient import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostDiscoveredEvent +import com.muwire.core.mesh.MeshManager import com.muwire.core.search.QueryEvent import com.muwire.core.search.ResultsEvent import com.muwire.core.search.ResultsSender @@ -166,6 +167,10 @@ public class Core { eventBus.register(FileDownloadedEvent.class, fileManager) eventBus.register(FileUnsharedEvent.class, fileManager) eventBus.register(SearchEvent.class, fileManager) + + log.info("initializing mesh manager") + MeshManager meshManager = new MeshManager(fileManager) + eventBus.register(SourceDiscoveredEvent.class, meshManager) log.info "initializing persistence service" persisterService = new PersisterService(new File(home, "files.json"), eventBus, 15000, fileManager) diff --git a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy new file mode 100644 index 00000000..d18c75fb --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy @@ -0,0 +1,19 @@ +package com.muwire.core.mesh + +import com.muwire.core.InfoHash + +import com.muwire.core.download.Pieces + +import net.i2p.data.Destination +import net.i2p.util.ConcurrentHashSet + +class Mesh { + private final InfoHash infoHash + private final Set sources = new ConcurrentHashSet<>() + private final Pieces pieces + + Mesh(InfoHash infoHash, Pieces pieces) { + this.infoHash = infoHash + this.pieces = pieces + } +} diff --git a/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy b/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy new file mode 100644 index 00000000..0239acd8 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy @@ -0,0 +1,43 @@ +package com.muwire.core.mesh + +import com.muwire.core.Constants +import com.muwire.core.InfoHash +import com.muwire.core.download.Pieces +import com.muwire.core.download.SourceDiscoveredEvent +import com.muwire.core.files.FileManager + +class MeshManager { + + private final Map meshes = Collections.synchronizedMap(new HashMap<>()) + private final FileManager fileManager + + MeshManager(FileManager fileManager) { + this.fileManager = fileManager + } + + Mesh get(InfoHash infoHash) { + meshes.get(infoHash) + } + + Mesh getOrCreate(InfoHash infoHash, int nPieces) { + synchronized(meshes) { + if (meshes.containsKey(infoHash)) + return meshes.get(infoHash) + Pieces pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) + if (fileManager.rootToFiles.containsKey(infoHash)) { + for (int i = 0; i < nPieces; i++) + pieces.markDownloaded(i) + } + Mesh rv = new Mesh(infoHash, pieces) + meshes.put(infoHash, rv) + return rv + } + } + + void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { + Mesh mesh = meshes.get(e.infoHash) + if (mesh == null) + return + mesh.sources.add(e.source.destination) + } +} From 710f9f52a8ee152a421aad65a16898a148d7f8c2 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 13:58:21 +0100 Subject: [PATCH 15/27] send X-Have and X-Alts from uploader --- .../main/groovy/com/muwire/core/Core.groovy | 2 +- .../groovy/com/muwire/core/mesh/Mesh.groovy | 9 ++++++ .../muwire/core/upload/ContentUploader.groovy | 28 ++++++++++++++++--- .../muwire/core/upload/UploadManager.groovy | 16 +++++++++-- .../main/java/com/muwire/core/SharedFile.java | 9 ++++++ 5 files changed, 56 insertions(+), 8 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 73509f47..0c00a902 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -217,7 +217,7 @@ public class Core { eventBus.register(SourceDiscoveredEvent.class, downloadManager) log.info("initializing upload manager") - UploadManager uploadManager = new UploadManager(eventBus, fileManager) + UploadManager uploadManager = new UploadManager(eventBus, fileManager, meshManager) log.info("initializing connection establisher") connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) diff --git a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy index d18c75fb..2fd5546f 100644 --- a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy +++ b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy @@ -16,4 +16,13 @@ class Mesh { this.infoHash = infoHash this.pieces = pieces } + + Set getRandom(int n, Destination exclude) { + List tmp = new ArrayList<>(sources) + tmp.remove(exclude) + Collections.shuffle(tmp) + if (tmp.size() < n) + return tmp + tmp[0..n-1] + } } diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index 49da55d5..c4d433ee 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -5,18 +5,25 @@ import java.nio.channels.FileChannel import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.StandardOpenOption +import java.util.stream.Collectors import com.muwire.core.connection.Endpoint +import com.muwire.core.mesh.Mesh +import com.muwire.core.util.DataUtil + +import net.i2p.data.Destination class ContentUploader extends Uploader { private final File file private final ContentRequest request + private final Mesh mesh - ContentUploader(File file, ContentRequest request, Endpoint endpoint) { + ContentUploader(File file, ContentRequest request, Endpoint endpoint, Mesh mesh) { super(endpoint) this.file = file this.request = request + this.mesh = mesh } @Override @@ -24,14 +31,18 @@ class ContentUploader extends Uploader { OutputStream os = endpoint.getOutputStream() Range range = request.getRange() if (range.start >= file.length() || range.end >= file.length()) { - os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("416 Range Not Satisfiable\r\n".getBytes(StandardCharsets.US_ASCII)) + writeMesh() + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() return } os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) - os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) - + os.write("Content-Range: $range.start-$range.end\r\n".getBytes(StandardCharsets.US_ASCII)) + writeMesh() + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + FileChannel channel try { channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ)) @@ -50,6 +61,15 @@ class ContentUploader extends Uploader { endpoint.getOutputStream().flush() } } + + private void writeMesh() { + String xHave = DataUtil.encodeXHave(mesh.pieces.getDownloaded(), mesh.pieces.nPieces) + endpoint.getOutputStream().write("X-Have: $xHave\r\n".getBytes(StandardCharsets.US_ASCII)) + + Set sources = mesh.getRandom(3, endpoint.destination) + String xAlts = sources.stream().map({ it.toBase64() }).collect(Collectors.joining(",")) + endpoint.getOutputStream().write("X-Alt: $xAlts\r\n".getBytes(StandardCharsets.US_ASCII)) + } @Override public String getName() { diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index 3045ce08..b18a717d 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -8,6 +8,8 @@ import com.muwire.core.SharedFile import com.muwire.core.connection.Endpoint import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.files.FileManager +import com.muwire.core.mesh.Mesh +import com.muwire.core.mesh.MeshManager import groovy.util.logging.Log import net.i2p.data.Base64 @@ -16,12 +18,14 @@ import net.i2p.data.Base64 public class UploadManager { private final EventBus eventBus private final FileManager fileManager + private final MeshManager meshManager public UploadManager() {} - public UploadManager(EventBus eventBus, FileManager fileManager) { + public UploadManager(EventBus eventBus, FileManager fileManager, MeshManager meshManager) { this.eventBus = eventBus this.fileManager = fileManager + this.meshManager = meshManager } public void processGET(Endpoint e) throws IOException { @@ -72,7 +76,10 @@ public class UploadManager { if (request.have) eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) - Uploader uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) + SharedFile file = sharedFiles.iterator().next(); + Mesh mesh = meshManager.getOrCreate(request.infoHash, file.NPieces) + + Uploader uploader = new ContentUploader(file.file, request, e, mesh) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -162,7 +169,10 @@ public class UploadManager { if (request.have) eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) - uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) + SharedFile file = sharedFiles.iterator().next(); + Mesh mesh = meshManager.getOrCreate(request.infoHash, file.NPieces) + + uploader = new ContentUploader(file.file, request, e, mesh) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() diff --git a/core/src/main/java/com/muwire/core/SharedFile.java b/core/src/main/java/com/muwire/core/SharedFile.java index 19c66c85..81072834 100644 --- a/core/src/main/java/com/muwire/core/SharedFile.java +++ b/core/src/main/java/com/muwire/core/SharedFile.java @@ -26,6 +26,15 @@ public class SharedFile { return pieceSize; } + public int getNPieces() { + long length = file.length(); + int rawPieceSize = 0x1 << pieceSize; + int rv = (int) (length / rawPieceSize); + if (length % pieceSize != 0) + rv++; + return rv; + } + @Override public int hashCode() { return file.hashCode() ^ infoHash.hashCode(); From 38ff49d28f6f3a1f97e75302c193dac19841f402 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 14:17:10 +0100 Subject: [PATCH 16/27] downloaders get pieces from mesh manager --- .../main/groovy/com/muwire/core/Core.groovy | 2 +- .../core/download/DownloadManager.groovy | 23 ++++++++++++++++--- .../muwire/core/download/Downloader.groovy | 13 +++-------- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 0c00a902..772aabd8 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -209,7 +209,7 @@ public class Core { eventBus.register(ResultsEvent.class, searchManager) log.info("initializing download manager") - downloadManager = new DownloadManager(eventBus, trustService, props, i2pConnector, home, me) + downloadManager = new DownloadManager(eventBus, trustService, meshManager, props, i2pConnector, home, me) eventBus.register(UIDownloadEvent.class, downloadManager) eventBus.register(UILoadedEvent.class, downloadManager) eventBus.register(FileDownloadedEvent.class, downloadManager) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index d782aa27..b88ba098 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -3,6 +3,8 @@ package com.muwire.core.download import com.muwire.core.connection.I2PConnector import com.muwire.core.files.FileDownloadedEvent import com.muwire.core.files.FileHasher +import com.muwire.core.mesh.Mesh +import com.muwire.core.mesh.MeshManager import com.muwire.core.trust.TrustLevel import com.muwire.core.trust.TrustService import com.muwire.core.util.DataUtil @@ -28,6 +30,7 @@ public class DownloadManager { private final EventBus eventBus private final TrustService trustService + private final MeshManager meshManager private final MuWireSettings muSettings private final I2PConnector connector private final Executor executor @@ -36,7 +39,7 @@ public class DownloadManager { private final Map downloaders = new ConcurrentHashMap<>() - public DownloadManager(EventBus eventBus, TrustService trustService, MuWireSettings muSettings, + public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings, I2PConnector connector, File home, Persona me) { this.eventBus = eventBus this.trustService = trustService @@ -70,9 +73,11 @@ public class DownloadManager { destinations.addAll(e.sources) destinations.remove(me.destination) + Pieces pieces = getPieces(infohash, size, pieceSize) + def downloader = new Downloader(eventBus, this, me, e.target, size, infohash, pieceSize, connector, destinations, - incompletes) + incompletes, pieces) downloaders.put(infohash, downloader) persistDownloaders() executor.execute({downloader.download()} as Runnable) @@ -108,14 +113,26 @@ public class DownloadManager { byte [] root = Base64.decode(json.hashRoot) infoHash = new InfoHash(root) } + + Pieces pieces = getPieces(infoHash, (long)json.length, json.pieceSizePow2) + def downloader = new Downloader(eventBus, this, me, file, (long)json.length, - infoHash, json.pieceSizePow2, connector, destinations, incompletes) + infoHash, json.pieceSizePow2, connector, destinations, incompletes, pieces) downloaders.put(infoHash, downloader) downloader.download() eventBus.publish(new DownloadStartedEvent(downloader : downloader)) } } + private Pieces getPieces(InfoHash infoHash, long length, int pieceSizePow2) { + int pieceSize = 0x1 << pieceSizePow2 + int nPieces = (int)(length / pieceSize) + if (length % pieceSize != 0) + nPieces++ + Mesh mesh = meshManager.getOrCreate(infoHash, nPieces) + mesh.pieces + } + void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { Downloader downloader = downloaders.get(e.infoHash) if (downloader == null) diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index b802c710..72959c4e 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -60,7 +60,7 @@ public class Downloader { public Downloader(EventBus eventBus, DownloadManager downloadManager, Persona me, File file, long length, InfoHash infoHash, int pieceSizePow2, I2PConnector connector, Set destinations, - File incompletes) { + File incompletes, Pieces pieces) { this.eventBus = eventBus this.me = me this.downloadManager = downloadManager @@ -73,15 +73,8 @@ public class Downloader { this.incompleteFile = new File(incompletes, file.getName()+".part") this.pieceSizePow2 = pieceSizePow2 this.pieceSize = 1 << pieceSizePow2 - - int nPieces - if (length % pieceSize == 0) - nPieces = length / pieceSize - else - nPieces = length / pieceSize + 1 - this.nPieces = nPieces - - pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) + this.pieces = pieces + this.nPieces = pieces.nPieces } public synchronized InfoHash getInfoHash() { From c210af78708365b43b72a1d26529ae49dc9ec2b9 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 14:39:20 +0100 Subject: [PATCH 17/27] source partial uploads from incompletes file --- .../main/groovy/com/muwire/core/Core.groovy | 2 +- .../muwire/core/upload/UploadManager.groovy | 74 +++++++++++++++---- 2 files changed, 60 insertions(+), 16 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 772aabd8..8b55d494 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -217,7 +217,7 @@ public class Core { eventBus.register(SourceDiscoveredEvent.class, downloadManager) log.info("initializing upload manager") - UploadManager uploadManager = new UploadManager(eventBus, fileManager, meshManager) + UploadManager uploadManager = new UploadManager(eventBus, fileManager, meshManager, downloadManager) log.info("initializing connection establisher") connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index b18a717d..c73cf988 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -6,6 +6,8 @@ import com.muwire.core.EventBus import com.muwire.core.InfoHash import com.muwire.core.SharedFile import com.muwire.core.connection.Endpoint +import com.muwire.core.download.DownloadManager +import com.muwire.core.download.Downloader import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.files.FileManager import com.muwire.core.mesh.Mesh @@ -19,13 +21,16 @@ public class UploadManager { private final EventBus eventBus private final FileManager fileManager private final MeshManager meshManager + private final DownloadManager downloadManager public UploadManager() {} - public UploadManager(EventBus eventBus, FileManager fileManager, MeshManager meshManager) { + public UploadManager(EventBus eventBus, FileManager fileManager, + MeshManager meshManager, DownloadManager downloadManager) { this.eventBus = eventBus this.fileManager = fileManager this.meshManager = meshManager + this.downloadManager = downloadManager } public void processGET(Endpoint e) throws IOException { @@ -49,8 +54,10 @@ public class UploadManager { log.info("Responding to upload request for root $infoHashString") byte [] infoHashRoot = Base64.decode(infoHashString) + InfoHash infoHash = new InfoHash(infoHashRoot) Set sharedFiles = fileManager.getSharedFiles(infoHashRoot) - if (sharedFiles == null || sharedFiles.isEmpty()) { + Downloader downloader = downloadManager.downloaders.get(infoHash) + if (downloader == null && (sharedFiles == null || sharedFiles.isEmpty())) { log.info "file not found" e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) e.getOutputStream().flush() @@ -66,7 +73,7 @@ public class UploadManager { return } - ContentRequest request = Request.parseContentRequest(new InfoHash(infoHashRoot), e.getInputStream()) + ContentRequest request = Request.parseContentRequest(infoHash, e.getInputStream()) if (request.downloader != null && request.downloader.destination != e.destination) { log.info("Downloader persona doesn't match their destination") e.close() @@ -76,10 +83,18 @@ public class UploadManager { if (request.have) eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) - SharedFile file = sharedFiles.iterator().next(); - Mesh mesh = meshManager.getOrCreate(request.infoHash, file.NPieces) + Mesh mesh + File file + if (downloader != null) { + mesh = meshManager.get(infoHash) + file = downloader.incompleteFile + } else { + SharedFile sharedFile = sharedFiles.iterator().next(); + mesh = meshManager.getOrCreate(request.infoHash, sharedFile.NPieces) + file = sharedFile.file + } - Uploader uploader = new ContentUploader(file.file, request, e, mesh) + Uploader uploader = new ContentUploader(file, request, e, mesh) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -97,8 +112,10 @@ public class UploadManager { log.info("Responding to hashlist request for root $infoHashString") byte [] infoHashRoot = Base64.decode(infoHashString) + InfoHash infoHash = new InfoHash(infoHashRoot) + Downloader downloader = downloadManager.downloaders.get(infoHash) Set sharedFiles = fileManager.getSharedFiles(infoHashRoot) - if (sharedFiles == null || sharedFiles.isEmpty()) { + if (downloader == null && (sharedFiles == null || sharedFiles.isEmpty())) { log.info "file not found" e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) e.getOutputStream().flush() @@ -114,13 +131,30 @@ public class UploadManager { return } - Request request = Request.parseHashListRequest(new InfoHash(infoHashRoot), e.getInputStream()) + Request request = Request.parseHashListRequest(infoHash, e.getInputStream()) if (request.downloader != null && request.downloader.destination != e.destination) { log.info("Downloader persona doesn't match their destination") e.close() return } - Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request) + + InfoHash fullInfoHash + if (downloader == null) { + fullInfoHash = sharedFiles.iterator().next().infoHash + } else { + byte [] hashList = downloader.getInfoHash().getHashList() + if (hashList != null && hashList.length > 0) + fullInfoHash = downloader.getInfoHash() + else { + log.info("infohash not found in downloader") + e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + e.getOutputStream().flush() + e.close() + return + } + } + + Uploader uploader = new HashListUploader(e, fullInfoHash, request) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -142,8 +176,10 @@ public class UploadManager { log.info("Responding to upload request for root $infoHashString") infoHashRoot = Base64.decode(infoHashString) + infoHash = new InfoHash(infoHashRoot) sharedFiles = fileManager.getSharedFiles(infoHashRoot) - if (sharedFiles == null || sharedFiles.isEmpty()) { + downloader = downloadManager.downloaders.get(infoHash) + if (downloader == null && (sharedFiles == null || sharedFiles.isEmpty())) { log.info "file not found" e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) e.getOutputStream().flush() @@ -168,11 +204,19 @@ public class UploadManager { if (request.have) eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) - - SharedFile file = sharedFiles.iterator().next(); - Mesh mesh = meshManager.getOrCreate(request.infoHash, file.NPieces) - - uploader = new ContentUploader(file.file, request, e, mesh) + + Mesh mesh + File file + if (downloader != null) { + mesh = meshManager.get(infoHash) + file = downloader.incompleteFile + } else { + SharedFile sharedFile = sharedFiles.iterator().next(); + mesh = meshManager.getOrCreate(request.infoHash, sharedFile.NPieces) + file = sharedFile.file + } + + uploader = new ContentUploader(file, request, e, mesh) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() From 85466a8e8066217e4104a84cd72cd32a4aefa086 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 14:45:14 +0100 Subject: [PATCH 18/27] fix npe --- .../main/groovy/com/muwire/core/download/DownloadManager.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index b88ba098..bd28907e 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -43,6 +43,7 @@ public class DownloadManager { I2PConnector connector, File home, Persona me) { this.eventBus = eventBus this.trustService = trustService + this.meshManager = meshManager this.muSettings = muSettings this.connector = connector this.incompletes = new File(home,"incompletes") From 40410eba63a11509f58b46ded5ab9eae57f19395 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 14:57:53 +0100 Subject: [PATCH 19/27] fix constructor --- .../main/groovy/com/muwire/core/download/Downloader.groovy | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 72959c4e..402138c5 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -228,6 +228,7 @@ public class Downloader { private volatile Thread downloadThread private Endpoint endpoint private volatile DownloadSession currentSession + private final Set available = new HashSet<>() DownloadWorker(Destination destination) { this.destination = destination @@ -248,7 +249,8 @@ public class Downloader { currentState = WorkerState.DOWNLOADING boolean requestPerformed while(!pieces.isComplete()) { - currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length) + currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), + endpoint, incompleteFile, pieceSize, length, available) requestPerformed = currentSession.request() if (!requestPerformed) break From 89e761f53b6c2b1de19f27381c09040b8b81fa25 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 15:26:18 +0100 Subject: [PATCH 20/27] write personas on the wire part1 --- core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy | 8 ++++---- .../main/groovy/com/muwire/core/mesh/MeshManager.groovy | 2 +- .../groovy/com/muwire/core/upload/ContentUploader.groovy | 9 +++++---- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy index 2fd5546f..e6e0faf7 100644 --- a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy +++ b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy @@ -1,7 +1,7 @@ package com.muwire.core.mesh import com.muwire.core.InfoHash - +import com.muwire.core.Persona import com.muwire.core.download.Pieces import net.i2p.data.Destination @@ -9,7 +9,7 @@ import net.i2p.util.ConcurrentHashSet class Mesh { private final InfoHash infoHash - private final Set sources = new ConcurrentHashSet<>() + private final Set sources = new ConcurrentHashSet<>() private final Pieces pieces Mesh(InfoHash infoHash, Pieces pieces) { @@ -17,8 +17,8 @@ class Mesh { this.pieces = pieces } - Set getRandom(int n, Destination exclude) { - List tmp = new ArrayList<>(sources) + Set getRandom(int n, Persona exclude) { + List tmp = new ArrayList<>(sources) tmp.remove(exclude) Collections.shuffle(tmp) if (tmp.size() < n) diff --git a/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy b/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy index 0239acd8..67aadfdd 100644 --- a/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy +++ b/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy @@ -38,6 +38,6 @@ class MeshManager { Mesh mesh = meshes.get(e.infoHash) if (mesh == null) return - mesh.sources.add(e.source.destination) + mesh.sources.add(e.source) } } diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index c4d433ee..8d48ac65 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -7,6 +7,7 @@ import java.nio.file.Files import java.nio.file.StandardOpenOption import java.util.stream.Collectors +import com.muwire.core.Persona import com.muwire.core.connection.Endpoint import com.muwire.core.mesh.Mesh import com.muwire.core.util.DataUtil @@ -32,7 +33,7 @@ class ContentUploader extends Uploader { Range range = request.getRange() if (range.start >= file.length() || range.end >= file.length()) { os.write("416 Range Not Satisfiable\r\n".getBytes(StandardCharsets.US_ASCII)) - writeMesh() + writeMesh(request.downloader) os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() return @@ -40,7 +41,7 @@ class ContentUploader extends Uploader { os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("Content-Range: $range.start-$range.end\r\n".getBytes(StandardCharsets.US_ASCII)) - writeMesh() + writeMesh(request.downloader) os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) FileChannel channel @@ -62,11 +63,11 @@ class ContentUploader extends Uploader { } } - private void writeMesh() { + private void writeMesh(Persona toExclude) { String xHave = DataUtil.encodeXHave(mesh.pieces.getDownloaded(), mesh.pieces.nPieces) endpoint.getOutputStream().write("X-Have: $xHave\r\n".getBytes(StandardCharsets.US_ASCII)) - Set sources = mesh.getRandom(3, endpoint.destination) + Set sources = mesh.getRandom(3, toExclude) String xAlts = sources.stream().map({ it.toBase64() }).collect(Collectors.joining(",")) endpoint.getOutputStream().write("X-Alt: $xAlts\r\n".getBytes(StandardCharsets.US_ASCII)) } From d1c8328080ad83add6a652028bac5282671d2a8b Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 15:39:00 +0100 Subject: [PATCH 21/27] do not send alts if there aren't any --- .../com/muwire/core/download/DownloadSession.groovy | 10 ++++++---- .../com/muwire/core/upload/ContentUploader.groovy | 6 ++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 8744f681..98cb2b42 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -127,10 +127,12 @@ class DownloadSession { // prase X-Alt if present if (headers.containsKey("X-Alt")) { - headers["X-Alt"].split(",").each { - byte [] raw = Base64.decode(it) - Persona source = new Persona(new ByteArrayInputStream(raw)) - eventBus.publish(new SourceDiscoveredEvent(infoHash : infoHash, source : source)) + headers["X-Alt"].split(",").each { + if (it.length() > 0) { + byte [] raw = Base64.decode(it) + Persona source = new Persona(new ByteArrayInputStream(raw)) + eventBus.publish(new SourceDiscoveredEvent(infoHash : infoHash, source : source)) + } } } diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index 8d48ac65..aad30f5c 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -68,8 +68,10 @@ class ContentUploader extends Uploader { endpoint.getOutputStream().write("X-Have: $xHave\r\n".getBytes(StandardCharsets.US_ASCII)) Set sources = mesh.getRandom(3, toExclude) - String xAlts = sources.stream().map({ it.toBase64() }).collect(Collectors.joining(",")) - endpoint.getOutputStream().write("X-Alt: $xAlts\r\n".getBytes(StandardCharsets.US_ASCII)) + if (!sources.isEmpty()) { + String xAlts = sources.stream().map({ it.toBase64() }).collect(Collectors.joining(",")) + endpoint.getOutputStream().write("X-Alt: $xAlts\r\n".getBytes(StandardCharsets.US_ASCII)) + } } @Override From ab5fea9216be82071e0bca5c489ab4db15d4e7bb Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 16:03:20 +0100 Subject: [PATCH 22/27] 416 if piece not downloaded --- .../com/muwire/core/download/Pieces.groovy | 4 ++++ .../muwire/core/upload/ContentUploader.groovy | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy index 1bcac9f3..243c1c14 100644 --- a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy @@ -74,4 +74,8 @@ class Pieces { synchronized int donePieces() { done.cardinality() } + + synchronized boolean isDownloaded(int piece) { + done.get(piece) + } } diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index aad30f5c..ef33e8fd 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -19,19 +19,31 @@ class ContentUploader extends Uploader { private final File file private final ContentRequest request private final Mesh mesh + private final int pieceSize - ContentUploader(File file, ContentRequest request, Endpoint endpoint, Mesh mesh) { + ContentUploader(File file, ContentRequest request, Endpoint endpoint, Mesh mesh, int pieceSize) { super(endpoint) this.file = file this.request = request this.mesh = mesh + this.pieceSize = pieceSize } @Override void respond() { OutputStream os = endpoint.getOutputStream() Range range = request.getRange() - if (range.start >= file.length() || range.end >= file.length()) { + boolean satisfiable = true + final long length = file.length() + if (range.start >= length || range.end >= length) + satisfiable = false + if (satisfiable) { + int startPiece = length / range.start + int endPiece = length / range.end + for (int i = startPiece; i < endPiece; i++) + satisfiable &= mesh.pieces.isDownloaded(i) + } + if (!satisfiable) { os.write("416 Range Not Satisfiable\r\n".getBytes(StandardCharsets.US_ASCII)) writeMesh(request.downloader) os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) From 2be9c425f70408d5c0611ca448bd24994a190dd1 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 16:09:57 +0100 Subject: [PATCH 23/27] compute which pieces are requested --- .../com/muwire/core/upload/ContentUploader.groovy | 4 ++-- .../groovy/com/muwire/core/upload/UploadManager.groovy | 10 ++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index ef33e8fd..d8836332 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -38,8 +38,8 @@ class ContentUploader extends Uploader { if (range.start >= length || range.end >= length) satisfiable = false if (satisfiable) { - int startPiece = length / range.start - int endPiece = length / range.end + int startPiece = range.start / (0x1 << pieceSize) + int endPiece = range.end / (0x1 << pieceSize) for (int i = startPiece; i < endPiece; i++) satisfiable &= mesh.pieces.isDownloaded(i) } diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index c73cf988..d4758e37 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -85,16 +85,19 @@ public class UploadManager { Mesh mesh File file + int pieceSize if (downloader != null) { mesh = meshManager.get(infoHash) file = downloader.incompleteFile + pieceSize = downloader.pieceSizePow2 } else { SharedFile sharedFile = sharedFiles.iterator().next(); mesh = meshManager.getOrCreate(request.infoHash, sharedFile.NPieces) file = sharedFile.file + pieceSize = sharedFile.pieceSize } - Uploader uploader = new ContentUploader(file, request, e, mesh) + Uploader uploader = new ContentUploader(file, request, e, mesh, pieceSize) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -207,16 +210,19 @@ public class UploadManager { Mesh mesh File file + int pieceSize if (downloader != null) { mesh = meshManager.get(infoHash) file = downloader.incompleteFile + pieceSize = downloader.pieceSizePow2 } else { SharedFile sharedFile = sharedFiles.iterator().next(); mesh = meshManager.getOrCreate(request.infoHash, sharedFile.NPieces) file = sharedFile.file + pieceSize = sharedFile.pieceSize } - uploader = new ContentUploader(file, request, e, mesh) + uploader = new ContentUploader(file, request, e, mesh, pieceSize) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() From f794c397607b046ea78e9e185633962c89b2c79d Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 16:15:35 +0100 Subject: [PATCH 24/27] personas not destinations --- .../main/groovy/com/muwire/core/upload/UploadManager.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index d4758e37..83f90d57 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -81,7 +81,7 @@ public class UploadManager { } if (request.have) - eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) + eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : request.downloader)) Mesh mesh File file @@ -206,7 +206,7 @@ public class UploadManager { } if (request.have) - eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) + eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : request.downloader)) Mesh mesh File file From d27872cc8bde48c98039e08e40e1058fd06900b3 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 16:29:52 +0100 Subject: [PATCH 25/27] investigate StringIndexOutOfBounds --- .../groovy/com/muwire/core/download/DownloadSession.groovy | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 98cb2b42..71bb9f81 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -97,7 +97,8 @@ class DownloadSession { os.flush() String codeString = readTillRN(is) int space = codeString.indexOf(' ') - codeString = codeString.substring(0, space) + if (space > 0) + codeString = codeString.substring(0, space) int code = Integer.parseInt(codeString.trim()) From ed12d78a48ec85a5829277af0bbbe747034a9164 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 17:22:55 +0100 Subject: [PATCH 26/27] clear pieces on cancel --- .../main/groovy/com/muwire/core/download/Downloader.groovy | 1 + core/src/main/groovy/com/muwire/core/download/Pieces.groovy | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 402138c5..6e80ebba 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -180,6 +180,7 @@ public class Downloader { piecesFile.delete() } incompleteFile.delete() + pieces.clearAll() } void stop() { diff --git a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy index 243c1c14..1d947d76 100644 --- a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy @@ -78,4 +78,9 @@ class Pieces { synchronized boolean isDownloaded(int piece) { done.get(piece) } + + synchronized void clearAll() { + done.clear() + claimed.clear() + } } From c79e8712d01910d1e3751b0ba1d14a305968bd46 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 21 Jun 2019 20:36:33 +0100 Subject: [PATCH 27/27] correctly determine if uploader has requested piece --- .../main/groovy/com/muwire/core/upload/ContentUploader.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index d8836332..7928c730 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -40,7 +40,7 @@ class ContentUploader extends Uploader { if (satisfiable) { int startPiece = range.start / (0x1 << pieceSize) int endPiece = range.end / (0x1 << pieceSize) - for (int i = startPiece; i < endPiece; i++) + for (int i = startPiece; i <= endPiece; i++) satisfiable &= mesh.pieces.isDownloaded(i) } if (!satisfiable) {