From 453935c3cb8445533d3fba29f9565c2822ebcb2b Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Tue, 28 May 2019 21:50:21 +0100 Subject: [PATCH] uploader code --- .../muwire/core/upload/UploadManager.groovy | 2 +- .../com/muwire/core/upload/Uploader.groovy | 46 ++++++- .../muwire/core/upload/UploaderTest.groovy | 117 ++++++++++++++++++ 3 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 core/src/test/groovy/com/muwire/core/upload/UploaderTest.groovy diff --git a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy index 0cd47556..533e9447 100644 --- a/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/UploadManager.groovy @@ -46,7 +46,7 @@ public class UploadManager { } Request request = Request.parse(new InfoHash(infoHashRoot), e.getInputStream()) - Uploader uploader = new Uploader(request, e) + Uploader uploader = new Uploader(sharedFiles.iterator().next().file, request, e) eventBus.publish(new UploadEvent(uploader)) try { uploader.respond() diff --git a/core/src/main/groovy/com/muwire/core/upload/Uploader.groovy b/core/src/main/groovy/com/muwire/core/upload/Uploader.groovy index 05a08f19..ea231ddd 100644 --- a/core/src/main/groovy/com/muwire/core/upload/Uploader.groovy +++ b/core/src/main/groovy/com/muwire/core/upload/Uploader.groovy @@ -1,17 +1,59 @@ package com.muwire.core.upload +import java.nio.ByteBuffer +import java.nio.channels.FileChannel +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.StandardOpenOption + import com.muwire.core.connection.Endpoint class Uploader { + private final File file private final Request request private final Endpoint endpoint + private ByteBuffer mapped - Uploader(Request request, Endpoint endpoint) { + Uploader(File file, Request request, Endpoint endpoint) { + this.file = file this.request = request this.endpoint = endpoint } void respond() { - + OutputStream os = endpoint.getOutputStream() + Range range = request.getRange() + if (range.start >= file.length() || range.end >= file.length()) { + os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.flush() + return + } + + os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + + FileChannel channel + try { + channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ)) + mapped = channel.map(FileChannel.MapMode.READ_ONLY, range.start, range.end - range.start + 1) + byte [] tmp = new byte[0x1 << 13] + while(mapped.hasRemaining()) { + int start = mapped.position() + synchronized(this) { + mapped.get(tmp, 0, Math.min(tmp.length, mapped.remaining())) + } + int read = mapped.position() - start + endpoint.getOutputStream().write(tmp, 0, read) + } + } finally { + try {channel?.close() } catch (IOException ignored) {} + endpoint.getOutputStream().flush() + } + } + + public synchronized int getPosition() { + if (mapped == null) + return -1 + mapped.position() } } diff --git a/core/src/test/groovy/com/muwire/core/upload/UploaderTest.groovy b/core/src/test/groovy/com/muwire/core/upload/UploaderTest.groovy new file mode 100644 index 00000000..fb4fed22 --- /dev/null +++ b/core/src/test/groovy/com/muwire/core/upload/UploaderTest.groovy @@ -0,0 +1,117 @@ +package com.muwire.core.upload + + +import java.nio.charset.StandardCharsets + +import org.junit.After +import org.junit.Before +import org.junit.Test + +import com.muwire.core.InfoHash +import com.muwire.core.connection.Endpoint + +class UploaderTest { + + Endpoint endpoint + File file + Thread uploadThread + + InputStream is + OutputStream os + + Request request + Uploader uploader + + byte[] inFile + + @Before + public void setup() { + file?.delete() + file = File.createTempFile("uploadTest", "dat") + file.deleteOnExit() + is = new PipedInputStream(0x1 << 14) + os = new PipedOutputStream(is) + endpoint = new Endpoint(null, is, os, null) + } + + @After + public void teardown() { + file?.delete() + uploadThread?.interrupt() + Thread.sleep(50) + } + + private void fillFile(int length) { + byte [] data = new byte[length] + def random = new Random() + random.nextBytes(data) + def fos = new FileOutputStream(file) + fos.write(data) + fos.close() + inFile = data + } + + private void startUpload() { + uploader = new Uploader(file, request, endpoint) + uploadThread = new Thread(uploader.respond() as Runnable) + uploadThread.setDaemon(true) + uploadThread.start() + } + + private String readUntilRN() { + ByteArrayOutputStream baos = new ByteArrayOutputStream() + while(true) { + byte read = is.read() + if (read == -1) + throw new IOException() + if (read != '\r') { + baos.write(read) + continue + } + assert is.read() == '\n' + break + } + new String(baos.toByteArray(), StandardCharsets.US_ASCII) + } + + @Test + public void testSmallFile() { + fillFile(20) + request = new Request(range : new Range(0,19)) + startUpload() + assert "200 OK" == readUntilRN() + assert "Content-Range: 0-19" == readUntilRN() + assert "" == readUntilRN() + + byte [] data = new byte[20] + DataInputStream dis = new DataInputStream(is) + dis.readFully(data) + assert inFile == data + } + + @Test + public void testRequestMiddle() { + fillFile(20) + request = new Request(range : new Range(5,15)) + startUpload() + assert "200 OK" == readUntilRN() + assert "Content-Range: 5-15" == readUntilRN() + assert "" == readUntilRN() + + byte [] data = new byte[11] + DataInputStream dis = new DataInputStream(is) + dis.readFully(data) + for (int i = 0; i < data.length; i++) + assert inFile[i+5] == data[i] + } + + @Test + public void testOutOfRange() { + fillFile(20) + request = new Request(range : new Range(0,20)) + startUpload() + assert "416 Range Not Satisfiable" == readUntilRN() + assert "" == readUntilRN() + } + +}