Merge pull request #37 from LoveIsGrief/change-persister

Introduce persister that uses a directory structure
This commit is contained in:
Zlatin Balevsky
2020-01-25 14:36:41 +00:00
committed by GitHub
11 changed files with 317 additions and 163 deletions

View File

@@ -4,7 +4,7 @@ FROM jlesage/baseimage-gui:alpine-3.10-glibc
ARG DOCKER_IMAGE_VERSION=unknown
# JDK version
ARG JDK=9
ARG JDK=11
# Important directories
ARG TMP_DIR=/muwire-tmp

View File

@@ -21,7 +21,6 @@ import com.muwire.core.filecert.UICreateCertificateEvent
import com.muwire.core.files.DirectoryUnsharedEvent
import com.muwire.core.files.FileSharedEvent
import com.muwire.core.files.FileUnsharedEvent
import com.muwire.core.files.UIPersistFilesEvent
class FilesView extends BasicWindow {
private final FilesModel model
@@ -84,7 +83,6 @@ class FilesView extends BasicWindow {
Button unshareButton = new Button("Unshare", {
core.eventBus.publish(new FileUnsharedEvent(unsharedFile : sf))
core.eventBus.publish(new UIPersistFilesEvent())
MessageDialog.showMessageDialog(textGUI, "File Unshared", "Unshared "+sf.getFile().getName(), MessageDialogButton.OK)
} )
Button addCommentButton = new Button("Add Comment", {

View File

@@ -1,5 +1,8 @@
package com.muwire.core
import com.muwire.core.files.PersisterDoneEvent
import com.muwire.core.files.PersisterFolderService
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean
@@ -31,7 +34,6 @@ import com.muwire.core.filecert.UIFetchCertificatesEvent
import com.muwire.core.filecert.UIImportCertificateEvent
import com.muwire.core.files.FileDownloadedEvent
import com.muwire.core.files.FileHashedEvent
import com.muwire.core.files.FileHashingEvent
import com.muwire.core.files.FileHasher
import com.muwire.core.files.FileLoadedEvent
import com.muwire.core.files.FileManager
@@ -41,7 +43,7 @@ import com.muwire.core.files.HasherService
import com.muwire.core.files.PersisterService
import com.muwire.core.files.SideCarFileEvent
import com.muwire.core.files.UICommentEvent
import com.muwire.core.files.UIPersistFilesEvent
import com.muwire.core.files.AllFilesLoadedEvent
import com.muwire.core.files.DirectoryUnsharedEvent
import com.muwire.core.files.DirectoryWatchedEvent
@@ -74,10 +76,8 @@ import net.i2p.client.I2PClientFactory
import net.i2p.client.I2PSession
import net.i2p.client.streaming.I2PSocketManager
import net.i2p.client.streaming.I2PSocketManagerFactory
import net.i2p.client.streaming.I2PSocketOptions
import net.i2p.client.streaming.I2PSocketManager.DisconnectListener
import net.i2p.crypto.DSAEngine
import net.i2p.crypto.SigType
import net.i2p.data.Destination
import net.i2p.data.PrivateKey
import net.i2p.data.Signature
@@ -100,6 +100,7 @@ public class Core {
final TrustService trustService
final TrustSubscriber trustSubscriber
private final PersisterService persisterService
private final PersisterFolderService persisterFolderService
private final HostCache hostCache
private final ConnectionManager connectionManager
private final CacheClient cacheClient
@@ -259,7 +260,14 @@ public class Core {
log.info "initializing persistence service"
persisterService = new PersisterService(new File(home, "files.json"), eventBus, 60000, fileManager)
eventBus.register(UILoadedEvent.class, persisterService)
eventBus.register(UIPersistFilesEvent.class, persisterService)
log.info "initializing folder persistence service"
persisterFolderService = new PersisterFolderService(this, new File(home, "files"), eventBus)
eventBus.register(PersisterDoneEvent.class, persisterFolderService)
eventBus.register(FileDownloadedEvent.class, persisterFolderService)
eventBus.register(FileLoadedEvent.class, persisterFolderService)
eventBus.register(FileHashedEvent.class, persisterFolderService)
eventBus.register(FileUnsharedEvent.class, persisterFolderService)
log.info("initializing host cache")
File hostStorage = new File(home, "hosts.json")
@@ -398,6 +406,8 @@ public class Core {
trustService.stop()
log.info("shutting down persister service")
persisterService.stop()
log.info("shutting down persisterFolder service")
persisterFolderService.stop()
log.info("shutting down download manager")
downloadManager.shutdown()
log.info("shutting down connection acceptor")

View File

@@ -0,0 +1,109 @@
package com.muwire.core.files
import com.muwire.core.DownloadedFile
import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.Service
import com.muwire.core.SharedFile
import com.muwire.core.util.DataUtil
import net.i2p.data.Base64
import net.i2p.data.Destination
import java.util.stream.Collectors
abstract class BasePersisterService extends Service{
protected static FileLoadedEvent fromJson(def json) {
if (json.file == null || json.length == null || json.infoHash == null || json.hashList == null)
throw new IllegalArgumentException()
if (!(json.hashList instanceof List))
throw new IllegalArgumentException()
def file = new File(DataUtil.readi18nString(Base64.decode(json.file)))
file = file.getCanonicalFile()
if (!file.exists() || file.isDirectory())
return null
long length = Long.valueOf(json.length)
if (length != file.length())
return null
List hashList = (List) json.hashList
ByteArrayOutputStream baos = new ByteArrayOutputStream()
hashList.each {
byte [] hash = Base64.decode it.toString()
if (hash == null)
throw new IllegalArgumentException()
baos.write hash
}
byte[] hashListBytes = baos.toByteArray()
InfoHash ih = InfoHash.fromHashList(hashListBytes)
byte [] root = Base64.decode(json.infoHash.toString())
if (root == null)
throw new IllegalArgumentException()
if (!Arrays.equals(root, ih.getRoot()))
return null
int pieceSize = 0
if (json.pieceSize != null)
pieceSize = json.pieceSize
if (json.sources != null) {
List sources = (List)json.sources
Set<Destination> sourceSet = sources.stream().map({ d -> new Destination(d.toString())}).collect Collectors.toSet()
DownloadedFile df = new DownloadedFile(file, ih, pieceSize, sourceSet)
df.setComment(json.comment)
return new FileLoadedEvent(loadedFile : df)
}
SharedFile sf = new SharedFile(file, ih, pieceSize)
sf.setComment(json.comment)
if (json.downloaders != null)
sf.getDownloaders().addAll(json.downloaders)
if (json.searchers != null) {
json.searchers.each {
Persona searcher = null
if (it.searcher != null)
searcher = new Persona(new ByteArrayInputStream(Base64.decode(it.searcher)))
long timestamp = it.timestamp
String query = it.query
sf.hit(searcher, timestamp, query)
}
}
return new FileLoadedEvent(loadedFile: sf)
}
protected static toJson(SharedFile sf) {
def json = [:]
json.file = sf.getB64EncodedFileName()
json.length = sf.getCachedLength()
InfoHash ih = sf.getInfoHash()
json.infoHash = sf.getB64EncodedHashRoot()
json.pieceSize = sf.getPieceSize()
json.hashList = sf.getB64EncodedHashList()
json.comment = sf.getComment()
json.hits = sf.getHits()
json.downloaders = sf.getDownloaders()
if (!sf.searches.isEmpty()) {
Set searchers = new HashSet<>()
sf.searches.each {
def search = [:]
if (it.searcher != null)
search.searcher = it.searcher.toBase64()
search.timestamp = it.timestamp
search.query = it.query
searchers.add(search)
}
json.searchers = searchers
}
if (sf instanceof DownloadedFile) {
json.sources = sf.sources.stream().map( {d -> d.toBase64()}).collect(Collectors.toList())
}
json
}
}

View File

@@ -6,4 +6,5 @@ import com.muwire.core.SharedFile
class FileLoadedEvent extends Event {
SharedFile loadedFile
String source
}

View File

@@ -0,0 +1,12 @@
package com.muwire.core.files
import com.muwire.core.Event
/**
* Should be triggered by the old PersisterService
* once it has finished reading the old file
*
* @see PersisterService
*/
class PersisterDoneEvent extends Event{
}

View File

@@ -0,0 +1,150 @@
package com.muwire.core.files
import com.muwire.core.*
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Log
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.logging.Level
/**
* A persister that stores information about the files shared using
* individual JSON files in directories.
*
* The absolute path's 32bit hash to the shared file is used
* to build the directory and filename.
*
* This persister only starts working once the old persister has finished loading
* @see PersisterFolderService#getJsonPath
*/
@Log
class PersisterFolderService extends BasePersisterService {
final static int CUT_LENGTH = 6
private final Core core;
final File location
final EventBus listener
final int interval
final Timer timer
final ExecutorService persisterExecutor = Executors.newSingleThreadExecutor({ r ->
new Thread(r, "file persister")
} as ThreadFactory)
PersisterFolderService(Core core, File location, EventBus listener) {
this.core = core;
this.location = location
this.listener = listener
this.interval = interval
timer = new Timer("file-folder persister timer", true)
}
void stop() {
timer.cancel()
persisterExecutor.shutdown()
}
void onPersisterDoneEvent(PersisterDoneEvent persisterDoneEvent) {
log.info("Old persister done")
load()
}
void onFileHashedEvent(FileHashedEvent hashedEvent) {
persistFile(hashedEvent.sharedFile)
}
void onFileDownloadedEvent(FileDownloadedEvent downloadedEvent) {
if (core.getMuOptions().getShareDownloadedFiles()) {
persistFile(downloadedEvent.downloadedFile)
}
}
/**
* Get rid of the json of unshared files
* @param unsharedEvent
*/
void onFileUnsharedEvent(FileUnsharedEvent unsharedEvent) {
def jsonPath = getJsonPath(unsharedEvent.unsharedFile)
def jsonFile = jsonPath.toFile()
if(jsonFile.isFile()){
jsonFile.delete()
}
}
void onFileLoadedEvent(FileLoadedEvent loadedEvent) {
if(loadedEvent.source == "PersisterService"){
log.info("Migrating persisted file from PersisterService: "
+ loadedEvent.loadedFile.file.absolutePath.toString())
persistFile(loadedEvent.loadedFile)
}
}
void load() {
log.fine("Loading...")
Thread.currentThread().setPriority(Thread.MIN_PRIORITY)
if (location.exists() && location.isDirectory()) {
try {
_load()
}
catch (IllegalArgumentException e) {
log.log(Level.WARNING, "couldn't load files", e)
}
} else {
location.mkdirs()
listener.publish(new AllFilesLoadedEvent())
}
loaded = true
}
/**
* Loads every JSON into memory
*/
private void _load() {
int loaded = 0
def slurper = new JsonSlurper()
Files.walk(location.toPath())
.filter({ it.fileName.endsWith(".json") })
.forEach({
def parsed = slurper.parse it.toFile()
def event = fromJson parsed
if (event == null) return
log.fine("loaded file $event.loadedFile.file")
listener.publish event
loaded++
if (loaded % 10 == 0)
Thread.sleep(20)
})
listener.publish(new AllFilesLoadedEvent())
}
private void persistFile(SharedFile sf) {
persisterExecutor.submit({
def jsonPath = getJsonPath(sf)
def startTime = System.currentTimeMillis()
jsonPath.parent.toFile().mkdirs()
jsonPath.toFile().withPrintWriter { writer ->
def json = toJson sf
json = JsonOutput.toJson(json)
writer.println json
}
log.fine("Time(ms) to write json: " + (System.currentTimeMillis() - startTime))
} as Runnable)
}
private Path getJsonPath(SharedFile sf){
def pathHash = sf.getB64PathHash()
return Paths.get(
location.getAbsolutePath(),
pathHash.substring(0, CUT_LENGTH),
pathHash.substring(CUT_LENGTH) + ".json"
)
}
}

View File

@@ -1,40 +1,24 @@
package com.muwire.core.files
import java.nio.file.CopyOption
import java.nio.file.Files
import java.nio.file.StandardCopyOption
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.logging.Level
import java.util.stream.Collectors
import com.muwire.core.DownloadedFile
import com.muwire.core.EventBus
import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.Service
import com.muwire.core.SharedFile
import com.muwire.core.UILoadedEvent
import com.muwire.core.util.DataUtil
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Log
import net.i2p.data.Base64
import net.i2p.data.Destination
@Log
class PersisterService extends Service {
class PersisterService extends BasePersisterService {
final File location
final EventBus listener
final int interval
final Timer timer
final FileManager fileManager
final ExecutorService persisterExecutor = Executors.newSingleThreadExecutor({ r ->
new Thread(r, "file persister")
} as ThreadFactory)
PersisterService(File location, EventBus listener, int interval, FileManager fileManager) {
this.location = location
@@ -51,10 +35,6 @@ class PersisterService extends Service {
void onUILoadedEvent(UILoadedEvent e) {
timer.schedule({load()} as TimerTask, 1)
}
void onUIPersistFilesEvent(UIPersistFilesEvent e) {
persistFiles()
}
void load() {
Thread.currentThread().setPriority(Thread.MIN_PRIORITY)
@@ -69,6 +49,7 @@ class PersisterService extends Service {
def event = fromJson parsed
if (event != null) {
log.fine("loaded file $event.loadedFile.file")
event.source = "PersisterService"
listener.publish event
loaded++
if (loaded % 10 == 0)
@@ -76,126 +57,18 @@ class PersisterService extends Service {
}
}
}
listener.publish(new AllFilesLoadedEvent())
} catch (IllegalArgumentException|NumberFormatException e) {
// Backup the old hashes
location.renameTo(
new File(location.absolutePath + ".bak")
)
listener.publish(new PersisterDoneEvent())
} catch (IllegalArgumentException e) {
log.log(Level.WARNING, "couldn't load files",e)
}
} else {
listener.publish(new AllFilesLoadedEvent())
listener.publish(new PersisterDoneEvent())
}
timer.schedule({persistFiles()} as TimerTask, 1000, interval)
loaded = true
}
private static FileLoadedEvent fromJson(def json) {
if (json.file == null || json.length == null || json.infoHash == null || json.hashList == null)
throw new IllegalArgumentException()
if (!(json.hashList instanceof List))
throw new IllegalArgumentException()
def file = new File(DataUtil.readi18nString(Base64.decode(json.file)))
file = file.getCanonicalFile()
if (!file.exists() || file.isDirectory())
return null
long length = Long.valueOf(json.length)
if (length != file.length())
return null
List hashList = (List) json.hashList
ByteArrayOutputStream baos = new ByteArrayOutputStream()
hashList.each {
byte [] hash = Base64.decode it.toString()
if (hash == null)
throw new IllegalArgumentException()
baos.write hash
}
byte[] hashListBytes = baos.toByteArray()
InfoHash ih = InfoHash.fromHashList(hashListBytes)
byte [] root = Base64.decode(json.infoHash.toString())
if (root == null)
throw new IllegalArgumentException()
if (!Arrays.equals(root, ih.getRoot()))
return null
int pieceSize = 0
if (json.pieceSize != null)
pieceSize = json.pieceSize
if (json.sources != null) {
List sources = (List)json.sources
Set<Destination> sourceSet = sources.stream().map({d -> new Destination(d.toString())}).collect Collectors.toSet()
DownloadedFile df = new DownloadedFile(file, ih, pieceSize, sourceSet)
df.setComment(json.comment)
return new FileLoadedEvent(loadedFile : df)
}
SharedFile sf = new SharedFile(file, ih, pieceSize)
sf.setComment(json.comment)
if (json.downloaders != null)
sf.getDownloaders().addAll(json.downloaders)
if (json.searchers != null) {
json.searchers.each {
Persona searcher = null
if (it.searcher != null)
searcher = new Persona(new ByteArrayInputStream(Base64.decode(it.searcher)))
long timestamp = it.timestamp
String query = it.query
sf.hit(searcher, timestamp, query)
}
}
return new FileLoadedEvent(loadedFile: sf)
}
private void persistFiles() {
persisterExecutor.submit( {
def sharedFiles = fileManager.getSharedFiles()
File tmp = File.createTempFile("muwire-files", "tmp")
tmp.deleteOnExit()
tmp.withPrintWriter { writer ->
sharedFiles.each { k, v ->
def json = toJson(k,v)
json = JsonOutput.toJson(json)
writer.println json
}
}
Files.copy(tmp.toPath(), location.toPath(), StandardCopyOption.REPLACE_EXISTING)
tmp.delete()
} as Runnable)
}
private def toJson(File f, SharedFile sf) {
def json = [:]
json.file = sf.getB64EncodedFileName()
json.length = sf.getCachedLength()
InfoHash ih = sf.getInfoHash()
json.infoHash = sf.getB64EncodedHashRoot()
json.pieceSize = sf.getPieceSize()
json.hashList = sf.getB64EncodedHashList()
json.comment = sf.getComment()
json.hits = sf.getHits()
json.downloaders = sf.getDownloaders()
if (!sf.searches.isEmpty()) {
Set searchers = new HashSet<>()
sf.searches.each {
def search = [:]
if (it.searcher != null)
search.searcher = it.searcher.toBase64()
search.timestamp = it.timestamp
search.query = it.query
searchers.add(search)
}
json.searchers = searchers
}
if (sf instanceof DownloadedFile) {
json.sources = sf.sources.stream().map( {d -> d.toBase64()}).collect(Collectors.toList())
}
json
}
}

View File

@@ -1,6 +0,0 @@
package com.muwire.core.files
import com.muwire.core.Event
class UIPersistFilesEvent extends Event {
}

View File

@@ -2,6 +2,8 @@ package com.muwire.core;
import java.io.File;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -21,7 +23,8 @@ public class SharedFile {
private final String cachedPath;
private final long cachedLength;
private String b64PathHash;
private final String b64EncodedFileName;
private final String b64EncodedHashRoot;
private final List<String> b64EncodedHashList;
@@ -38,7 +41,7 @@ public class SharedFile {
this.cachedLength = file.length();
this.b64EncodedFileName = Base64.encode(DataUtil.encodei18nString(file.toString()));
this.b64EncodedHashRoot = Base64.encode(infoHash.getRoot());
List<String> b64List = new ArrayList<String>();
byte[] tmp = new byte[32];
for (int i = 0; i < infoHash.getHashList().length / 32; i++) {
@@ -52,6 +55,19 @@ public class SharedFile {
return file;
}
public byte[] getPathHash() throws NoSuchAlgorithmException {
var digester = MessageDigest.getInstance("SHA-256");
digester.update(file.getAbsolutePath().getBytes());
return digester.digest();
}
public String getB64PathHash() throws NoSuchAlgorithmException {
if(b64PathHash == null){
b64PathHash = Base64.encode(getPathHash());
}
return b64PathHash;
}
public InfoHash getInfoHash() {
return infoHash;
}

View File

@@ -3,15 +3,11 @@ package com.muwire.gui
import griffon.core.GriffonApplication
import griffon.core.artifact.GriffonController
import griffon.core.controller.ControllerAction
import griffon.core.mvc.MVCGroup
import griffon.core.mvc.MVCGroupConfiguration
import griffon.inject.MVCMember
import griffon.metadata.ArtifactProviderFor
import groovy.json.StringEscapeUtils
import net.i2p.crypto.DSAEngine
import net.i2p.data.Base64
import net.i2p.data.Signature
import net.i2p.data.SigningPrivateKey
import java.awt.Desktop
import java.awt.Toolkit
@@ -30,15 +26,11 @@ import com.muwire.core.Persona
import com.muwire.core.SharedFile
import com.muwire.core.SplitPattern
import com.muwire.core.download.Downloader
import com.muwire.core.download.DownloadStartedEvent
import com.muwire.core.download.UIDownloadCancelledEvent
import com.muwire.core.download.UIDownloadEvent
import com.muwire.core.download.UIDownloadPausedEvent
import com.muwire.core.download.UIDownloadResumedEvent
import com.muwire.core.filecert.UICreateCertificateEvent
import com.muwire.core.files.DirectoryUnsharedEvent
import com.muwire.core.files.FileUnsharedEvent
import com.muwire.core.files.UIPersistFilesEvent
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.SearchEvent
import com.muwire.core.trust.RemoteTrustList
@@ -371,7 +363,6 @@ class MainFrameController {
sf.each {
core.eventBus.publish(new FileUnsharedEvent(unsharedFile : it))
}
core.eventBus.publish(new UIPersistFilesEvent())
}
@ControllerAction
@@ -534,4 +525,4 @@ class MainFrameController {
core = e.getNewValue()
})
}
}
}