diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 73509f47..0c00a902 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -217,7 +217,7 @@ public class Core { eventBus.register(SourceDiscoveredEvent.class, downloadManager) log.info("initializing upload manager") - UploadManager uploadManager = new UploadManager(eventBus, fileManager) + UploadManager uploadManager = new UploadManager(eventBus, fileManager, meshManager) log.info("initializing connection establisher") connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) diff --git a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy index d18c75fb..2fd5546f 100644 --- a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy +++ b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy @@ -16,4 +16,13 @@ class Mesh { this.infoHash = infoHash this.pieces = pieces } + + Set getRandom(int n, Destination exclude) { + List tmp = new ArrayList<>(sources) + tmp.remove(exclude) + Collections.shuffle(tmp) + if (tmp.size() < n) + return tmp + tmp[0..n-1] + } } diff --git a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy index 49da55d5..c4d433ee 100644 --- a/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/ContentUploader.groovy @@ -5,18 +5,25 @@ import java.nio.channels.FileChannel import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.StandardOpenOption +import java.util.stream.Collectors import com.muwire.core.connection.Endpoint +import com.muwire.core.mesh.Mesh +import com.muwire.core.util.DataUtil + +import net.i2p.data.Destination class ContentUploader extends Uploader { private final File file private final ContentRequest request + private final Mesh mesh - ContentUploader(File file, ContentRequest request, Endpoint endpoint) { + ContentUploader(File file, ContentRequest request, Endpoint endpoint, Mesh mesh) { super(endpoint) this.file = file this.request = request + this.mesh = mesh } @Override @@ -24,14 +31,18 @@ class ContentUploader extends Uploader { OutputStream os = endpoint.getOutputStream() Range range = request.getRange() if (range.start >= file.length() || range.end >= file.length()) { - os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("416 Range Not Satisfiable\r\n".getBytes(StandardCharsets.US_ASCII)) + writeMesh() + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) os.flush() return } os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) - os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) - + os.write("Content-Range: $range.start-$range.end\r\n".getBytes(StandardCharsets.US_ASCII)) + writeMesh() + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + FileChannel channel try { channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ)) @@ -50,6 +61,15 @@ class ContentUploader extends Uploader { endpoint.getOutputStream().flush() } } + + private void writeMesh() { + String xHave = DataUtil.encodeXHave(mesh.pieces.getDownloaded(), mesh.pieces.nPieces) + endpoint.getOutputStream().write("X-Have: $xHave\r\n".getBytes(StandardCharsets.US_ASCII)) + + Set sources = mesh.getRandom(3, endpoint.destination) + String xAlts = sources.stream().map({ it.toBase64() }).collect(Collectors.joining(",")) + endpoint.getOutputStream().write("X-Alt: $xAlts\r\n".getBytes(StandardCharsets.US_ASCII)) + } @Override public String getName() { diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index 3045ce08..b18a717d 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -8,6 +8,8 @@ import com.muwire.core.SharedFile import com.muwire.core.connection.Endpoint import com.muwire.core.download.SourceDiscoveredEvent import com.muwire.core.files.FileManager +import com.muwire.core.mesh.Mesh +import com.muwire.core.mesh.MeshManager import groovy.util.logging.Log import net.i2p.data.Base64 @@ -16,12 +18,14 @@ import net.i2p.data.Base64 public class UploadManager { private final EventBus eventBus private final FileManager fileManager + private final MeshManager meshManager public UploadManager() {} - public UploadManager(EventBus eventBus, FileManager fileManager) { + public UploadManager(EventBus eventBus, FileManager fileManager, MeshManager meshManager) { this.eventBus = eventBus this.fileManager = fileManager + this.meshManager = meshManager } public void processGET(Endpoint e) throws IOException { @@ -72,7 +76,10 @@ public class UploadManager { if (request.have) eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) - Uploader uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) + SharedFile file = sharedFiles.iterator().next(); + Mesh mesh = meshManager.getOrCreate(request.infoHash, file.NPieces) + + Uploader uploader = new ContentUploader(file.file, request, e, mesh) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() @@ -162,7 +169,10 @@ public class UploadManager { if (request.have) eventBus.publish(new SourceDiscoveredEvent(infoHash : request.infoHash, source : e.destination)) - uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e) + SharedFile file = sharedFiles.iterator().next(); + Mesh mesh = meshManager.getOrCreate(request.infoHash, file.NPieces) + + uploader = new ContentUploader(file.file, request, e, mesh) eventBus.publish(new UploadEvent(uploader : uploader)) try { uploader.respond() diff --git a/core/src/main/java/com/muwire/core/SharedFile.java b/core/src/main/java/com/muwire/core/SharedFile.java index 19c66c85..81072834 100644 --- a/core/src/main/java/com/muwire/core/SharedFile.java +++ b/core/src/main/java/com/muwire/core/SharedFile.java @@ -26,6 +26,15 @@ public class SharedFile { return pieceSize; } + public int getNPieces() { + long length = file.length(); + int rawPieceSize = 0x1 << pieceSize; + int rv = (int) (length / rawPieceSize); + if (length % pieceSize != 0) + rv++; + return rv; + } + @Override public int hashCode() { return file.hashCode() ^ infoHash.hashCode();