hook up persistence service
This commit is contained in:
@@ -19,6 +19,7 @@ import com.muwire.core.files.FileManager
|
|||||||
import com.muwire.core.files.FileSharedEvent
|
import com.muwire.core.files.FileSharedEvent
|
||||||
import com.muwire.core.files.FileUnsharedEvent
|
import com.muwire.core.files.FileUnsharedEvent
|
||||||
import com.muwire.core.files.HasherService
|
import com.muwire.core.files.HasherService
|
||||||
|
import com.muwire.core.files.PersisterService
|
||||||
import com.muwire.core.hostcache.CacheClient
|
import com.muwire.core.hostcache.CacheClient
|
||||||
import com.muwire.core.hostcache.HostCache
|
import com.muwire.core.hostcache.HostCache
|
||||||
import com.muwire.core.hostcache.HostDiscoveredEvent
|
import com.muwire.core.hostcache.HostDiscoveredEvent
|
||||||
@@ -181,6 +182,10 @@ class Core {
|
|||||||
eventBus.register(FileUnsharedEvent.class, fileManager)
|
eventBus.register(FileUnsharedEvent.class, fileManager)
|
||||||
eventBus.register(SearchEvent.class, fileManager)
|
eventBus.register(SearchEvent.class, fileManager)
|
||||||
|
|
||||||
|
log.info "initializing persistence service"
|
||||||
|
PersisterService persisterService = new PersisterService(new File(home, "files.json"), eventBus, 5000, fileManager)
|
||||||
|
persisterService.start()
|
||||||
|
|
||||||
|
|
||||||
// ... at the end, sleep or execute script
|
// ... at the end, sleep or execute script
|
||||||
if (args.length == 0) {
|
if (args.length == 0) {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ class FileManager {
|
|||||||
|
|
||||||
final EventBus eventBus
|
final EventBus eventBus
|
||||||
final Map<byte[], Set<SharedFile>> rootToFiles = new HashMap<>()
|
final Map<byte[], Set<SharedFile>> rootToFiles = new HashMap<>()
|
||||||
final Map<File, SharedFile> fileToSharedFile = new HashMap<>()
|
final Map<File, SharedFile> fileToSharedFile = Collections.synchronizedMap(new HashMap<>())
|
||||||
final Map<String, Set<File>> nameToFiles = new HashMap<>()
|
final Map<String, Set<File>> nameToFiles = new HashMap<>()
|
||||||
final SearchIndex index = new SearchIndex()
|
final SearchIndex index = new SearchIndex()
|
||||||
|
|
||||||
@@ -79,8 +79,9 @@ class FileManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Map<File, SharedFile> getSharedFiles() {
|
Map<File, SharedFile> getSharedFiles() {
|
||||||
// TODO: figure out locking
|
synchronized(fileToSharedFile) {
|
||||||
fileToSharedFile
|
return new HashMap<>(fileToSharedFile)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void onSearchEvent(SearchEvent e) {
|
void onSearchEvent(SearchEvent e) {
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.muwire.core.files
|
package com.muwire.core.files
|
||||||
|
|
||||||
|
import java.util.logging.Level
|
||||||
import java.util.stream.Collectors
|
import java.util.stream.Collectors
|
||||||
|
|
||||||
import com.muwire.core.DownloadedFile
|
import com.muwire.core.DownloadedFile
|
||||||
@@ -7,25 +8,28 @@ import com.muwire.core.EventBus
|
|||||||
import com.muwire.core.InfoHash
|
import com.muwire.core.InfoHash
|
||||||
import com.muwire.core.Service
|
import com.muwire.core.Service
|
||||||
import com.muwire.core.SharedFile
|
import com.muwire.core.SharedFile
|
||||||
|
import com.muwire.core.util.DataUtil
|
||||||
|
|
||||||
import groovy.json.JsonOutput
|
import groovy.json.JsonOutput
|
||||||
import groovy.json.JsonSlurper
|
import groovy.json.JsonSlurper
|
||||||
import net.i2p.data.Base32
|
import groovy.util.logging.Log
|
||||||
|
import net.i2p.data.Base64
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
|
|
||||||
|
@Log
|
||||||
class PersisterService extends Service {
|
class PersisterService extends Service {
|
||||||
|
|
||||||
final File location
|
final File location
|
||||||
final EventBus listener
|
final EventBus listener
|
||||||
final int interval
|
final int interval
|
||||||
final Timer timer
|
final Timer timer
|
||||||
final def fileSource
|
final FileManager fileManager
|
||||||
|
|
||||||
PersisterService(File location, EventBus listener, int interval, def fileSource) {
|
PersisterService(File location, EventBus listener, int interval, FileManager fileManager) {
|
||||||
this.location = location
|
this.location = location
|
||||||
this.listener = listener
|
this.listener = listener
|
||||||
this.interval = interval
|
this.interval = interval
|
||||||
this.fileSource = fileSource
|
this.fileManager = fileManager
|
||||||
timer = new Timer("file persister", true)
|
timer = new Timer("file persister", true)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -45,12 +49,14 @@ class PersisterService extends Service {
|
|||||||
if (it.trim().length() > 0) {
|
if (it.trim().length() > 0) {
|
||||||
def parsed = slurper.parseText it
|
def parsed = slurper.parseText it
|
||||||
def event = fromJson parsed
|
def event = fromJson parsed
|
||||||
if (event != null)
|
if (event != null) {
|
||||||
|
log.fine("loaded file $event.loadedFile.file")
|
||||||
listener.publish event
|
listener.publish event
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IllegalArgumentException|NumberFormatException e) {
|
} catch (IllegalArgumentException|NumberFormatException e) {
|
||||||
// abort loading
|
log.log(Level.WARNING, "couldn't load files",e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
timer.schedule({persistFiles()} as TimerTask, 0, interval)
|
timer.schedule({persistFiles()} as TimerTask, 0, interval)
|
||||||
@@ -63,7 +69,7 @@ class PersisterService extends Service {
|
|||||||
if (!(json.hashList instanceof List))
|
if (!(json.hashList instanceof List))
|
||||||
throw new IllegalArgumentException()
|
throw new IllegalArgumentException()
|
||||||
|
|
||||||
def file = new File(json.file)
|
def file = new File(DataUtil.readi18nString(Base64.decode(json.file)))
|
||||||
file = file.getCanonicalFile()
|
file = file.getCanonicalFile()
|
||||||
if (!file.exists() || file.isDirectory())
|
if (!file.exists() || file.isDirectory())
|
||||||
return null
|
return null
|
||||||
@@ -74,7 +80,7 @@ class PersisterService extends Service {
|
|||||||
List hashList = (List) json.hashList
|
List hashList = (List) json.hashList
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream()
|
ByteArrayOutputStream baos = new ByteArrayOutputStream()
|
||||||
hashList.each {
|
hashList.each {
|
||||||
byte [] hash = Base32.decode it.toString()
|
byte [] hash = Base64.decode it.toString()
|
||||||
if (hash == null)
|
if (hash == null)
|
||||||
throw new IllegalArgumentException()
|
throw new IllegalArgumentException()
|
||||||
baos.write hash
|
baos.write hash
|
||||||
@@ -82,7 +88,7 @@ class PersisterService extends Service {
|
|||||||
byte[] hashListBytes = baos.toByteArray()
|
byte[] hashListBytes = baos.toByteArray()
|
||||||
|
|
||||||
InfoHash ih = InfoHash.fromHashList(hashListBytes)
|
InfoHash ih = InfoHash.fromHashList(hashListBytes)
|
||||||
byte [] root = Base32.decode(json.infoHash.toString())
|
byte [] root = Base64.decode(json.infoHash.toString())
|
||||||
if (root == null)
|
if (root == null)
|
||||||
throw new IllegalArgumentException()
|
throw new IllegalArgumentException()
|
||||||
if (!Arrays.equals(root, ih.getRoot()))
|
if (!Arrays.equals(root, ih.getRoot()))
|
||||||
@@ -102,7 +108,7 @@ class PersisterService extends Service {
|
|||||||
|
|
||||||
private void persistFiles() {
|
private void persistFiles() {
|
||||||
location.delete()
|
location.delete()
|
||||||
def sharedFiles = fileSource.getSharedFiles()
|
def sharedFiles = fileManager.getSharedFiles()
|
||||||
location.withPrintWriter { writer ->
|
location.withPrintWriter { writer ->
|
||||||
sharedFiles.each { k, v ->
|
sharedFiles.each { k, v ->
|
||||||
def json = toJson(k,v)
|
def json = toJson(k,v)
|
||||||
@@ -114,15 +120,15 @@ class PersisterService extends Service {
|
|||||||
|
|
||||||
private def toJson(File f, SharedFile sf) {
|
private def toJson(File f, SharedFile sf) {
|
||||||
def json = [:]
|
def json = [:]
|
||||||
json.file = f.getCanonicalFile().toString()
|
json.file = Base64.encode DataUtil.encodei18nString(f.getCanonicalFile().toString())
|
||||||
json.length = f.length()
|
json.length = f.length()
|
||||||
InfoHash ih = sf.getInfoHash()
|
InfoHash ih = sf.getInfoHash()
|
||||||
json.infoHash = Base32.encode ih.getRoot()
|
json.infoHash = Base64.encode ih.getRoot()
|
||||||
byte [] tmp = new byte [32]
|
byte [] tmp = new byte [32]
|
||||||
json.hashList = []
|
json.hashList = []
|
||||||
for (int i = 0;i < ih.getHashList().length / 32; i++) {
|
for (int i = 0;i < ih.getHashList().length / 32; i++) {
|
||||||
System.arraycopy(ih.getHashList(), i * 32, tmp, 0, 32)
|
System.arraycopy(ih.getHashList(), i * 32, tmp, 0, 32)
|
||||||
json.hashList.add Base32.encode(tmp)
|
json.hashList.add Base64.encode(tmp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sf instanceof DownloadedFile) {
|
if (sf instanceof DownloadedFile) {
|
||||||
|
|||||||
@@ -49,4 +49,16 @@ class DataUtil {
|
|||||||
System.arraycopy(encoded, 2, string, 0, length)
|
System.arraycopy(encoded, 2, string, 0, length)
|
||||||
new String(string, StandardCharsets.UTF_8)
|
new String(string, StandardCharsets.UTF_8)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static byte[] encodei18nString(String string) {
|
||||||
|
byte [] utf8 = string.getBytes(StandardCharsets.UTF_8)
|
||||||
|
if (utf8.length > Short.MAX_VALUE)
|
||||||
|
throw new IllegalArgumentException("String in utf8 too long $utf8.length")
|
||||||
|
def baos = new ByteArrayOutputStream()
|
||||||
|
def daos = new DataOutputStream(baos)
|
||||||
|
daos.writeShort((short) utf8.length)
|
||||||
|
daos.write(utf8)
|
||||||
|
daos.close()
|
||||||
|
baos.toByteArray()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user