Persist feed metadata and items on successful fetch. Register feed manager for various events
This commit is contained in:
@@ -32,7 +32,11 @@ import com.muwire.core.filecert.CertificateManager
|
||||
import com.muwire.core.filecert.UICreateCertificateEvent
|
||||
import com.muwire.core.filecert.UIFetchCertificatesEvent
|
||||
import com.muwire.core.filecert.UIImportCertificateEvent
|
||||
import com.muwire.core.filefeeds.FeedFetchEvent
|
||||
import com.muwire.core.filefeeds.FeedItemFetchedEvent
|
||||
import com.muwire.core.filefeeds.FeedManager
|
||||
import com.muwire.core.filefeeds.UIFIlePublishedEvent
|
||||
import com.muwire.core.filefeeds.UIFeedConfigurationEvent
|
||||
import com.muwire.core.filefeeds.UIFileUnpublishedEvent
|
||||
import com.muwire.core.files.FileDownloadedEvent
|
||||
import com.muwire.core.files.FileHashedEvent
|
||||
@@ -118,6 +122,7 @@ public class Core {
|
||||
final CertificateManager certificateManager
|
||||
final ChatServer chatServer
|
||||
final ChatManager chatManager
|
||||
final FeedManager feedManager
|
||||
|
||||
private final Router router
|
||||
|
||||
@@ -315,6 +320,14 @@ public class Core {
|
||||
register(TrustEvent.class, chatServer)
|
||||
}
|
||||
|
||||
log.info("initializing feed manager")
|
||||
feedManager = new FeedManager(eventBus, home)
|
||||
eventBus.with {
|
||||
register(FeedItemFetchedEvent.class, feedManager)
|
||||
register(FeedFetchEvent.class, feedManager)
|
||||
register(UIFeedConfigurationEvent.class, feedManager)
|
||||
}
|
||||
|
||||
log.info "initializing results sender"
|
||||
ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props, certificateManager, chatServer)
|
||||
|
||||
@@ -395,6 +408,7 @@ public class Core {
|
||||
connectionEstablisher.start()
|
||||
hostCache.waitForLoad()
|
||||
updateClient?.start()
|
||||
feedManager.start()
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
@@ -428,6 +442,8 @@ public class Core {
|
||||
chatServer.stop()
|
||||
log.info("shutting down chat manager")
|
||||
chatManager.shutdown()
|
||||
log.info("shutting down feed manager")
|
||||
feedManager.stop()
|
||||
log.info("shutting down connection manager")
|
||||
connectionManager.shutdown()
|
||||
log.info("killing i2p session")
|
||||
|
||||
@@ -9,10 +9,12 @@ import java.util.concurrent.ThreadFactory
|
||||
import com.muwire.core.EventBus
|
||||
import com.muwire.core.Persona
|
||||
|
||||
import groovy.json.JsonOutput
|
||||
import groovy.json.JsonSlurper
|
||||
import groovy.util.logging.Log
|
||||
|
||||
import net.i2p.data.Base64
|
||||
import net.i2p.util.ConcurrentHashSet
|
||||
|
||||
@Log
|
||||
class FeedManager {
|
||||
@@ -40,6 +42,10 @@ class FeedManager {
|
||||
itemsFolder.mkdir()
|
||||
}
|
||||
|
||||
public Feed getFeed(Persona persona) {
|
||||
feeds.get(persona)
|
||||
}
|
||||
|
||||
void start() {
|
||||
log.info("starting feed manager")
|
||||
persister.submit({loadFeeds()} as Runnable)
|
||||
@@ -81,7 +87,7 @@ class FeedManager {
|
||||
|
||||
Set<FeedItem> items = feedItems.get(it)
|
||||
if (items == null) {
|
||||
items = new HashSet<>()
|
||||
items = new ConcurrentHashSet<>()
|
||||
feedItems.put(it, items)
|
||||
}
|
||||
items.add(item)
|
||||
@@ -90,4 +96,62 @@ class FeedManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void onFeedItemFetchedEvent(FeedItemFetchedEvent e) {
|
||||
Set<FeedItem> set = feedItems.get(e.item.getPublisher())
|
||||
if (set == null) {
|
||||
set = new ConcurrentHashSet<>()
|
||||
feedItems.put(e.getItem().getPublisher(), set)
|
||||
}
|
||||
set.add(e.item)
|
||||
}
|
||||
|
||||
void onFeedFetchEvent(FeedFetchEvent e) {
|
||||
if (e.status != FeedFetchStatus.FINISHED)
|
||||
return
|
||||
Feed feed = feeds.get(e.host)
|
||||
if (feed == null) {
|
||||
log.severe("Finished fetching non-existent feed " + e.host.getHumanReadableName())
|
||||
return
|
||||
}
|
||||
feed.setLastUpdated(e.getTimestamp())
|
||||
// save feed items, then save feed
|
||||
persister.submit({saveFeedItems(e.host)} as Runnable)
|
||||
persister.submit({saveFeedMetadata(feed)} as Runnable)
|
||||
}
|
||||
|
||||
void onUIFeedConfigurationEvent(UIFeedConfigurationEvent e) {
|
||||
feeds.put(e.feed.getPublisher(), e.feed)
|
||||
persister.submit({saveFeedMetadata(e.feed)} as Runnable)
|
||||
}
|
||||
|
||||
private void saveFeedItems(Persona publisher) {
|
||||
Set<FeedItem> set = feedItems.get(publisher)
|
||||
if (set == null)
|
||||
return // can happen if nothing was published
|
||||
|
||||
File itemsFile = new File(itemsFolder, publisher.destination.toBase32() + ".json")
|
||||
|
||||
itemsFile.withPrintWriter { writer ->
|
||||
set.each { item ->
|
||||
def obj = FeedItems.feedItemToObj(item)
|
||||
def json = JsonOutput.toJson(obj)
|
||||
writer.println(json)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void saveFeedMetadata(Feed feed) {
|
||||
File metadataFile = new File(metadataFolder, feed.publisher.destination.toBase32() + ".json")
|
||||
metadataFile.withPrintWriter { writer ->
|
||||
def json = [:]
|
||||
json.publisher = feed.getPublisher().toBase64()
|
||||
json.itemsToKeep = feed.getItemsToKeep()
|
||||
json.lastUpdated = feed.getLastUpdated()
|
||||
json.updateInterval = feed.getUpdateInterval()
|
||||
json.autoDownload = feed.isAutoDownload()
|
||||
json = JsonOutput.toJson(json)
|
||||
writer.println(json)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user