this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Clear out dead code

+1 -33
+1 -1
bsky-activity.py
··· 6 6 import sqlite3 7 7 import sys 8 8 from datetime import datetime, timezone 9 - from firehose_utils import subscribe_commits, bsky_activity 9 + from firehose_utils import bsky_activity 10 10 11 11 app_bsky_allowlist = set([ 12 12 'app.bsky.actor.profile',
-32
firehose_utils.py
··· 5 5 from atproto import CAR 6 6 from io import BytesIO 7 7 8 - async def subscribe_commits(): 9 - relay_url = 'ws://127.0.0.1:9060' 10 - sys.stdout.write(f'opening websocket connection to {relay_url}\n') 11 - sys.stdout.flush() 12 - 13 - async with websockets.connect(relay_url, ping_timeout=None) as firehose: 14 - while True: 15 - frame = BytesIO(await firehose.recv()) 16 - header = dag_cbor.decode(frame, allow_concat=True) 17 - if header['op'] != 1 or header['t'] != '#commit': 18 - continue 19 - 20 - payload = dag_cbor.decode(frame) 21 - if payload['tooBig']: 22 - # TODO(ejd): figure out how to get blocks out-of-band 23 - continue 24 - 25 - # TODO(ejd): figure out how to validate blocks 26 - blocks = payload.pop('blocks') 27 - car_parsed = CAR.from_bytes(blocks) 28 - 29 - message = payload.copy() 30 - del message['ops'] 31 - message['commit'] = message['commit'].encode('base32') 32 - 33 - for commit_op in payload['ops']: 34 - op = commit_op.copy() 35 - if op['cid'] is not None: 36 - op['cid'] = op['cid'].encode('base32') 37 - op['record'] = car_parsed.blocks[op['cid']] 38 - yield message, op 39 - 40 8 async def bsky_activity(): 41 9 redis_cnx = redis.Redis() 42 10 relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'