From e389090b7e95212137ff94e28007ad5e339bf7eb Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Wed, 12 Jun 2019 22:13:16 +0100 Subject: [PATCH] download side of oob hashlist --- .../core/download/DownloadSession.groovy | 4 +- .../muwire/core/download/Downloader.groovy | 35 ++++++-- .../core/download/HashListSession.groovy | 82 +++++++++++++++++++ .../core/upload/HashListUploader.groovy | 2 +- .../muwire/core/upload/UploadManager.groovy | 2 +- 5 files changed, 116 insertions(+), 9 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/download/HashListSession.groovy diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 3c4d74ac..de5051e7 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -106,7 +106,7 @@ class DownloadSession { if (!code.startsWith("200 ")) { log.warning("unknown code $code") endpoint.close() - return + return false } // parse all headers @@ -131,7 +131,7 @@ class DownloadSession { if (receivedStart != start || receivedEnd != end) { log.warning("We don't support mismatching ranges yet") endpoint.close() - return + return false } // start the download diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 0bad8779..1ae0f774 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -20,8 +20,8 @@ import net.i2p.data.Destination @Log public class Downloader { - public enum DownloadState { CONNECTING, DOWNLOADING, FAILED, CANCELLED, FINISHED } - private enum WorkerState { CONNECTING, DOWNLOADING, FINISHED} + public enum DownloadState { CONNECTING, HASHLIST, DOWNLOADING, FAILED, CANCELLED, FINISHED } + private enum WorkerState { CONNECTING, HASHLIST, DOWNLOADING, FINISHED} private static final ExecutorService executorService = Executors.newCachedThreadPool({r -> Thread rv = new Thread(r) @@ -36,7 +36,7 @@ public class Downloader { private final File file private final Pieces downloaded, claimed private final long length - private final InfoHash infoHash + private InfoHash infoHash private final int pieceSize private final I2PConnector connector private final Set destinations @@ -76,6 +76,14 @@ public class Downloader { claimed = new Pieces(nPieces) } + private synchronized InfoHash getInfoHash() { + infoHash + } + + private synchronized void setInfoHash(InfoHash infoHash) { + this.infoHash = infoHash + } + void download() { readPieces() destinations.each { @@ -145,6 +153,17 @@ public class Downloader { if (oneDownloading) return DownloadState.DOWNLOADING + // at least one is requesting hashlist + boolean oneHashlist = false + activeWorkers.values().each { + if (it.currentState == WorkerState.HASHLIST) { + oneHashlist = true + return + } + } + if (oneHashlist) + return DownloadState.HASHLIST + return DownloadState.CONNECTING } @@ -198,10 +217,16 @@ public class Downloader { Endpoint endpoint = null try { endpoint = connector.connect(destination) + while(getInfoHash().hashList == null) { + currentState = WorkerState.HASHLIST + HashListSession session = new HashListSession(me.toBase64(), infoHash, endpoint) + InfoHash received = session.request() + setInfoHash(received) + } currentState = WorkerState.DOWNLOADING boolean requestPerformed while(!downloaded.isComplete()) { - currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, infoHash, endpoint, file, pieceSize, length) + currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, getInfoHash(), endpoint, file, pieceSize, length) requestPerformed = currentSession.request() if (!requestPerformed) break @@ -214,7 +239,7 @@ public class Downloader { if (downloaded.isComplete() && !eventFired) { piecesFile.delete() eventFired = true - eventBus.publish(new FileDownloadedEvent(downloadedFile : new DownloadedFile(file, infoHash, pieceSizePow2, Collections.emptySet())), + eventBus.publish(new FileDownloadedEvent(downloadedFile : new DownloadedFile(file, getInfoHash(), pieceSizePow2, Collections.emptySet())), downloader : Downloader.this) } endpoint?.close() diff --git a/core/src/main/groovy/com/muwire/core/download/HashListSession.groovy b/core/src/main/groovy/com/muwire/core/download/HashListSession.groovy new file mode 100644 index 00000000..2eb9f56d --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/download/HashListSession.groovy @@ -0,0 +1,82 @@ +package com.muwire.core.download + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.security.MessageDigest +import java.security.NoSuchAlgorithmException + +import com.muwire.core.Constants +import com.muwire.core.InfoHash +import com.muwire.core.connection.Endpoint + +import groovy.util.logging.Log + +import static com.muwire.core.util.DataUtil.readTillRN + +import net.i2p.data.Base64 + +@Log +class HashListSession { + private final String meB64 + private final InfoHash infoHash + private final Endpoint endpoint + + HashListSession(String meB64, InfoHash infoHash, Endpoint endpoint) { + this.meB64 = meB64 + this.infoHash = infoHash + this.endpoint = endpoint + } + + InfoHash request() throws IOException { + InputStream is = endpoint.getInputStream() + OutputStream os = endpoint.getOutputStream() + + String root = Base64.encode(infoHash.getRoot()) + os.write("HASHLIST $root\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.flush() + + String code = readTillRN(is) + if (!code.startsWith("200")) + throw new IOException("unknown code $code") + + // parse all headers + Set headers = new HashSet<>() + String header + while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) + headers.add(header) + + long receivedStart = -1 + long receivedEnd = -1 + for (String receivedHeader : headers) { + def group = (receivedHeader =~ /^Content-Range: (\d+)-(\d+)$/) + if (group.size() != 1) { + log.info("ignoring header $receivedHeader") + continue + } + + receivedStart = Long.parseLong(group[0][1]) + receivedEnd = Long.parseLong(group[0][2]) + } + + if (receivedStart != 0) + throw new IOException("hashlist started at $receivedStart") + + byte[] hashList = new byte[receivedEnd] + ByteBuffer hashListBuf = ByteBuffer.wrap(hashList) + byte[] tmp = new byte[0x1 << 13] + while(hashListBuf.hasRemaining()) { + if (hashListBuf.remaining() > tmp.length) + tmp = new byte[hashListBuf.remaining()] + int read = is.read(tmp) + if (read == -1) + throw new IOException() + hashListBuf.put(tmp, 0, read) + } + + InfoHash received = InfoHash.fromHashList(hashList) + if (received.getRoot() != infoHash.getRoot()) + throw new IOException("fetched list doesn't match root") + received + } +} diff --git a/core/src/main/groovy/com/muwire/core/upload/HashListUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/HashListUploader.groovy index ac0d3e03..c302338b 100644 --- a/core/src/main/groovy/com/muwire/core/upload/HashListUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/HashListUploader.groovy @@ -19,7 +19,7 @@ class HashListUploader extends Uploader { void respond() { OutputStream os = endpoint.getOutputStream() os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) - os.write("Content-Range: 0-${mapped.remaining()}") + os.write("Content-Range: 0-${mapped.remaining()}\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) byte[]tmp = new byte[0x1 << 13] while(mapped.hasRemaining()) { diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index 2b6472d6..6b7b76ba 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -108,7 +108,7 @@ public class UploadManager { e.close() return } - Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request, request) + Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond()