uploader code
This commit is contained in:
@@ -46,7 +46,7 @@ public class UploadManager {
|
||||
}
|
||||
|
||||
Request request = Request.parse(new InfoHash(infoHashRoot), e.getInputStream())
|
||||
Uploader uploader = new Uploader(request, e)
|
||||
Uploader uploader = new Uploader(sharedFiles.iterator().next().file, request, e)
|
||||
eventBus.publish(new UploadEvent(uploader))
|
||||
try {
|
||||
uploader.respond()
|
||||
|
||||
@@ -1,17 +1,59 @@
|
||||
package com.muwire.core.upload
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.StandardOpenOption
|
||||
|
||||
import com.muwire.core.connection.Endpoint
|
||||
|
||||
class Uploader {
|
||||
private final File file
|
||||
private final Request request
|
||||
private final Endpoint endpoint
|
||||
private ByteBuffer mapped
|
||||
|
||||
Uploader(Request request, Endpoint endpoint) {
|
||||
Uploader(File file, Request request, Endpoint endpoint) {
|
||||
this.file = file
|
||||
this.request = request
|
||||
this.endpoint = endpoint
|
||||
}
|
||||
|
||||
void respond() {
|
||||
|
||||
OutputStream os = endpoint.getOutputStream()
|
||||
Range range = request.getRange()
|
||||
if (range.start >= file.length() || range.end >= file.length()) {
|
||||
os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
os.flush()
|
||||
return
|
||||
}
|
||||
|
||||
os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
|
||||
FileChannel channel
|
||||
try {
|
||||
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ))
|
||||
mapped = channel.map(FileChannel.MapMode.READ_ONLY, range.start, range.end - range.start + 1)
|
||||
byte [] tmp = new byte[0x1 << 13]
|
||||
while(mapped.hasRemaining()) {
|
||||
int start = mapped.position()
|
||||
synchronized(this) {
|
||||
mapped.get(tmp, 0, Math.min(tmp.length, mapped.remaining()))
|
||||
}
|
||||
int read = mapped.position() - start
|
||||
endpoint.getOutputStream().write(tmp, 0, read)
|
||||
}
|
||||
} finally {
|
||||
try {channel?.close() } catch (IOException ignored) {}
|
||||
endpoint.getOutputStream().flush()
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized int getPosition() {
|
||||
if (mapped == null)
|
||||
return -1
|
||||
mapped.position()
|
||||
}
|
||||
}
|
||||
|
||||
117
core/src/test/groovy/com/muwire/core/upload/UploaderTest.groovy
Normal file
117
core/src/test/groovy/com/muwire/core/upload/UploaderTest.groovy
Normal file
@@ -0,0 +1,117 @@
|
||||
package com.muwire.core.upload
|
||||
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
||||
import com.muwire.core.InfoHash
|
||||
import com.muwire.core.connection.Endpoint
|
||||
|
||||
class UploaderTest {
|
||||
|
||||
Endpoint endpoint
|
||||
File file
|
||||
Thread uploadThread
|
||||
|
||||
InputStream is
|
||||
OutputStream os
|
||||
|
||||
Request request
|
||||
Uploader uploader
|
||||
|
||||
byte[] inFile
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
file?.delete()
|
||||
file = File.createTempFile("uploadTest", "dat")
|
||||
file.deleteOnExit()
|
||||
is = new PipedInputStream(0x1 << 14)
|
||||
os = new PipedOutputStream(is)
|
||||
endpoint = new Endpoint(null, is, os, null)
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
file?.delete()
|
||||
uploadThread?.interrupt()
|
||||
Thread.sleep(50)
|
||||
}
|
||||
|
||||
private void fillFile(int length) {
|
||||
byte [] data = new byte[length]
|
||||
def random = new Random()
|
||||
random.nextBytes(data)
|
||||
def fos = new FileOutputStream(file)
|
||||
fos.write(data)
|
||||
fos.close()
|
||||
inFile = data
|
||||
}
|
||||
|
||||
private void startUpload() {
|
||||
uploader = new Uploader(file, request, endpoint)
|
||||
uploadThread = new Thread(uploader.respond() as Runnable)
|
||||
uploadThread.setDaemon(true)
|
||||
uploadThread.start()
|
||||
}
|
||||
|
||||
private String readUntilRN() {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream()
|
||||
while(true) {
|
||||
byte read = is.read()
|
||||
if (read == -1)
|
||||
throw new IOException()
|
||||
if (read != '\r') {
|
||||
baos.write(read)
|
||||
continue
|
||||
}
|
||||
assert is.read() == '\n'
|
||||
break
|
||||
}
|
||||
new String(baos.toByteArray(), StandardCharsets.US_ASCII)
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallFile() {
|
||||
fillFile(20)
|
||||
request = new Request(range : new Range(0,19))
|
||||
startUpload()
|
||||
assert "200 OK" == readUntilRN()
|
||||
assert "Content-Range: 0-19" == readUntilRN()
|
||||
assert "" == readUntilRN()
|
||||
|
||||
byte [] data = new byte[20]
|
||||
DataInputStream dis = new DataInputStream(is)
|
||||
dis.readFully(data)
|
||||
assert inFile == data
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestMiddle() {
|
||||
fillFile(20)
|
||||
request = new Request(range : new Range(5,15))
|
||||
startUpload()
|
||||
assert "200 OK" == readUntilRN()
|
||||
assert "Content-Range: 5-15" == readUntilRN()
|
||||
assert "" == readUntilRN()
|
||||
|
||||
byte [] data = new byte[11]
|
||||
DataInputStream dis = new DataInputStream(is)
|
||||
dis.readFully(data)
|
||||
for (int i = 0; i < data.length; i++)
|
||||
assert inFile[i+5] == data[i]
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutOfRange() {
|
||||
fillFile(20)
|
||||
request = new Request(range : new Range(0,20))
|
||||
startUpload()
|
||||
assert "416 Range Not Satisfiable" == readUntilRN()
|
||||
assert "" == readUntilRN()
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user