wip on parsing X-Have from uploader

This commit is contained in:
Zlatin Balevsky
2019-06-21 05:30:56 +01:00
parent 1249ad29e0
commit b23226e8c6

View File

@@ -30,6 +30,7 @@ class DownloadSession {
private final File file private final File file
private final int pieceSize private final int pieceSize
private final long fileLength private final long fileLength
private final Set<Integer> available
private final MessageDigest digest private final MessageDigest digest
private final LinkedList<Long> timestamps = new LinkedList<>() private final LinkedList<Long> timestamps = new LinkedList<>()
@@ -38,7 +39,7 @@ class DownloadSession {
private ByteBuffer mapped private ByteBuffer mapped
DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file,
int pieceSize, long fileLength) { int pieceSize, long fileLength, Set<Integer> available) {
this.meB64 = meB64 this.meB64 = meB64
this.pieces = pieces this.pieces = pieces
this.endpoint = endpoint this.endpoint = endpoint
@@ -46,6 +47,7 @@ class DownloadSession {
this.file = file this.file = file
this.pieceSize = pieceSize this.pieceSize = pieceSize
this.fileLength = fileLength this.fileLength = fileLength
this.available = available
try { try {
digest = MessageDigest.getInstance("SHA-256") digest = MessageDigest.getInstance("SHA-256")
} catch (NoSuchAlgorithmException impossible) { } catch (NoSuchAlgorithmException impossible) {
@@ -63,7 +65,11 @@ class DownloadSession {
OutputStream os = endpoint.getOutputStream() OutputStream os = endpoint.getOutputStream()
InputStream is = endpoint.getInputStream() InputStream is = endpoint.getInputStream()
int piece = pieces.claim() int piece
if (available.isEmpty())
piece = pieces.claim()
else
piece = pieces.claim(available)
if (piece == -1) if (piece == -1)
return false return false
boolean unclaim = true boolean unclaim = true
@@ -81,43 +87,60 @@ class DownloadSession {
os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("X-Persona: $meB64\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.flush() os.flush()
String code = readTillRN(is) String codeString = readTillRN(is)
if (code.startsWith("404 ")) { codeString = codeString.substring(codeString.indexOf(' '))
int code = Integer.parseInt(codeString.trim())
if (code == 404) {
log.warning("file not found") log.warning("file not found")
endpoint.close() endpoint.close()
return false return false
} }
if (code.startsWith("416 ")) { if (!(code == 200 || code == 416)) {
log.warning("range $start-$end cannot be satisfied")
return // leave endpoint open
}
if (!code.startsWith("200 ")) {
log.warning("unknown code $code") log.warning("unknown code $code")
endpoint.close() endpoint.close()
return false return false
} }
// parse all headers // parse all headers
Set<String> headers = new HashSet<>() Map<String,String> headers = new HashMap<>()
String header String header
while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) while((header = readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) {
headers.add(header) int colon = header.indexOf(':')
if (colon == -1 || colon == header.length() - 1)
throw new IOException("invalid header $header")
String key = header.substring(0, colon)
String value = header.substring(colon + 1)
headers[key] = value
}
long receivedStart = -1 // parse X-Have if present
long receivedEnd = -1 if (headers.containsKey("X-Have")) {
for (String receivedHeader : headers) { updateAvailablePieces(headers["X-Have"])
def group = (receivedHeader =~ /^Content-Range: (\d+)-(\d+)$/) if (!available.contains(piece))
if (group.size() != 1) { return true // try again next time
log.info("ignoring header $receivedHeader") } else {
continue if (code != 200)
} throw new IOException("Code $code but no X-Have")
available.clear()
receivedStart = Long.parseLong(group[0][1])
receivedEnd = Long.parseLong(group[0][2])
} }
if (code != 200)
return true
String range = headers["Content-Range"]
if (range == null)
throw new IOException("Code 200 but no Content-Range")
def group = (range =~ /^ (\d+)-(\d+)$/)
if (group.size() != 1)
throw new IOException("invalid Content-Range header $range")
long receivedStart = Long.parseLong(group[0][1])
long receivedEnd = Long.parseLong(group[0][2])
if (receivedStart != start || receivedEnd != end) { if (receivedStart != start || receivedEnd != end) {
log.warning("We don't support mismatching ranges yet") log.warning("We don't support mismatching ranges yet")
endpoint.close() endpoint.close()
@@ -196,4 +219,15 @@ class DownloadSession {
totalRead += reads[idx] totalRead += reads[idx]
(int)(totalRead * 1000.0 / interval) (int)(totalRead * 1000.0 / interval)
} }
private void updateAvailablePieces(String xHave) {
byte [] availablePieces = Base64.decode(xHave)
availablePieces.eachWithIndex {b, i ->
for (int j = 0; j < 8 ; j++) {
int mask = 0x80 >> j
if ((b & mask) == mask)
available.add(i * 8 + j)
}
}
}
} }