this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

bsky-activity.py: go back to bsky.network for now

+40 -2
+2 -2
bsky-activity.py
··· 6 6 import sqlite3 7 7 import sys 8 8 from datetime import datetime, timezone 9 - from firehose_utils import subscribe_commits 9 + from firehose_utils import subscribe_commits, bsky_activity 10 10 11 11 app_bsky_allowlist = set([ 12 12 'app.bsky.actor.profile', ··· 46 46 sys.stdout.flush() 47 47 48 48 op_count = 0 49 - async for commit, op in subscribe_commits(): 49 + async for commit, op in bsky_activity(): 50 50 if op['action'] != 'create': 51 51 continue 52 52
+38
firehose_utils.py
··· 1 1 import dag_cbor 2 + import redis 2 3 import sys 3 4 import websockets 4 5 from atproto import CAR ··· 35 36 op['cid'] = op['cid'].encode('base32') 36 37 op['record'] = car_parsed.blocks[op['cid']] 37 38 yield message, op 39 + 40 + async def bsky_activity(): 41 + redis_cnx = redis.Redis() 42 + relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 43 + firehose_seq = redis_cnx.get('dev.edavis.muninsky.seq') 44 + if firehose_seq: 45 + relay_url += f'?cursor={firehose_seq.decode()}' 46 + 47 + sys.stdout.write(f'opening websocket connection to {relay_url}\n') 48 + sys.stdout.flush() 49 + 50 + async with websockets.connect(relay_url, ping_timeout=None) as firehose: 51 + while True: 52 + frame = BytesIO(await firehose.recv()) 53 + header = dag_cbor.decode(frame, allow_concat=True) 54 + if header['op'] != 1 or header['t'] != '#commit': 55 + continue 56 + 57 + payload = dag_cbor.decode(frame) 58 + if payload['tooBig']: 59 + # TODO(ejd): figure out how to get blocks out-of-band 60 + continue 61 + 62 + # TODO(ejd): figure out how to validate blocks 63 + blocks = payload.pop('blocks') 64 + car_parsed = CAR.from_bytes(blocks) 65 + 66 + message = payload.copy() 67 + del message['ops'] 68 + message['commit'] = message['commit'].encode('base32') 69 + 70 + for commit_op in payload['ops']: 71 + op = commit_op.copy() 72 + if op['cid'] is not None: 73 + op['cid'] = op['cid'].encode('base32') 74 + op['record'] = car_parsed.blocks[op['cid']] 75 + yield message, op