diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index f3b047c1..c954f79d 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -32,11 +32,13 @@ import com.muwire.core.filecert.CertificateManager import com.muwire.core.filecert.UICreateCertificateEvent import com.muwire.core.filecert.UIFetchCertificatesEvent import com.muwire.core.filecert.UIImportCertificateEvent +import com.muwire.core.filefeeds.FeedClient import com.muwire.core.filefeeds.FeedFetchEvent import com.muwire.core.filefeeds.FeedItemFetchedEvent import com.muwire.core.filefeeds.FeedManager import com.muwire.core.filefeeds.UIFIlePublishedEvent import com.muwire.core.filefeeds.UIFeedConfigurationEvent +import com.muwire.core.filefeeds.UIFeedUpdateEvent import com.muwire.core.filefeeds.UIFileUnpublishedEvent import com.muwire.core.files.FileDownloadedEvent import com.muwire.core.files.FileHashedEvent @@ -123,6 +125,7 @@ public class Core { final ChatServer chatServer final ChatManager chatManager final FeedManager feedManager + private final FeedClient feedClient private final Router router @@ -328,6 +331,10 @@ public class Core { register(UIFeedConfigurationEvent.class, feedManager) } + log.info("initializing feed client") + feedClient = new FeedClient(i2pConnector, eventBus, me, feedManager) + eventBus.register(UIFeedUpdateEvent.class, feedClient) + log.info "initializing results sender" ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props, certificateManager, chatServer) @@ -409,6 +416,7 @@ public class Core { hostCache.waitForLoad() updateClient?.start() feedManager.start() + feedClient.start() } public void shutdown() { @@ -444,6 +452,8 @@ public class Core { chatManager.shutdown() log.info("shutting down feed manager") feedManager.stop() + log.info("shutting down feed client") + feedClient.stop() log.info("shutting down connection manager") connectionManager.shutdown() log.info("killing i2p session") diff --git a/core/src/main/groovy/com/muwire/core/filefeeds/FeedClient.groovy b/core/src/main/groovy/com/muwire/core/filefeeds/FeedClient.groovy new file mode 100644 index 00000000..7defb831 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filefeeds/FeedClient.groovy @@ -0,0 +1,109 @@ +package com.muwire.core.filefeeds + +import java.lang.System.Logger.Level +import java.nio.charset.StandardCharsets +import java.util.concurrent.Executor +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.zip.GZIPInputStream + +import com.muwire.core.EventBus +import com.muwire.core.Persona +import com.muwire.core.connection.Endpoint +import com.muwire.core.connection.I2PConnector +import com.muwire.core.util.DataUtil + +import groovy.json.JsonSlurper +import groovy.util.logging.Log + +@Log +class FeedClient { + + private final I2PConnector connector + private final EventBus eventBus + private final Persona me + private final FeedManager feedManager + + private final ExecutorService feedFetcher = Executors.newCachedThreadPool() + private final Timer feedUpdater = new Timer("feed-updater", true) + + FeedClient(I2PConnector connector, EventBus eventBus, Persona me, FeedManager feedManager) { + this.connector = connector + this.eventBus = eventBus + this.me = me + this.feedManager = feedManager + } + + private void start() { + feedUpdater.schedule({updateAnyFeeds()} as TimerTask, 60000, 60000) + } + + private void stop() { + feedUpdater.cancel() + feedFetcher.shutdown() + } + + private void updateAnyFeeds() { + feedManager.getFeedsToUpdate().each { + feedFetcher.execute({updateFeed(it)} as Runnable) + } + } + + void onUIFeedUpdateEvent(UIFeedUpdateEvent e) { + Feed feed = feedManager.getFeed(e.host) + if (feed == null) { + log.severe("UI request to update non-existent feed " + e.host.getHumanReadableName()) + return + } + + feedFetcher.execute({updateFeed(feed)} as Runnable) + } + + private void updateFeed(Feed feed) { + log.info("updating feed " + feed.getPublisher().getHumanReadableName()) + Endpoint endpoint = null + try { + eventBus.publish(new FeedFetchEvent(host : feed.getPublisher(), status : FeedFetchStatus.CONNECTING)) + endpoint = connector.connect(feed.getPublisher().getDestination()) + OutputStream os = endpoint.getOutputStream() + os.write("FEED\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Persona:${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Timestamp:${feed.getLastUpdated()}\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + os.flush() + + InputStream is = endpoint.getInputStream() + String code = DataUtil.readTillRN(is) + if (!code.startsWith("200")) + throw new IOException("Invalid code $code") + + // parse all headers + Map headers = DataUtil.readAllHeaders(is) + + if (!headers.containsKey("Count")) + throw new IOException("No count header") + + int items = Integer.parseInt(headers['Count']) + + eventBus.publish(new FeedFetchEvent(host : feed.getPublisher(), status : FeedFetchStatus.FETCHING, totalItems: items)) + + JsonSlurper slurper = new JsonSlurper() + DataInputStream dis = new DataInputStream(new GZIPInputStream(is)) + for (int i = 0; i < items; i++) { + int size = dis.readUnsignedShort() + byte [] tmp = new byte[size] + dis.readFully(tmp) + def json = slurper.parse(tmp) + FeedItem item = FeedItems.objToFeedItem(json, feed.getPublisher()) + eventBus.publish(new FeedItemFetchedEvent(item: item)) + } + + eventBus.publish(new FeedFetchEvent(host : feed.getPublisher(), status : FeedFetchStatus.FINISHED)) + } catch (Exception bad) { + log.log(Level.WARNING, "Feed update failed", bad) + eventBus.publish(new FeedFetchEvent(host : feed.getPublisher(), status : FeedFetchStatus.FAILED)) + } finally { + endpoint?.close() + } + } +} diff --git a/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy b/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy index 22cb0f4d..592a5752 100644 --- a/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy +++ b/core/src/main/groovy/com/muwire/core/filefeeds/FeedManager.groovy @@ -5,6 +5,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.ThreadFactory +import java.util.stream.Collectors import com.muwire.core.EventBus import com.muwire.core.Persona @@ -46,6 +47,13 @@ class FeedManager { feeds.get(persona) } + public List getFeedsToUpdate() { + long now = System.currentTimeMillis() + feeds.values().stream(). + filter({Feed f -> f.getLastUpdated() + f.getUpdateInterval() <= now}) + .collect(Collectors.toList()) + } + void start() { log.info("starting feed manager") persister.submit({loadFeeds()} as Runnable) diff --git a/core/src/main/groovy/com/muwire/core/filefeeds/UIFeedUpdateEvent.groovy b/core/src/main/groovy/com/muwire/core/filefeeds/UIFeedUpdateEvent.groovy new file mode 100644 index 00000000..836cd4f9 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filefeeds/UIFeedUpdateEvent.groovy @@ -0,0 +1,8 @@ +package com.muwire.core.filefeeds + +import com.muwire.core.Event +import com.muwire.core.Persona + +class UIFeedUpdateEvent extends Event { + Persona host +}