this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Misc fixes

+68 -42
+14 -8
feedgen.py
··· 4 4 import dag_cbor 5 5 import sys 6 6 import websockets 7 + 7 8 from atproto import CAR 8 9 from io import BytesIO 10 + from datetime import datetime, timezone 9 11 10 - from feeds import Manager 12 + from feeds import FeedManager 11 13 from feeds.rapidfire import RapidFireFeed 12 14 from feeds.popular import PopularFeed 15 + from firehose_manager import FirehoseManager 13 16 14 - from firehose_utils import FirehoseManager 15 - 16 - async def firehose_events(): 17 - firehose_manager = FirehoseManager() 18 - 17 + async def firehose_events(firehose_manager): 19 18 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 20 19 seq = firehose_manager.get_sequence_number() 21 20 if seq: ··· 56 55 feed_manager.register(RapidFireFeed) 57 56 feed_manager.register(PopularFeed) 58 57 59 - async for commit in firehose_events(): 60 - feed_manager.process(commit) 58 + current_minute = None 59 + async for commit in firehose_events(firehose_manager): 60 + feed_manager.process_commit(commit) 61 + 62 + now = datetime.now(timezone.utc) 63 + if now.minute != current_minute: 64 + current_minute = now.minute 65 + feed_manager.run_tasks_minute() 66 + firehose_manager.set_sequence_number(commit['seq']) 61 67 62 68 if __name__ == '__main__': 63 69 asyncio.run(main())
+18 -4
feeds/__init__.py
··· 1 + class BaseFeed: 2 + def process_commit(self, commit): 3 + raise NotImplementedError 4 + 5 + def serve_feed(self, limit, offset, langs): 6 + raise NotImplementedError 7 + 8 + def run_tasks_minute(self): 9 + pass 10 + 1 11 class FeedManager: 2 12 def __init__(self): 3 13 self.feeds = {} ··· 5 15 def register(self, feed): 6 16 self.feeds[feed.FEED_URI] = feed() 7 17 8 - def process(self, commit): 18 + def process_commit(self, commit): 9 19 for feed in self.feeds.values(): 10 - feed.process(commit) 20 + feed.process_commit(commit) 11 21 12 - def serve(self, feed_uri, limit, offset, langs): 22 + def serve_feed(self, feed_uri, limit, offset, langs): 13 23 feed = self.feeds.get(feed_uri) 14 24 if feed is not None: 15 - return feed.serve(limit, offset, langs) 25 + return feed.serve_feed(limit, offset, langs) 26 + 27 + def run_tasks_minute(self): 28 + for feed in self.feeds.values(): 29 + feed.run_tasks_minute()
+5 -3
feeds/popular.py
··· 3 3 import math 4 4 import sqlite3 5 5 6 - class PopularFeed: 6 + from . import BaseFeed 7 + 8 + class PopularFeed(BaseFeed): 7 9 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/popular' 8 10 9 11 def __init__(self): ··· 25 27 26 28 self.cleanup_checkpoint = 0 27 29 28 - def process(self, commit): 30 + def process_commit(self, commit): 29 31 op = commit['op'] 30 32 if op['action'] != 'create': 31 33 return ··· 58 60 "pragma wal_checkpoint(TRUNCATE)" 59 61 ) 60 62 61 - def serve(self, limit, offset, langs): 63 + def serve_feed(self, limit, offset, langs): 62 64 cur = self.db_cnx.execute(( 63 65 "select uri from posts " 64 66 "order by temperature * exp( "
+5 -3
feeds/rapidfire.py
··· 2 2 import sys 3 3 import sqlite3 4 4 5 + from . import BaseFeed 6 + 5 7 MAX_TEXT_LENGTH = 140 6 8 7 - class RapidFireFeed: 9 + class RapidFireFeed(BaseFeed): 8 10 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire' 9 11 10 12 def __init__(self): ··· 25 27 26 28 self.checkpoint = 0 27 29 28 - def process(self, commit): 30 + def process_commit(self, commit): 29 31 op = commit['op'] 30 32 if op['action'] != 'create': 31 33 return ··· 65 67 sys.stdout.flush() 66 68 self.db_cnx.execute("pragma wal_checkpoint(TRUNCATE)") 67 69 68 - def serve(self, limit, offset, langs): 70 + def serve_feed(self, limit, offset, langs): 69 71 if '*' in langs: 70 72 cur = self.db_cnx.execute( 71 73 "select uri from posts order by create_ts desc limit :limit offset :offset",
+3 -3
feedweb.py
··· 1 1 #!/usr/bin/env python3 2 2 3 - from feeds import Manager 3 + from feeds import FeedManager 4 4 from feeds.rapidfire import RapidFireFeed 5 5 from feeds.popular import PopularFeed 6 6 from flask import Flask, request, jsonify ··· 9 9 10 10 @app.route('/xrpc/app.bsky.feed.getFeedSkeleton') 11 11 def get_feed_skeleton(): 12 - manager = Manager() 12 + manager = FeedManager() 13 13 manager.register(RapidFireFeed) 14 14 manager.register(PopularFeed) 15 15 ··· 29 29 feed_uri = request.args['feed'] 30 30 31 31 langs = request.accept_languages 32 - posts = manager.serve(feed_uri, limit, offset, langs) 32 + posts = manager.serve_feed(feed_uri, limit, offset, langs) 33 33 offset += len(posts) 34 34 35 35 return dict(cursor=str(offset), feed=[dict(post=uri) for uri in posts])
+23
firehose_manager.py
··· 1 + import apsw 2 + 3 + class FirehoseManager: 4 + def __init__(self, fname='firehose.db'): 5 + self.db_cnx = apsw.Connection(fname) 6 + self.db_cnx.pragma('journal_mode', 'WAL') 7 + with self.db_cnx: 8 + self.db_cnx.execute("create table if not exists firehose(key text unique, value text)") 9 + 10 + def get_sequence_number(self): 11 + cur = self.db_cnx.execute("select * from firehose where key = 'seq'") 12 + row = cur.fetchone() 13 + if row is None: 14 + return None 15 + (key, value) = row 16 + return int(value) 17 + 18 + def set_sequence_number(self, value): 19 + with self.db_cnx: 20 + self.db_cnx.execute( 21 + "insert into firehose (key, value) values ('seq', :value) on conflict(key) do update set value = :value", 22 + dict(value=value) 23 + )
-21
firehose_utils.py
··· 6 6 from atproto import CAR 7 7 from io import BytesIO 8 8 9 - class FirehoseManager: 10 - def __init__(self, fname='firehose.db'): 11 - self.db_cnx = apsw.Connection(fname) 12 - with self.db_cnx: 13 - self.db_cnx.execute("create table if not exists firehose(key text unique, value text)") 14 - 15 - def get_sequence_number(self): 16 - cur = self.db_cnx.execute("select * from firehose where key = 'seq'") 17 - row = cur.fetchone() 18 - if row is None: 19 - return None 20 - (key, value) = row 21 - return int(value) 22 - 23 - def set_sequence_number(self, value): 24 - with self.db_cnx: 25 - self.db_cnx.execute( 26 - "insert into firehose (key, value) values ('seq', :value) on conflict(key) do update set value = :value", 27 - dict(value=value) 28 - ) 29 - 30 9 async def bsky_activity(): 31 10 redis_cnx = redis.Redis() 32 11 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'