Compare commits

..

1 Commits

Author SHA1 Message Date
Zlatin Balevsky
7718dc0821 force UTF8 for json serialization 2019-06-16 12:13:00 +01:00
18 changed files with 100 additions and 200 deletions

View File

@@ -34,7 +34,7 @@ class Cli {
Core core Core core
try { try {
core = new Core(props, home, "0.2.4") core = new Core(props, home, "0.2.1")
} catch (Exception bad) { } catch (Exception bad) {
bad.printStackTrace(System.out) bad.printStackTrace(System.out)
println "Failed to initialize core, exiting" println "Failed to initialize core, exiting"

View File

@@ -53,7 +53,7 @@ class CliDownloader {
Core core Core core
try { try {
core = new Core(props, home, "0.2.4") core = new Core(props, home, "0.2.1")
} catch (Exception bad) { } catch (Exception bad) {
bad.printStackTrace(System.out) bad.printStackTrace(System.out)
println "Failed to initialize core, exiting" println "Failed to initialize core, exiting"

View File

@@ -268,7 +268,7 @@ public class Core {
} }
} }
Core core = new Core(props, home, "0.2.4") Core core = new Core(props, home, "0.2.1")
core.startServices() core.startServices()
// ... at the end, sleep or execute script // ... at the end, sleep or execute script

View File

@@ -17,8 +17,6 @@ import com.muwire.core.upload.UploadManager
import com.muwire.core.search.InvalidSearchResultException import com.muwire.core.search.InvalidSearchResultException
import com.muwire.core.search.ResultsParser import com.muwire.core.search.ResultsParser
import com.muwire.core.search.SearchManager import com.muwire.core.search.SearchManager
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.search.UIResultEvent
import com.muwire.core.search.UnexpectedResultsException import com.muwire.core.search.UnexpectedResultsException
import groovy.json.JsonOutput import groovy.json.JsonOutput
@@ -227,15 +225,13 @@ class ConnectionAcceptor {
if (sender.destination != e.getDestination()) if (sender.destination != e.getDestination())
throw new IOException("Sender destination mismatch expected $e.getDestination(), got $sender.destination") throw new IOException("Sender destination mismatch expected $e.getDestination(), got $sender.destination")
int nResults = dis.readUnsignedShort() int nResults = dis.readUnsignedShort()
UIResultEvent[] results = new UIResultEvent[nResults]
for (int i = 0; i < nResults; i++) { for (int i = 0; i < nResults; i++) {
int jsonSize = dis.readUnsignedShort() int jsonSize = dis.readUnsignedShort()
byte [] payload = new byte[jsonSize] byte [] payload = new byte[jsonSize]
dis.readFully(payload) dis.readFully(payload)
def json = slurper.parse(payload) def json = slurper.parse(payload)
results[i] = ResultsParser.parse(sender, resultsUUID, json) eventBus.publish(ResultsParser.parse(sender, resultsUUID, json))
} }
eventBus.publish(new UIResultBatchEvent(uuid: resultsUUID, results: results))
} catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) { } catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) {
log.log(Level.WARNING, "failed to process POST", bad) log.log(Level.WARNING, "failed to process POST", bad)
} finally { } finally {

View File

@@ -2,6 +2,7 @@ package com.muwire.core.connection
import java.io.InputStream import java.io.InputStream
import java.io.OutputStream import java.io.OutputStream
import java.nio.charset.StandardCharsets
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings import com.muwire.core.MuWireSettings
@@ -66,7 +67,7 @@ class PeerConnection extends Connection {
protected void write(Object message) { protected void write(Object message) {
byte[] payload byte[] payload
if (message instanceof Map) { if (message instanceof Map) {
payload = JsonOutput.toJson(message).bytes payload = JsonOutput.toJson(message).getBytes(StandardCharsets.UTF_8)
DataUtil.packHeader(payload.length, writeHeader) DataUtil.packHeader(payload.length, writeHeader)
log.fine "$name writing message type ${message.type} length $payload.length" log.fine "$name writing message type ${message.type} length $payload.length"
writeHeader[0] &= (byte)0x7F writeHeader[0] &= (byte)0x7F

View File

@@ -16,7 +16,6 @@ import java.nio.file.Files
import java.nio.file.StandardOpenOption import java.nio.file.StandardOpenOption
import java.security.MessageDigest import java.security.MessageDigest
import java.security.NoSuchAlgorithmException import java.security.NoSuchAlgorithmException
import java.util.logging.Level
@Log @Log
class DownloadSession { class DownloadSession {
@@ -24,7 +23,7 @@ class DownloadSession {
private static int SAMPLES = 10 private static int SAMPLES = 10
private final String meB64 private final String meB64
private final Pieces pieces private final Pieces downloaded, claimed
private final InfoHash infoHash private final InfoHash infoHash
private final Endpoint endpoint private final Endpoint endpoint
private final File file private final File file
@@ -37,10 +36,11 @@ class DownloadSession {
private ByteBuffer mapped private ByteBuffer mapped
DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, DownloadSession(String meB64, Pieces downloaded, Pieces claimed, InfoHash infoHash, Endpoint endpoint, File file,
int pieceSize, long fileLength) { int pieceSize, long fileLength) {
this.meB64 = meB64 this.meB64 = meB64
this.pieces = pieces this.downloaded = downloaded
this.claimed = claimed
this.endpoint = endpoint this.endpoint = endpoint
this.infoHash = infoHash this.infoHash = infoHash
this.file = file this.file = file
@@ -63,11 +63,20 @@ class DownloadSession {
OutputStream os = endpoint.getOutputStream() OutputStream os = endpoint.getOutputStream()
InputStream is = endpoint.getInputStream() InputStream is = endpoint.getInputStream()
int piece = pieces.claim() int piece
if (piece == -1) while(true) {
return false piece = downloaded.getRandomPiece()
boolean unclaim = true if (claimed.isMarked(piece)) {
if (downloaded.donePieces() + claimed.donePieces() == downloaded.nPieces) {
log.info("all pieces claimed")
return false
}
continue
}
break
}
claimed.markDownloaded(piece)
log.info("will download piece $piece") log.info("will download piece $piece")
long start = piece * pieceSize long start = piece * pieceSize
@@ -76,6 +85,7 @@ class DownloadSession {
String root = Base64.encode(infoHash.getRoot()) String root = Base64.encode(infoHash.getRoot())
FileChannel channel
try { try {
os.write("GET $root\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("GET $root\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("Range: $start-$end\r\n".getBytes(StandardCharsets.US_ASCII))
@@ -125,46 +135,41 @@ class DownloadSession {
} }
// start the download // start the download
FileChannel channel channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE,
try { StandardOpenOption.SPARSE, StandardOpenOption.CREATE)) // TODO: double-check, maybe CREATE_NEW
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE, mapped = channel.map(FileChannel.MapMode.READ_WRITE, start, end - start + 1)
StandardOpenOption.SPARSE, StandardOpenOption.CREATE)) // TODO: double-check, maybe CREATE_NEW
mapped = channel.map(FileChannel.MapMode.READ_WRITE, start, end - start + 1) byte[] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
byte[] tmp = new byte[0x1 << 13] if (mapped.remaining() < tmp.length)
while(mapped.hasRemaining()) { tmp = new byte[mapped.remaining()]
if (mapped.remaining() < tmp.length) int read = is.read(tmp)
tmp = new byte[mapped.remaining()] if (read == -1)
int read = is.read(tmp) throw new IOException()
if (read == -1) synchronized(this) {
throw new IOException() mapped.put(tmp, 0, read)
synchronized(this) {
mapped.put(tmp, 0, read) if (timestamps.size() == SAMPLES) {
timestamps.removeFirst()
if (timestamps.size() == SAMPLES) { reads.removeFirst()
timestamps.removeFirst()
reads.removeFirst()
}
timestamps.addLast(System.currentTimeMillis())
reads.addLast(read)
} }
timestamps.addLast(System.currentTimeMillis())
reads.addLast(read)
} }
mapped.clear()
digest.update(mapped)
byte [] hash = digest.digest()
byte [] expected = new byte[32]
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected)
throw new BadHashException()
} finally {
try { channel?.close() } catch (IOException ignore) {}
} }
pieces.markDownloaded(piece)
unclaim = false mapped.clear()
digest.update(mapped)
byte [] hash = digest.digest()
byte [] expected = new byte[32]
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected)
throw new BadHashException()
downloaded.markDownloaded(piece)
} finally { } finally {
if (unclaim) claimed.clear(piece)
pieces.unclaim(piece) try { channel?.close() } catch (IOException ignore) {}
} }
return true return true
} }

View File

@@ -7,7 +7,6 @@ import com.muwire.core.connection.Endpoint
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Level import java.util.logging.Level
import com.muwire.core.Constants import com.muwire.core.Constants
@@ -35,7 +34,7 @@ public class Downloader {
private final DownloadManager downloadManager private final DownloadManager downloadManager
private final Persona me private final Persona me
private final File file private final File file
private final Pieces pieces private final Pieces downloaded, claimed
private final long length private final long length
private InfoHash infoHash private InfoHash infoHash
private final int pieceSize private final int pieceSize
@@ -48,8 +47,7 @@ public class Downloader {
private volatile boolean cancelled private volatile boolean cancelled
private final AtomicBoolean eventFired = new AtomicBoolean() private volatile boolean eventFired
private boolean piecesFileClosed
public Downloader(EventBus eventBus, DownloadManager downloadManager, public Downloader(EventBus eventBus, DownloadManager downloadManager,
Persona me, File file, long length, InfoHash infoHash, Persona me, File file, long length, InfoHash infoHash,
@@ -74,7 +72,8 @@ public class Downloader {
nPieces = length / pieceSize + 1 nPieces = length / pieceSize + 1
this.nPieces = nPieces this.nPieces = nPieces
pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) downloaded = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO)
claimed = new Pieces(nPieces)
} }
public synchronized InfoHash getInfoHash() { public synchronized InfoHash getInfoHash() {
@@ -101,24 +100,20 @@ public class Downloader {
return return
piecesFile.eachLine { piecesFile.eachLine {
int piece = Integer.parseInt(it) int piece = Integer.parseInt(it)
pieces.markDownloaded(piece) downloaded.markDownloaded(piece)
} }
} }
void writePieces() { void writePieces() {
synchronized(piecesFile) { piecesFile.withPrintWriter { writer ->
if (piecesFileClosed) downloaded.getDownloaded().each { piece ->
return writer.println(piece)
piecesFile.withPrintWriter { writer ->
pieces.getDownloaded().each { piece ->
writer.println(piece)
}
} }
} }
} }
public long donePieces() { public long donePieces() {
pieces.donePieces() downloaded.donePieces()
} }
@@ -141,7 +136,7 @@ public class Downloader {
allFinished &= it.currentState == WorkerState.FINISHED allFinished &= it.currentState == WorkerState.FINISHED
} }
if (allFinished) { if (allFinished) {
if (pieces.isComplete()) if (downloaded.isComplete())
return DownloadState.FINISHED return DownloadState.FINISHED
return DownloadState.FAILED return DownloadState.FAILED
} }
@@ -176,10 +171,7 @@ public class Downloader {
cancelled = true cancelled = true
stop() stop()
file.delete() file.delete()
synchronized(piecesFile) { piecesFile.delete()
piecesFileClosed = true
piecesFile.delete()
}
} }
void stop() { void stop() {
@@ -239,8 +231,8 @@ public class Downloader {
} }
currentState = WorkerState.DOWNLOADING currentState = WorkerState.DOWNLOADING
boolean requestPerformed boolean requestPerformed
while(!pieces.isComplete()) { while(!downloaded.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, file, pieceSize, length) currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, getInfoHash(), endpoint, file, pieceSize, length)
requestPerformed = currentSession.request() requestPerformed = currentSession.request()
if (!requestPerformed) if (!requestPerformed)
break break
@@ -250,11 +242,9 @@ public class Downloader {
log.log(Level.WARNING,"Exception while downloading",bad) log.log(Level.WARNING,"Exception while downloading",bad)
} finally { } finally {
currentState = WorkerState.FINISHED currentState = WorkerState.FINISHED
if (pieces.isComplete() && eventFired.compareAndSet(false, true)) { if (downloaded.isComplete() && !eventFired) {
synchronized(piecesFile) { piecesFile.delete()
piecesFileClosed = true eventFired = true
piecesFile.delete()
}
eventBus.publish( eventBus.publish(
new FileDownloadedEvent( new FileDownloadedEvent(
downloadedFile : new DownloadedFile(file, getInfoHash(), pieceSizePow2, Collections.emptySet()), downloadedFile : new DownloadedFile(file, getInfoHash(), pieceSizePow2, Collections.emptySet()),

View File

@@ -1,7 +1,7 @@
package com.muwire.core.download package com.muwire.core.download
class Pieces { class Pieces {
private final BitSet done, claimed private final BitSet bitSet
private final int nPieces private final int nPieces
private final float ratio private final float ratio
private final Random random = new Random() private final Random random = new Random()
@@ -13,53 +13,52 @@ class Pieces {
Pieces(int nPieces, float ratio) { Pieces(int nPieces, float ratio) {
this.nPieces = nPieces this.nPieces = nPieces
this.ratio = ratio this.ratio = ratio
done = new BitSet(nPieces) bitSet = new BitSet(nPieces)
claimed = new BitSet(nPieces)
} }
synchronized int claim() { synchronized int getRandomPiece() {
int claimedCardinality = claimed.cardinality() int cardinality = bitSet.cardinality()
if (claimedCardinality == nPieces) if (cardinality == nPieces)
return -1 return -1
// if fuller than ratio just do sequential // if fuller than ratio just do sequential
if ( (1.0f * claimedCardinality) / nPieces > ratio) { if ( (1.0f * cardinality) / nPieces > ratio) {
int rv = claimed.nextClearBit(0) return bitSet.nextClearBit(0)
claimed.set(rv)
return rv
} }
while(true) { while(true) {
int start = random.nextInt(nPieces) int start = random.nextInt(nPieces)
if (claimed.get(start)) if (bitSet.get(start))
continue continue
claimed.set(start)
return start return start
} }
} }
synchronized def getDownloaded() { def getDownloaded() {
def rv = [] def rv = []
for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1)) { for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i+1)) {
rv << i rv << i
} }
rv rv
} }
synchronized void markDownloaded(int piece) { synchronized void markDownloaded(int piece) {
done.set(piece) bitSet.set(piece)
claimed.set(piece)
} }
synchronized void unclaim(int piece) { synchronized void clear(int piece) {
claimed.clear(piece) bitSet.clear(piece)
} }
synchronized boolean isComplete() { synchronized boolean isComplete() {
done.cardinality() == nPieces bitSet.cardinality() == nPieces
}
synchronized boolean isMarked(int piece) {
bitSet.get(piece)
} }
synchronized int donePieces() { synchronized int donePieces() {
done.cardinality() bitSet.cardinality()
} }
} }

View File

@@ -1,8 +0,0 @@
package com.muwire.core.search
import com.muwire.core.Event
class UIResultBatchEvent extends Event {
UUID uuid
UIResultEvent[] results
}

View File

@@ -51,23 +51,4 @@ class ContentUploader extends Uploader {
} }
} }
@Override
public String getName() {
return file.getName();
}
@Override
public synchronized int getProgress() {
if (mapped == null)
return 0
int position = mapped.position()
int total = request.getRange().end - request.getRange().start
(int)(position * 100.0 / total)
}
@Override
public String getDownloader() {
request.downloader.getHumanReadableName()
}
} }

View File

@@ -6,8 +6,6 @@ import java.nio.charset.StandardCharsets
import com.muwire.core.InfoHash import com.muwire.core.InfoHash
import com.muwire.core.connection.Endpoint import com.muwire.core.connection.Endpoint
import net.i2p.data.Base64
class HashListUploader extends Uploader { class HashListUploader extends Uploader {
private final InfoHash infoHash private final InfoHash infoHash
private final HashListRequest request private final HashListRequest request
@@ -16,7 +14,6 @@ class HashListUploader extends Uploader {
super(endpoint) super(endpoint)
this.infoHash = infoHash this.infoHash = infoHash
mapped = ByteBuffer.wrap(infoHash.getHashList()) mapped = ByteBuffer.wrap(infoHash.getHashList())
this.request = request
} }
void respond() { void respond() {
@@ -35,21 +32,4 @@ class HashListUploader extends Uploader {
} }
endpoint.getOutputStream().flush() endpoint.getOutputStream().flush()
} }
@Override
public String getName() {
return "Hash list for " + Base64.encode(infoHash.getRoot());
}
@Override
public synchronized int getProgress() {
(int)(mapped.position() * 100.0 / mapped.capacity())
}
@Override
public String getDownloader() {
request.downloader.getHumanReadableName()
}
} }

View File

@@ -23,13 +23,4 @@ abstract class Uploader {
return -1 return -1
mapped.position() mapped.position()
} }
abstract String getName();
/**
* @return an integer between 0 and 100
*/
abstract int getProgress();
abstract String getDownloader();
} }

