diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 0f3787b7..8b55d494 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -13,6 +13,7 @@ import com.muwire.core.connection.I2PConnector import com.muwire.core.connection.LeafConnectionManager import com.muwire.core.connection.UltrapeerConnectionManager import com.muwire.core.download.DownloadManager +import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.download.UIDownloadCancelledEvent import com.muwire.core.download.UIDownloadEvent import com.muwire.core.files.FileDownloadedEvent @@ -28,6 +29,7 @@ import com.muwire.core.files.DirectoryWatcher import com.muwire.core.hostcache.CacheClient import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostDiscoveredEvent +import com.muwire.core.mesh.MeshManager import com.muwire.core.search.QueryEvent import com.muwire.core.search.ResultsEvent import com.muwire.core.search.ResultsSender @@ -165,6 +167,10 @@ public class Core { eventBus.register(FileDownloadedEvent.class, fileManager) eventBus.register(FileUnsharedEvent.class, fileManager) eventBus.register(SearchEvent.class, fileManager) + + log.info("initializing mesh manager") + MeshManager meshManager = new MeshManager(fileManager) + eventBus.register(SourceDiscoveredEvent.class, meshManager) log.info "initializing persistence service" persisterService = new PersisterService(new File(home, "files.json"), eventBus, 15000, fileManager) @@ -203,14 +209,15 @@ public class Core { eventBus.register(ResultsEvent.class, searchManager) log.info("initializing download manager") - downloadManager = new DownloadManager(eventBus, i2pConnector, home, me) + downloadManager = new DownloadManager(eventBus, trustService, meshManager, props, i2pConnector, home, me) eventBus.register(UIDownloadEvent.class, downloadManager) eventBus.register(UILoadedEvent.class, downloadManager) eventBus.register(FileDownloadedEvent.class, downloadManager) eventBus.register(UIDownloadCancelledEvent.class, downloadManager) + eventBus.register(SourceDiscoveredEvent.class, downloadManager) log.info("initializing upload manager") - UploadManager uploadManager = new UploadManager(eventBus, fileManager) + UploadManager uploadManager = new UploadManager(eventBus, fileManager, meshManager, downloadManager) log.info("initializing connection establisher") connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index 5106e536..bd28907e 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -3,6 +3,10 @@ package com.muwire.core.download import com.muwire.core.connection.I2PConnector import com.muwire.core.files.FileDownloadedEvent import com.muwire.core.files.FileHasher +import com.muwire.core.mesh.Mesh +import com.muwire.core.mesh.MeshManager +import com.muwire.core.trust.TrustLevel +import com.muwire.core.trust.TrustService import com.muwire.core.util.DataUtil import groovy.json.JsonBuilder @@ -14,24 +18,33 @@ import net.i2p.util.ConcurrentHashSet import com.muwire.core.EventBus import com.muwire.core.InfoHash +import com.muwire.core.MuWireSettings import com.muwire.core.Persona import com.muwire.core.UILoadedEvent +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executor import java.util.concurrent.Executors public class DownloadManager { private final EventBus eventBus + private final TrustService trustService + private final MeshManager meshManager + private final MuWireSettings muSettings private final I2PConnector connector private final Executor executor private final File incompletes, home private final Persona me - private final Set downloaders = new ConcurrentHashSet<>() + private final Map downloaders = new ConcurrentHashMap<>() - public DownloadManager(EventBus eventBus, I2PConnector connector, File home, Persona me) { + public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings, + I2PConnector connector, File home, Persona me) { this.eventBus = eventBus + this.trustService = trustService + this.meshManager = meshManager + this.muSettings = muSettings this.connector = connector this.incompletes = new File(home,"incompletes") this.home = home @@ -61,17 +74,19 @@ public class DownloadManager { destinations.addAll(e.sources) destinations.remove(me.destination) + Pieces pieces = getPieces(infohash, size, pieceSize) + def downloader = new Downloader(eventBus, this, me, e.target, size, infohash, pieceSize, connector, destinations, - incompletes) - downloaders.add(downloader) + incompletes, pieces) + downloaders.put(infohash, downloader) persistDownloaders() executor.execute({downloader.download()} as Runnable) eventBus.publish(new DownloadStartedEvent(downloader : downloader)) } public void onUIDownloadCancelledEvent(UIDownloadCancelledEvent e) { - downloaders.remove(e.downloader) + downloaders.remove(e.downloader.infoHash) persistDownloaders() } @@ -99,23 +114,50 @@ public class DownloadManager { byte [] root = Base64.decode(json.hashRoot) infoHash = new InfoHash(root) } + + Pieces pieces = getPieces(infoHash, (long)json.length, json.pieceSizePow2) + def downloader = new Downloader(eventBus, this, me, file, (long)json.length, - infoHash, json.pieceSizePow2, connector, destinations, incompletes) - downloaders.add(downloader) + infoHash, json.pieceSizePow2, connector, destinations, incompletes, pieces) + downloaders.put(infoHash, downloader) downloader.download() eventBus.publish(new DownloadStartedEvent(downloader : downloader)) } } + private Pieces getPieces(InfoHash infoHash, long length, int pieceSizePow2) { + int pieceSize = 0x1 << pieceSizePow2 + int nPieces = (int)(length / pieceSize) + if (length % pieceSize != 0) + nPieces++ + Mesh mesh = meshManager.getOrCreate(infoHash, nPieces) + mesh.pieces + } + + void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { + Downloader downloader = downloaders.get(e.infoHash) + if (downloader == null) + return + boolean ok = false + switch(trustService.getLevel(e.source.destination)) { + case TrustLevel.TRUSTED: ok = true; break + case TrustLevel.NEUTRAL: ok = muSettings.allowUntrusted; break + case TrustLevel.DISTRUSTED: ok = false; break + } + + if (ok) + downloader.addSource(e.source.destination) + } + void onFileDownloadedEvent(FileDownloadedEvent e) { - downloaders.remove(e.downloader) + downloaders.remove(e.downloader.infoHash) persistDownloaders() } private void persistDownloaders() { File downloadsFile = new File(home,"downloads.json") downloadsFile.withPrintWriter { writer -> - downloaders.each { downloader -> + downloaders.values().each { downloader -> if (!downloader.cancelled) { def json = [:] json.file = Base64.encode(DataUtil.encodei18nString(downloader.file.getAbsolutePath())) @@ -139,7 +181,7 @@ public class DownloadManager { } public void shutdown() { - downloaders.each { it.stop() } + downloaders.values().each { it.stop() } Downloader.executorService.shutdownNow() } } diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index a6a5172d..71bb9f81 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -3,8 +3,12 @@ package com.muwire.core.download; import net.i2p.data.Base64 import com.muwire.core.Constants +import com.muwire.core.EventBus import com.muwire.core.InfoHash +import com.muwire.core.Persona import com.muwire.core.connection.Endpoint +import com.muwire.core.util.DataUtil + import static com.muwire.core.util.DataUtil.readTillRN import groovy.util.logging.Log @@ -23,6 +27,7 @@ class DownloadSession { private static int SAMPLES = 10 + private final EventBus eventBus private final String meB64 private final Pieces pieces private final InfoHash infoHash @@ -30,6 +35,7 @@ class DownloadSession { private final File file private final int pieceSize private final long fileLength + private final Set available private final MessageDigest digest private final LinkedList timestamps = new LinkedList<>() @@ -37,8 +43,9 @@ class DownloadSession { private ByteBuffer mapped - DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, - int pieceSize, long fileLength) { + DownloadSession(EventBus eventBus, String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, + int pieceSize, long fileLength, Set available) { + this.eventBus = eventBus this.meB64 = meB64 this.pieces = pieces this.endpoint = endpoint @@ -46,6 +53,7 @@ class DownloadSession { this.file = file this.pieceSize = pieceSize this.fileLength = fileLength + this.available = available try { digest = MessageDigest.getInstance("SHA-256") } catch (NoSuchAlgorithmException impossible) { @@ -63,7 +71,11 @@ class DownloadSession { OutputStream os = endpoint.getOutputStream() InputStream is = endpoint.getInputStream() - int piece = pieces.claim() + int piece + if (available.isEmpty()) + piece = pieces.claim() + else + piece = pieces.claim(available) if (piece == -1) return false boolean unclaim = true @@ -79,45 +91,79 @@ class DownloadSession { try { os.write("GET $root\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII)) - os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("X-Persona: $meB64\r\n".getBytes(StandardCharsets.US_ASCII)) + String xHave = DataUtil.encodeXHave(pieces.getDownloaded(), pieces.nPieces) + os.write("X-Have: $xHave\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() - String code = readTillRN(is) - if (code.startsWith("404 ")) { + String codeString = readTillRN(is) + int space = codeString.indexOf(' ') + if (space > 0) + codeString = codeString.substring(0, space) + + int code = Integer.parseInt(codeString.trim()) + + if (code == 404) { log.warning("file not found") endpoint.close() return false } - if (code.startsWith("416 ")) { - log.warning("range $start-$end cannot be satisfied") - return // leave endpoint open - } - - if (!code.startsWith("200 ")) { + if (!(code == 200 || code == 416)) { log.warning("unknown code $code") endpoint.close() return false } // parse all headers - Set headers = new HashSet<>() + Map headers = new HashMap<>() String header - while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) - headers.add(header) - - long receivedStart = -1 - long receivedEnd = -1 - for (String receivedHeader : headers) { - def group = (receivedHeader =~ /^Content-Range: (\d+)-(\d+)$/) - if (group.size() != 1) { - log.info("ignoring header $receivedHeader") - continue - } - - receivedStart = Long.parseLong(group[0][1]) - receivedEnd = Long.parseLong(group[0][2]) + while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) { + int colon = header.indexOf(':') + if (colon == -1 || colon == header.length() - 1) + throw new IOException("invalid header $header") + String key = header.substring(0, colon) + String value = header.substring(colon + 1) + headers[key] = value.trim() } + // prase X-Alt if present + if (headers.containsKey("X-Alt")) { + headers["X-Alt"].split(",").each { + if (it.length() > 0) { + byte [] raw = Base64.decode(it) + Persona source = new Persona(new ByteArrayInputStream(raw)) + eventBus.publish(new SourceDiscoveredEvent(infoHash : infoHash, source : source)) + } + } + } + + // parse X-Have if present + if (headers.containsKey("X-Have")) { + DataUtil.decodeXHave(headers["X-Have"]).each { + available.add(it) + } + if (!available.contains(piece)) + return true // try again next time + } else { + if (code != 200) + throw new IOException("Code $code but no X-Have") + available.clear() + } + + if (code != 200) + return true + + String range = headers["Content-Range"] + if (range == null) + throw new IOException("Code 200 but no Content-Range") + + def group = (range =~ /^(\d+)-(\d+)$/) + if (group.size() != 1) + throw new IOException("invalid Content-Range header $range") + + long receivedStart = Long.parseLong(group[0][1]) + long receivedEnd = Long.parseLong(group[0][2]) + if (receivedStart != start || receivedEnd != end) { log.warning("We don't support mismatching ranges yet") endpoint.close() diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 4458b67b..6e80ebba 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -60,7 +60,7 @@ public class Downloader { public Downloader(EventBus eventBus, DownloadManager downloadManager, Persona me, File file, long length, InfoHash infoHash, int pieceSizePow2, I2PConnector connector, Set destinations, - File incompletes) { + File incompletes, Pieces pieces) { this.eventBus = eventBus this.me = me this.downloadManager = downloadManager @@ -73,15 +73,8 @@ public class Downloader { this.incompleteFile = new File(incompletes, file.getName()+".part") this.pieceSizePow2 = pieceSizePow2 this.pieceSize = 1 << pieceSizePow2 - - int nPieces - if (length % pieceSize == 0) - nPieces = length / pieceSize - else - nPieces = length / pieceSize + 1 - this.nPieces = nPieces - - pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) + this.pieces = pieces + this.nPieces = pieces.nPieces } public synchronized InfoHash getInfoHash() { @@ -187,6 +180,7 @@ public class Downloader { piecesFile.delete() } incompleteFile.delete() + pieces.clearAll() } void stop() { @@ -221,12 +215,21 @@ public class Downloader { } } + void addSource(Destination d) { + if (activeWorkers.containsKey(d)) + return + DownloadWorker newWorker = new DownloadWorker(d) + activeWorkers.put(d, newWorker) + executorService.submit(newWorker) + } + class DownloadWorker implements Runnable { private final Destination destination private volatile WorkerState currentState private volatile Thread downloadThread private Endpoint endpoint private volatile DownloadSession currentSession + private final Set available = new HashSet<>() DownloadWorker(Destination destination) { this.destination = destination @@ -247,7 +250,8 @@ public class Downloader { currentState = WorkerState.DOWNLOADING boolean requestPerformed while(!pieces.isComplete()) { - currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length) + currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), + endpoint, incompleteFile, pieceSize, length, available) requestPerformed = currentSession.request() if (!requestPerformed) break diff --git a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy index 4028a6f7..1d947d76 100644 --- a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy @@ -38,6 +38,18 @@ class Pieces { } } + synchronized int claim(Set available) { + for (int i = claimed.nextSetBit(0); i >= 0; i = claimed.nextSetBit(i+1)) + available.remove(i) + if (available.isEmpty()) + return -1 + List toList = available.toList() + Collections.shuffle(toList) + int rv = toList[0] + claimed.set(rv) + rv + } + synchronized def getDownloaded() { def rv = [] for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1)) { @@ -62,4 +74,13 @@ class Pieces { synchronized int donePieces() { done.cardinality() } + + synchronized boolean isDownloaded(int piece) { + done.get(piece) + } + + synchronized void clearAll() { + done.clear() + claimed.clear() + } } diff --git a/core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy b/core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy new file mode 100644 index 00000000..47bf88ec --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/download/SourceDiscoveredEvent.groovy @@ -0,0 +1,10 @@ +package com.muwire.core.download + +import com.muwire.core.Event +import com.muwire.core.InfoHash +import com.muwire.core.Persona + +class SourceDiscoveredEvent extends Event { + InfoHash infoHash + Persona source +} diff --git a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy new file mode 100644 index 00000000..e6e0faf7 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy @@ -0,0 +1,28 @@ +package com.muwire.core.mesh + +import com.muwire.core.InfoHash +import com.muwire.core.Persona +import com.muwire.core.download.Pieces + +import net.i2p.data.Destination +import net.i2p.util.ConcurrentHashSet + +class Mesh { + private final InfoHash infoHash + private final Set sources = new ConcurrentHashSet<>() + private final Pieces pieces + + Mesh(InfoHash infoHash, Pieces pieces) { + this.infoHash = infoHash + this.pieces = pieces + } + + Set getRandom(int n, Persona exclude) { + List tmp = new ArrayList<>(sources) + tmp.remove(exclude) + Collections.shuffle(tmp) + if (tmp.size() < n) + return tmp + tmp[0..n-1] + } +} diff --git a/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy b/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy new file mode 100644 index 00000000..67aadfdd --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/mesh/MeshManager.groovy @@ -0,0 +1,43 @@ +package com.muwire.core.mesh + +import com.muwire.core.Constants +import com.muwire.core.InfoHash +import com.muwire.core.download.Pieces +import com.muwire.core.download.SourceDiscoveredEvent +import com.muwire.core.files.FileManager + +class MeshManager { + + private final Map meshes = Collections.synchronizedMap(new HashMap<>()) + private final FileManager fileManager + + MeshManager(FileManager fileManager) { + this.fileManager = fileManager + } + + Mesh get(InfoHash infoHash) { + meshes.get(infoHash) + } + + Mesh getOrCreate(InfoHash infoHash, int nPieces) { + synchronized(meshes) { + if (meshes.containsKey(infoHash)) + return meshes.get(infoHash) + Pieces pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) + if (fileManager.rootToFiles.containsKey(infoHash)) { + for (int i = 0; i < nPieces; i++) + pieces.markDownloaded(i) + } + Mesh rv = new Mesh(infoHash, pieces) + meshes.put(infoHash, rv) + return rv + } + } + + void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { + Mesh mesh = meshes.get(e.infoHash) + if (mesh == null) + return + mesh.sources.add(e.source) + } +} diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy index 45f03336..fa500f13 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentRequest.groovy @@ -2,4 +2,5 @@ package com.muwire.core.upload class ContentRequest extends Request { Range range + boolean have } diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index 49da55d5..7928c730 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -5,33 +5,57 @@ import java.nio.channels.FileChannel import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.StandardOpenOption +import java.util.stream.Collectors +import com.muwire.core.Persona import com.muwire.core.connection.Endpoint +import com.muwire.core.mesh.Mesh +import com.muwire.core.util.DataUtil + +import net.i2p.data.Destination class ContentUploader extends Uploader { private final File file private final ContentRequest request + private final Mesh mesh + private final int pieceSize - ContentUploader(File file, ContentRequest request, Endpoint endpoint) { + ContentUploader(File file, ContentRequest request, Endpoint endpoint, Mesh mesh, int pieceSize) { super(endpoint) this.file = file this.request = request + this.mesh = mesh + this.pieceSize = pieceSize } @Override void respond() { OutputStream os = endpoint.getOutputStream() Range range = request.getRange() - if (range.start >= file.length() || range.end >= file.length()) { - os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + boolean satisfiable = true + final long length = file.length() + if (range.start >= length || range.end >= length) + satisfiable = false + if (satisfiable) { + int startPiece = range.start / (0x1 << pieceSize) + int endPiece = range.end / (0x1 << pieceSize) + for (int i = startPiece; i <= endPiece; i++) + satisfiable &= mesh.pieces.isDownloaded(i) + } + if (!satisfiable) { + os.write("416 Range Not Satisfiable\r\n".getBytes(StandardCharsets.US_ASCII)) + writeMesh(request.downloader) + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() return } os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) - os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) - + os.write("Content-Range: $range.start-$range.end\r\n".getBytes(StandardCharsets.US_ASCII)) + writeMesh(request.downloader) + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + FileChannel channel try { channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ)) @@ -50,6 +74,17 @@ class ContentUploader extends Uploader { endpoint.getOutputStream().flush() } } + + private void writeMesh(Persona toExclude) { + String xHave = DataUtil.encodeXHave(mesh.pieces.getDownloaded(), mesh.pieces.nPieces) + endpoint.getOutputStream().write("X-Have: $xHave\r\n".getBytes(StandardCharsets.US_ASCII)) + + Set sources = mesh.getRandom(3, toExclude) + if (!sources.isEmpty()) { + String xAlts = sources.stream().map({ it.toBase64() }).collect(Collectors.joining(",")) + endpoint.getOutputStream().write("X-Alt: $xAlts\r\n".getBytes(StandardCharsets.US_ASCII)) + } + } @Override public String getName() { diff --git a/core/src/main/groovy/com/muwire/core/upload/Request.groovy b/core/src/main/groovy/com/muwire/core/upload/Request.groovy index a38b7d9d..a27acd59 100644 --- a/core/src/main/groovy/com/muwire/core/upload/Request.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/Request.groovy @@ -5,6 +5,7 @@ import java.nio.charset.StandardCharsets import com.muwire.core.Constants import com.muwire.core.InfoHash import com.muwire.core.Persona +import com.muwire.core.util.DataUtil import groovy.util.logging.Log import net.i2p.data.Base64 @@ -48,8 +49,14 @@ class Request { def decoded = Base64.decode(encoded) downloader = new Persona(new ByteArrayInputStream(decoded)) } + + boolean have = false + if (headers.containsKey("X-Have")) { + def encoded = headers["X-Have"].trim() + have = DataUtil.decodeXHave(encoded).size() > 0 + } new ContentRequest( infoHash : infoHash, range : new Range(start, end), - headers : headers, downloader : downloader) + headers : headers, downloader : downloader, have : have) } static Request parseHashListRequest(InfoHash infoHash, InputStream is) throws IOException { diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index 6b7b76ba..83f90d57 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -6,7 +6,12 @@ import com.muwire.core.EventBus import com.muwire.core.InfoHash import com.muwire.core.SharedFile import com.muwire.core.connection.Endpoint +import com.muwire.core.download.DownloadManager +import com.muwire.core.download.Downloader +import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.files.FileManager +import com.muwire.core.mesh.Mesh +import com.muwire.core.mesh.MeshManager import groovy.util.logging.Log import net.i2p.data.Base64 @@ -15,12 +20,17 @@ import net.i2p.data.Base64 public class UploadManager { private final EventBus eventBus private final FileManager fileManager + private final MeshManager meshManager + private final DownloadManager downloadManager public UploadManager() {} - public UploadManager(EventBus eventBus, FileManager fileManager) { + public UploadManager(EventBus eventBus, FileManager fileManager, + MeshManager meshManager, DownloadManager downloadManager) { this.eventBus = eventBus this.fileManager = fileManager + this.meshManager = meshManager + this.downloadManager = downloadManager } public void processGET(Endpoint e) throws IOException { @@ -44,8 +54,10 @@ public class UploadManager { log.info("Responding to upload request for root $infoHashString") byte [] infoHashRoot = Base64.decode(infoHashString) + InfoHash infoHash = new InfoHash(infoHashRoot) Set sharedFiles = fileManager.getSharedFiles(infoHashRoot) - if (sharedFiles == null || sharedFiles.isEmpty()) { + Downloader downloader = downloadManager.downloaders.get(infoHash) + if (downloader == null && (sharedFiles == null || sharedFiles.isEmpty())) { log.info "file not found" e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) e.getOutputStream().flush() @@ -61,13 +73,31 @@ public class UploadManager { return } - Request request = Request.parseContentRequest(new InfoHash(infoHashRoot), e.getInputStream()) + ContentRequest request = Request.parseContentRequest(infoHash, e.getInputStream()) if (request.downloader != null && request.downloader.destination != e.destination) { log.info("Downloader persona doesn't match their destination") e.close() return } - Uploader uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) + + if (request.have) + eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : request.downloader)) + + Mesh mesh + File file + int pieceSize + if (downloader != null) { + mesh = meshManager.get(infoHash) + file = downloader.incompleteFile + pieceSize = downloader.pieceSizePow2 + } else { + SharedFile sharedFile = sharedFiles.iterator().next(); + mesh = meshManager.getOrCreate(request.infoHash, sharedFile.NPieces) + file = sharedFile.file + pieceSize = sharedFile.pieceSize + } + + Uploader uploader = new ContentUploader(file, request, e, mesh, pieceSize) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -85,8 +115,10 @@ public class UploadManager { log.info("Responding to hashlist request for root $infoHashString") byte [] infoHashRoot = Base64.decode(infoHashString) + InfoHash infoHash = new InfoHash(infoHashRoot) + Downloader downloader = downloadManager.downloaders.get(infoHash) Set sharedFiles = fileManager.getSharedFiles(infoHashRoot) - if (sharedFiles == null || sharedFiles.isEmpty()) { + if (downloader == null && (sharedFiles == null || sharedFiles.isEmpty())) { log.info "file not found" e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) e.getOutputStream().flush() @@ -102,13 +134,30 @@ public class UploadManager { return } - Request request = Request.parseHashListRequest(new InfoHash(infoHashRoot), e.getInputStream()) + Request request = Request.parseHashListRequest(infoHash, e.getInputStream()) if (request.downloader != null && request.downloader.destination != e.destination) { log.info("Downloader persona doesn't match their destination") e.close() return } - Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request) + + InfoHash fullInfoHash + if (downloader == null) { + fullInfoHash = sharedFiles.iterator().next().infoHash + } else { + byte [] hashList = downloader.getInfoHash().getHashList() + if (hashList != null && hashList.length > 0) + fullInfoHash = downloader.getInfoHash() + else { + log.info("infohash not found in downloader") + e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + e.getOutputStream().flush() + e.close() + return + } + } + + Uploader uploader = new HashListUploader(e, fullInfoHash, request) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -130,8 +179,10 @@ public class UploadManager { log.info("Responding to upload request for root $infoHashString") infoHashRoot = Base64.decode(infoHashString) + infoHash = new InfoHash(infoHashRoot) sharedFiles = fileManager.getSharedFiles(infoHashRoot) - if (sharedFiles == null || sharedFiles.isEmpty()) { + downloader = downloadManager.downloaders.get(infoHash) + if (downloader == null && (sharedFiles == null || sharedFiles.isEmpty())) { log.info "file not found" e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) e.getOutputStream().flush() @@ -153,7 +204,25 @@ public class UploadManager { e.close() return } - uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) + + if (request.have) + eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : request.downloader)) + + Mesh mesh + File file + int pieceSize + if (downloader != null) { + mesh = meshManager.get(infoHash) + file = downloader.incompleteFile + pieceSize = downloader.pieceSizePow2 + } else { + SharedFile sharedFile = sharedFiles.iterator().next(); + mesh = meshManager.getOrCreate(request.infoHash, sharedFile.NPieces) + file = sharedFile.file + pieceSize = sharedFile.pieceSize + } + + uploader = new ContentUploader(file, request, e, mesh, pieceSize) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() diff --git a/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy b/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy index 25bec593..238fa871 100644 --- a/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy +++ b/core/src/main/groovy/com/muwire/core/util/DataUtil.groovy @@ -4,6 +4,8 @@ import java.nio.charset.StandardCharsets import com.muwire.core.Constants +import net.i2p.data.Base64 + class DataUtil { private final static int MAX_SHORT = (0x1 << 16) - 1 @@ -79,4 +81,32 @@ class DataUtil { } new String(baos.toByteArray(), StandardCharsets.US_ASCII) } + + public static String encodeXHave(List pieces, int totalPieces) { + int bytes = totalPieces / 8 + if (totalPieces % 8 != 0) + bytes++ + byte[] raw = new byte[bytes] + pieces.each { + int byteIdx = it / 8 + int offset = it % 8 + int mask = 0x80 >>> offset + raw[byteIdx] |= mask + } + Base64.encode(raw) + } + + public static List decodeXHave(String xHave) { + byte [] availablePieces = Base64.decode(xHave) + List available = new ArrayList<>() + availablePieces.eachWithIndex {b, i -> + for (int j = 0; j < 8 ; j++) { + byte mask = 0x80 >>> j + if ((b & mask) == mask) { + available.add(i * 8 + j) + } + } + } + available + } } diff --git a/core/src/main/java/com/muwire/core/SharedFile.java b/core/src/main/java/com/muwire/core/SharedFile.java index 19c66c85..81072834 100644 --- a/core/src/main/java/com/muwire/core/SharedFile.java +++ b/core/src/main/java/com/muwire/core/SharedFile.java @@ -26,6 +26,15 @@ public class SharedFile { return pieceSize; } + public int getNPieces() { + long length = file.length(); + int rawPieceSize = 0x1 << pieceSize; + int rv = (int) (length / rawPieceSize); + if (length % pieceSize != 0) + rv++; + return rv; + } + @Override public int hashCode() { return file.hashCode() ^ infoHash.hashCode(); diff --git a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy index 9480658b..aad57ebf 100644 --- a/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/DownloadSessionTest.groovy @@ -1,17 +1,26 @@ package com.muwire.core.download +import static org.junit.Assert.fail + import org.junit.After +import org.junit.Before import org.junit.Test +import com.muwire.core.EventBus import com.muwire.core.InfoHash +import com.muwire.core.Persona +import com.muwire.core.Personas import com.muwire.core.connection.Endpoint import com.muwire.core.files.FileHasher import static com.muwire.core.util.DataUtil.readTillRN +import static com.muwire.core.util.DataUtil.encodeXHave import net.i2p.data.Base64 +import net.i2p.util.ConcurrentHashSet class DownloadSessionTest { + private EventBus eventBus private File source, target private InfoHash infoHash private Endpoint endpoint @@ -24,6 +33,16 @@ class DownloadSessionTest { private InputStream fromDownloader, fromUploader private OutputStream toDownloader, toUploader + private volatile boolean performed + private Set available = new ConcurrentHashSet<>() + private volatile IOException thrown + + + @Before + public void setUp() { + eventBus = new EventBus() + } + private void initSession(int size, def claimedPieces = []) { Random r = new Random() byte [] content = new byte[size] @@ -56,12 +75,20 @@ class DownloadSessionTest { toUploader = new PipedOutputStream(fromDownloader) endpoint = new Endpoint(null, fromUploader, toUploader, null) - session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size) - downloadThread = new Thread( { session.request() } as Runnable) + session = new DownloadSession(eventBus, "",pieces, infoHash, endpoint, target, pieceSize, size, available) + downloadThread = new Thread( { perform() } as Runnable) downloadThread.setDaemon(true) downloadThread.start() } + private void perform() { + try { + performed = session.request() + } catch (IOException e) { + thrown = e + } + } + @After public void teardown() { source?.delete() @@ -76,6 +103,7 @@ class DownloadSessionTest { assert "GET $rootBase64" == readTillRN(fromDownloader) assert "Range: 0-19" == readTillRN(fromDownloader) readTillRN(fromDownloader) + readTillRN(fromDownloader) assert "" == readTillRN(fromDownloader) toDownloader.write("200 OK\r\n".bytes) @@ -87,6 +115,9 @@ class DownloadSessionTest { assert pieces.isComplete() assert target.bytes == source.bytes + assert performed + assert available.isEmpty() + assert thrown == null } @Test @@ -98,6 +129,7 @@ class DownloadSessionTest { assert "GET $rootBase64" == readTillRN(fromDownloader) readTillRN(fromDownloader) readTillRN(fromDownloader) + readTillRN(fromDownloader) assert "" == readTillRN(fromDownloader) toDownloader.write("200 OK\r\n".bytes) @@ -108,6 +140,9 @@ class DownloadSessionTest { Thread.sleep(150) assert pieces.isComplete() assert target.bytes == source.bytes + assert performed + assert available.isEmpty() + assert thrown == null } @Test @@ -125,6 +160,7 @@ class DownloadSessionTest { assert (start == 0 && end == ((1 << pieceSize) - 1)) || (start == (1 << pieceSize) && end == (1 << pieceSize)) + readTillRN(fromDownloader) readTillRN(fromDownloader) assert "" == readTillRN(fromDownloader) @@ -138,6 +174,9 @@ class DownloadSessionTest { Thread.sleep(150) assert !pieces.isComplete() assert 1 == pieces.donePieces() + assert performed + assert available.isEmpty() + assert thrown == null } @Test @@ -145,7 +184,10 @@ class DownloadSessionTest { initSession(20, [0]) long now = System.currentTimeMillis() downloadThread.join(100) - assert 100 > (System.currentTimeMillis() - now) + assert 100 >= (System.currentTimeMillis() - now) + assert !performed + assert available.isEmpty() + assert thrown == null } @Test @@ -164,4 +206,128 @@ class DownloadSessionTest { assert pieces.claimed.get(0) assert start == 0 && end == (1 << pieceSize) - 1 } + + @Test + public void test416NoHave() { + initSession(20) + readAllHeaders(fromDownloader) + + toDownloader.write("416 don't have it\r\n\r\n".bytes) + toDownloader.flush() + Thread.sleep(150) + assert !performed + assert available.isEmpty() + assert thrown != null + } + + @Test + public void test416Have() { + initSession(20) + readAllHeaders(fromDownloader) + + toDownloader.write("416 don't have it\r\n".bytes) + toDownloader.write("X-Have: ${encodeXHave([0], 1)}\r\n\r\n".bytes) + toDownloader.flush() + + Thread.sleep(150) + assert performed + assert available.contains(0) + assert thrown == null + } + + @Test + public void test416Have2Pieces() { + int pieceSize = FileHasher.getPieceSize(1) + int size = (1 << pieceSize) + 1 + initSession(size) + readAllHeaders(fromDownloader) + + toDownloader.write("416 don't have it\r\n".bytes) + toDownloader.write("X-Have: ${encodeXHave([1], 2)}\r\n\r\n".bytes) + toDownloader.flush() + + Thread.sleep(150) + assert performed + assert available.contains(1) + assert thrown == null + } + + @Test + public void test200TwoPieces1Available() { + int pieceSize = FileHasher.getPieceSize(1) + int size = (1 << pieceSize) * 9 + 1 + initSession(size) + + Set headers = readAllHeaders(fromDownloader) + def matcher = null + headers.each { + if (it.startsWith("Range")) + matcher = (it =~ /^Range: (\d+)-(\d+)$/) + } + assert matcher.groupCount() > 0 + int start = Integer.parseInt(matcher[0][1]) + int end = Integer.parseInt(matcher[0][2]) + + if (start == 0) + fail("inconlcusive") + + toDownloader.write("416 don't have it \r\n".bytes) + toDownloader.write("X-Have: ${encodeXHave([0],2)}\r\n\r\n".bytes) + toDownloader.flush() + downloadThread.join() + + assert performed + performed = false + assert available.contains(0) + assert thrown == null + + // request same session + downloadThread = new Thread( { perform() } as Runnable) + downloadThread.setDaemon(true) + downloadThread.start() + + Thread.sleep(150) + + headers = readAllHeaders(fromDownloader) + matcher = null + headers.each { + if (it.startsWith("Range")) + matcher = (it =~ /^Range: (\d+)-(\d+)$/) + } + assert matcher.groupCount() > 0 + start = Integer.parseInt(matcher[0][1]) + end = Integer.parseInt(matcher[0][2]) + assert start == 0 + } + + @Test + public void testXAlt() throws Exception { + Personas personas = new Personas() + def sources = [] + def listener = new Object() { + public void onSourceDiscoveredEvent(SourceDiscoveredEvent e) { + sources << e.source + } + } + eventBus.register(SourceDiscoveredEvent.class, listener) + + initSession(20) + readAllHeaders(fromDownloader) + toDownloader.write("416 don't have it\r\n".bytes) + toDownloader.write("X-Alt: ${personas.persona1.toBase64()},${personas.persona2.toBase64()}\r\n\r\n".bytes) + toDownloader.flush() + + Thread.sleep(150) + assert sources.contains(personas.persona1) + assert sources.contains(personas.persona2) + assert 2 == sources.size() + } + + private static Set readAllHeaders(InputStream is) { + Set rv = new HashSet<>() + String header + while((header = readTillRN(is)) != "") + rv.add(header) + rv + } } diff --git a/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy b/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy index 09082103..8fa7bfd4 100644 --- a/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy +++ b/core/src/test/groovy/com/muwire/core/download/PiecesTest.groovy @@ -34,4 +34,19 @@ class PiecesTest { pieces.markDownloaded(piece2) assert pieces.isComplete() } + + @Test + public void testClaimAvailable() { + pieces = new Pieces(2) + int claimed = pieces.claim([0].toSet()) + assert claimed == 0 + assert -1 == pieces.claim([0].toSet()) + } + + @Test + public void testClaimNoneAvailable() { + pieces = new Pieces(20) + int claimed = pieces.claim() + assert -1 == pieces.claim([claimed].toSet()) + } }