download side of oob hashlist

This commit is contained in:
Zlatin Balevsky
2019-06-12 22:13:16 +01:00
parent 04ceaba514
commit e389090b7e
5 changed files with 116 additions and 9 deletions

View File

@@ -106,7 +106,7 @@ class DownloadSession {
if (!code.startsWith("200 ")) {
log.warning("unknown code $code")
endpoint.close()
return
return false
}
// parse all headers
@@ -131,7 +131,7 @@ class DownloadSession {
if (receivedStart != start || receivedEnd != end) {
log.warning("We don't support mismatching ranges yet")
endpoint.close()
return
return false
}
// start the download

View File

@@ -20,8 +20,8 @@ import net.i2p.data.Destination
@Log
public class Downloader {
public enum DownloadState { CONNECTING, DOWNLOADING, FAILED, CANCELLED, FINISHED }
private enum WorkerState { CONNECTING, DOWNLOADING, FINISHED}
public enum DownloadState { CONNECTING, HASHLIST, DOWNLOADING, FAILED, CANCELLED, FINISHED }
private enum WorkerState { CONNECTING, HASHLIST, DOWNLOADING, FINISHED}
private static final ExecutorService executorService = Executors.newCachedThreadPool({r ->
Thread rv = new Thread(r)
@@ -36,7 +36,7 @@ public class Downloader {
private final File file
private final Pieces downloaded, claimed
private final long length
private final InfoHash infoHash
private InfoHash infoHash
private final int pieceSize
private final I2PConnector connector
private final Set<Destination> destinations
@@ -76,6 +76,14 @@ public class Downloader {
claimed = new Pieces(nPieces)
}
private synchronized InfoHash getInfoHash() {
infoHash
}
private synchronized void setInfoHash(InfoHash infoHash) {
this.infoHash = infoHash
}
void download() {
readPieces()
destinations.each {
@@ -145,6 +153,17 @@ public class Downloader {
if (oneDownloading)
return DownloadState.DOWNLOADING
// at least one is requesting hashlist
boolean oneHashlist = false
activeWorkers.values().each {
if (it.currentState == WorkerState.HASHLIST) {
oneHashlist = true
return
}
}
if (oneHashlist)
return DownloadState.HASHLIST
return DownloadState.CONNECTING
}
@@ -198,10 +217,16 @@ public class Downloader {
Endpoint endpoint = null
try {
endpoint = connector.connect(destination)
while(getInfoHash().hashList == null) {
currentState = WorkerState.HASHLIST
HashListSession session = new HashListSession(me.toBase64(), infoHash, endpoint)
InfoHash received = session.request()
setInfoHash(received)
}
currentState = WorkerState.DOWNLOADING
boolean requestPerformed
while(!downloaded.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, infoHash, endpoint, file, pieceSize, length)
currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, getInfoHash(), endpoint, file, pieceSize, length)
requestPerformed = currentSession.request()
if (!requestPerformed)
break
@@ -214,7 +239,7 @@ public class Downloader {
if (downloaded.isComplete() && !eventFired) {
piecesFile.delete()
eventFired = true
eventBus.publish(new FileDownloadedEvent(downloadedFile : new DownloadedFile(file, infoHash, pieceSizePow2, Collections.emptySet())),
eventBus.publish(new FileDownloadedEvent(downloadedFile : new DownloadedFile(file, getInfoHash(), pieceSizePow2, Collections.emptySet())),
downloader : Downloader.this)
}
endpoint?.close()

View File

@@ -0,0 +1,82 @@
package com.muwire.core.download
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
import com.muwire.core.Constants
import com.muwire.core.InfoHash
import com.muwire.core.connection.Endpoint
import groovy.util.logging.Log
import static com.muwire.core.util.DataUtil.readTillRN
import net.i2p.data.Base64
@Log
class HashListSession {
private final String meB64
private final InfoHash infoHash
private final Endpoint endpoint
HashListSession(String meB64, InfoHash infoHash, Endpoint endpoint) {
this.meB64 = meB64
this.infoHash = infoHash
this.endpoint = endpoint
}
InfoHash request() throws IOException {
InputStream is = endpoint.getInputStream()
OutputStream os = endpoint.getOutputStream()
String root = Base64.encode(infoHash.getRoot())
os.write("HASHLIST $root\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.flush()
String code = readTillRN(is)
if (!code.startsWith("200"))
throw new IOException("unknown code $code")
// parse all headers
Set<String> headers = new HashSet<>()
String header
while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS)
headers.add(header)
long receivedStart = -1
long receivedEnd = -1
for (String receivedHeader : headers) {
def group = (receivedHeader =~ /^Content-Range: (\d+)-(\d+)$/)
if (group.size() != 1) {
log.info("ignoring header $receivedHeader")
continue
}
receivedStart = Long.parseLong(group[0][1])
receivedEnd = Long.parseLong(group[0][2])
}
if (receivedStart != 0)
throw new IOException("hashlist started at $receivedStart")
byte[] hashList = new byte[receivedEnd]
ByteBuffer hashListBuf = ByteBuffer.wrap(hashList)
byte[] tmp = new byte[0x1 << 13]
while(hashListBuf.hasRemaining()) {
if (hashListBuf.remaining() > tmp.length)
tmp = new byte[hashListBuf.remaining()]
int read = is.read(tmp)
if (read == -1)
throw new IOException()
hashListBuf.put(tmp, 0, read)
}
InfoHash received = InfoHash.fromHashList(hashList)
if (received.getRoot() != infoHash.getRoot())
throw new IOException("fetched list doesn't match root")
received
}
}

View File

@@ -19,7 +19,7 @@ class HashListUploader extends Uploader {
void respond() {
OutputStream os = endpoint.getOutputStream()
os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Content-Range: 0-${mapped.remaining()}")
os.write("Content-Range: 0-${mapped.remaining()}\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
byte[]tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {

View File

@@ -108,7 +108,7 @@ public class UploadManager {
e.close()
return
}
Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request, request)
Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request)
eventBus.publish(new UploadEvent(uploader : uploader))
try {
uploader.respond()