View File

@@ -25,17 +25,4 @@ public class SharedFile {
public int getPieceSize() { public int getPieceSize() {
return pieceSize; return pieceSize;
} }
@Override
public int hashCode() {
return file.hashCode() ^ infoHash.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof SharedFile))
return false;
SharedFile other = (SharedFile)o;
return file.equals(other.file) && infoHash.equals(other.infoHash);
}
} }

View File

@@ -1,5 +1,5 @@
group = com.muwire group = com.muwire
version = 0.2.4 version = 0.2.1
groovyVersion = 2.4.15 groovyVersion = 2.4.15
slf4jVersion = 1.7.25 slf4jVersion = 1.7.25
spockVersion = 1.1-groovy-2.4 spockVersion = 1.1-groovy-2.4

View File

@@ -62,7 +62,7 @@ class MainFrameController {
def searchEvent def searchEvent
if (hashSearch) { if (hashSearch) {
searchEvent = new SearchEvent(searchHash : root, uuid : uuid, oobInfohash: true) searchEvent = new SearchEvent(searchHash : root, uuid : uuid)
} else { } else {
// this can be improved a lot // this can be improved a lot
def replaced = search.toLowerCase().trim().replaceAll(Constants.SPLIT_PATTERN, " ") def replaced = search.toLowerCase().trim().replaceAll(Constants.SPLIT_PATTERN, " ")

View File

@@ -20,7 +20,6 @@ import com.muwire.core.files.FileHashedEvent
import com.muwire.core.files.FileLoadedEvent import com.muwire.core.files.FileLoadedEvent
import com.muwire.core.files.FileSharedEvent import com.muwire.core.files.FileSharedEvent
import com.muwire.core.search.QueryEvent import com.muwire.core.search.QueryEvent
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.search.UIResultEvent import com.muwire.core.search.UIResultEvent
import com.muwire.core.trust.TrustEvent import com.muwire.core.trust.TrustEvent
import com.muwire.core.trust.TrustService import com.muwire.core.trust.TrustService
@@ -116,7 +115,6 @@ class MainFrameModel {
core = e.getNewValue() core = e.getNewValue()
me = core.me.getHumanReadableName() me = core.me.getHumanReadableName()
core.eventBus.register(UIResultEvent.class, this) core.eventBus.register(UIResultEvent.class, this)
core.eventBus.register(UIResultBatchEvent.class, this)
core.eventBus.register(DownloadStartedEvent.class, this) core.eventBus.register(DownloadStartedEvent.class, this)
core.eventBus.register(ConnectionEvent.class, this) core.eventBus.register(ConnectionEvent.class, this)
core.eventBus.register(DisconnectionEvent.class, this) core.eventBus.register(DisconnectionEvent.class, this)
@@ -163,11 +161,6 @@ class MainFrameModel {
resultsGroup?.model.handleResult(e) resultsGroup?.model.handleResult(e)
} }
void onUIResultBatchEvent(UIResultBatchEvent e) {
MVCGroup resultsGroup = results.get(e.uuid)
resultsGroup?.model.handleResultBatch(e.results)
}
void onDownloadStartedEvent(DownloadStartedEvent e) { void onDownloadStartedEvent(DownloadStartedEvent e) {
runInsideUIAsync { runInsideUIAsync {
downloads << e downloads << e

View File

@@ -52,22 +52,4 @@ class SearchTabModel {
table.model.fireTableDataChanged() table.model.fireTableDataChanged()
} }
} }
void handleResultBatch(UIResultEvent[] batch) {
runInsideUIAsync {
batch.each {
if (uiSettings.excludeLocalResult && it.sender == core.me)
return
def bucket = hashBucket.get(it.infohash)
if (bucket == null) {
bucket = []
hashBucket[it.infohash] = bucket
}
bucket << it
results << it
}
JTable table = builder.getVariable("results-table")
table.model.fireTableDataChanged()
}
}
} }

View File

@@ -120,7 +120,7 @@ class MainFrameView {
closureColumn(header: "Progress", preferredWidth: 20, type: String, read: { row -> closureColumn(header: "Progress", preferredWidth: 20, type: String, read: { row ->
int pieces = row.downloader.nPieces int pieces = row.downloader.nPieces
int done = row.downloader.donePieces() int done = row.downloader.donePieces()
"$done/$pieces pieces".toString() "$done/$pieces pieces"
}) })
closureColumn(header: "Sources", preferredWidth : 10, type: Integer, read : {row -> row.downloader.activeWorkers()}) closureColumn(header: "Sources", preferredWidth : 10, type: Integer, read : {row -> row.downloader.activeWorkers()})
closureColumn(header: "Speed", preferredWidth: 50, type:String, read :{row -> closureColumn(header: "Speed", preferredWidth: 50, type:String, read :{row ->
@@ -160,13 +160,16 @@ class MainFrameView {
scrollPane (constraints : BorderLayout.CENTER) { scrollPane (constraints : BorderLayout.CENTER) {
table(id : "uploads-table") { table(id : "uploads-table") {
tableModel(list : model.uploads) { tableModel(list : model.uploads) {
closureColumn(header : "Name", type : String, read : {row -> row.getName() }) closureColumn(header : "Name", type : String, read : {row -> row.file.getName() })
closureColumn(header : "Progress", type : String, read : { row -> closureColumn(header : "Progress", type : String, read : { row ->
int percent = row.getProgress() int position = row.getPosition()
def range = row.request.getRange()
int total = range.end - range.start
int percent = (int)((position * 100.0) / total)
"$percent%" "$percent%"
}) })
closureColumn(header : "Downloader", type : String, read : { row -> closureColumn(header : "Downloader", type : String, read : { row ->
row.getDownloader() row.request.downloader?.getHumanReadableName()
}) })
} }
} }