this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add FirehoseManager

+29 -10
+7 -10
feedgen.py
··· 2 2 3 3 import asyncio 4 4 import dag_cbor 5 - import redis 6 5 import sys 7 6 import websockets 8 7 from atproto import CAR ··· 12 11 from feeds.rapidfire import RapidFireFeed 13 12 from feeds.popular import PopularFeed 14 13 14 + from firehose_utils import FirehoseManager 15 + 15 16 async def firehose_events(): 16 - redis_cnx = redis.Redis() 17 + firehose_manager = FirehoseManager() 18 + 17 19 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 18 - firehose_seq = redis_cnx.get('dev.edavis.feedgen.seq') 19 - if firehose_seq: 20 - relay_url += f'?cursor={firehose_seq.decode()}' 20 + seq = firehose_manager.get_sequence_number() 21 + if seq: 22 + relay_url += f'?cursor={seq}' 21 23 22 24 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 23 25 sys.stdout.flush() 24 26 25 27 async with websockets.connect(relay_url, ping_timeout=None) as firehose: 26 - op_count = 0 27 28 while True: 28 29 frame = BytesIO(await firehose.recv()) 29 30 header = dag_cbor.decode(frame, allow_concat=True) ··· 47 48 repo_op['record'] = car_parsed.blocks[repo_op['cid']] 48 49 message['op'] = repo_op 49 50 yield message 50 - 51 - op_count += 1 52 - if op_count % 500 == 0: 53 - redis_cnx.set('dev.edavis.feedgen.seq', payload['seq']) 54 51 55 52 async def main(): 56 53 manager = Manager()
+22
firehose_utils.py
··· 1 + import apsw 1 2 import dag_cbor 2 3 import redis 3 4 import sys 4 5 import websockets 5 6 from atproto import CAR 6 7 from io import BytesIO 8 + 9 + class FirehoseManager: 10 + def __init__(self, fname='firehose.db'): 11 + self.db_cnx = apsw.Connection(fname) 12 + with self.db_cnx: 13 + self.db_cnx.execute("create table if not exists firehose(key text unique, value text)") 14 + 15 + def get_sequence_number(self): 16 + cur = self.db_cnx.execute("select * from firehose where key = 'seq'") 17 + row = cur.fetchone() 18 + if row is None: 19 + return None 20 + (key, value) = row 21 + return int(value) 22 + 23 + def set_sequence_number(self, value): 24 + with self.db_cnx: 25 + self.db_cnx.execute( 26 + "insert into firehose (key, value) values ('seq', :value) on conflict(key) do update set value = :value", 27 + dict(value=value) 28 + ) 7 29 8 30 async def bsky_activity(): 9 31 redis_cnx = redis.Redis()