diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 9409725b..f3b047c1 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -32,7 +32,11 @@ import com.muwire.core.filecert.CertificateManager import com.muwire.core.filecert.UICreateCertificateEvent import com.muwire.core.filecert.UIFetchCertificatesEvent import com.muwire.core.filecert.UIImportCertificateEvent +import com.muwire.core.filefeeds.FeedFetchEvent +import com.muwire.core.filefeeds.FeedItemFetchedEvent +import com.muwire.core.filefeeds.FeedManager import com.muwire.core.filefeeds.UIFIlePublishedEvent +import com.muwire.core.filefeeds.UIFeedConfigurationEvent import com.muwire.core.filefeeds.UIFileUnpublishedEvent import com.muwire.core.files.FileDownloadedEvent import com.muwire.core.files.FileHashedEvent @@ -118,6 +122,7 @@ public class Core { final CertificateManager certificateManager final ChatServer chatServer final ChatManager chatManager + final FeedManager feedManager private final Router router @@ -315,6 +320,14 @@ public class Core { register(TrustEvent.class, chatServer) } + log.info("initializing feed manager") + feedManager = new FeedManager(eventBus, home) + eventBus.with { + register(FeedItemFetchedEvent.class, feedManager) + register(FeedFetchEvent.class, feedManager) + register(UIFeedConfigurationEvent.class, feedManager) + } + log.info "initializing results sender" ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props, certificateManager, chatServer) @@ -395,6 +408,7 @@ public class Core { connectionEstablisher.start() hostCache.waitForLoad() updateClient?.start() + feedManager.start() } public void shutdown() { @@ -428,6 +442,8 @@ public class Core { chatServer.stop() log.info("shutting down chat manager") chatManager.shutdown() + log.info("shutting down feed manager") + feedManager.stop() log.info("shutting down connection manager") connectionManager.shutdown() log.info("killing i2p session") diff --git a/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy b/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy index 3214b163..72f3eab1 100644 --- a/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy +++ b/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy @@ -9,10 +9,12 @@ import java.util.concurrent.ThreadFactory import com.muwire.core.EventBus import com.muwire.core.Persona +import groovy.json.JsonOutput import groovy.json.JsonSlurper import groovy.util.logging.Log import net.i2p.data.Base64 +import net.i2p.util.ConcurrentHashSet @Log class FeedManager { @@ -40,6 +42,10 @@ class FeedManager { itemsFolder.mkdir() } + public Feed getFeed(Persona persona) { + feeds.get(persona) + } + void start() { log.info("starting feed manager") persister.submit({loadFeeds()} as Runnable) @@ -81,7 +87,7 @@ class FeedManager { Set items = feedItems.get(it) if (items == null) { - items = new HashSet<>() + items = new ConcurrentHashSet<>() feedItems.put(it, items) } items.add(item) @@ -90,4 +96,62 @@ class FeedManager { } } } + + void onFeedItemFetchedEvent(FeedItemFetchedEvent e) { + Set set = feedItems.get(e.item.getPublisher()) + if (set == null) { + set = new ConcurrentHashSet<>() + feedItems.put(e.getItem().getPublisher(), set) + } + set.add(e.item) + } + + void onFeedFetchEvent(FeedFetchEvent e) { + if (e.status != FeedFetchStatus.FINISHED) + return + Feed feed = feeds.get(e.host) + if (feed == null) { + log.severe("Finished fetching non-existent feed " + e.host.getHumanReadableName()) + return + } + feed.setLastUpdated(e.getTimestamp()) + // save feed items, then save feed + persister.submit({saveFeedItems(e.host)} as Runnable) + persister.submit({saveFeedMetadata(feed)} as Runnable) + } + + void onUIFeedConfigurationEvent(UIFeedConfigurationEvent e) { + feeds.put(e.feed.getPublisher(), e.feed) + persister.submit({saveFeedMetadata(e.feed)} as Runnable) + } + + private void saveFeedItems(Persona publisher) { + Set set = feedItems.get(publisher) + if (set == null) + return // can happen if nothing was published + + File itemsFile = new File(itemsFolder, publisher.destination.toBase32() + ".json") + + itemsFile.withPrintWriter { writer -> + set.each { item -> + def obj = FeedItems.feedItemToObj(item) + def json = JsonOutput.toJson(obj) + writer.println(json) + } + } + } + + private void saveFeedMetadata(Feed feed) { + File metadataFile = new File(metadataFolder, feed.publisher.destination.toBase32() + ".json") + metadataFile.withPrintWriter { writer -> + def json = [:] + json.publisher = feed.getPublisher().toBase64() + json.itemsToKeep = feed.getItemsToKeep() + json.lastUpdated = feed.getLastUpdated() + json.updateInterval = feed.getUpdateInterval() + json.autoDownload = feed.isAutoDownload() + json = JsonOutput.toJson(json) + writer.println(json) + } + } }