this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Migrate to websocat

+36 -77
+4 -3
bsky-activity.py
··· 1 1 #!/usr/bin/env python3 2 2 3 + import asyncio 3 4 import os 4 5 import redis 5 6 import sqlite3 ··· 22 23 'app.bsky.labeler.service', 23 24 ]) 24 25 25 - def main(): 26 + async def main(): 26 27 redis_cnx = redis.Redis() 27 28 redis_pipe = redis_cnx.pipeline() 28 29 ··· 45 46 sys.stdout.flush() 46 47 47 48 op_count = 0 48 - for commit, op in subscribe_commits(redis_cnx): 49 + async for commit, op in subscribe_commits(): 49 50 if op['action'] != 'create': 50 51 continue 51 52 ··· 77 78 sys.stdout.flush() 78 79 79 80 if __name__ == '__main__': 80 - main() 81 + asyncio.run(main())
-46
firehose.py
··· 1 - #!/usr/bin/env python3 2 - 3 - import asyncio 4 - import dag_cbor 5 - import redis 6 - import sys 7 - import websockets 8 - from datetime import datetime, timezone 9 - from io import BytesIO 10 - 11 - async def main(): 12 - redis_cnx = redis.Redis() 13 - relay_url = 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos' 14 - firehose_seq = redis_cnx.get('bsky-tools:firehose:subscribe-repos:seq') 15 - if firehose_seq: 16 - relay_url += f'?cursor={firehose_seq.decode()}' 17 - 18 - sys.stdout.write(f'opening websocket to {relay_url}\n') 19 - sys.stdout.flush() 20 - 21 - async with websockets.connect(relay_url, ping_timeout=None) as firehose: 22 - current_minute = None 23 - while True: 24 - frame = BytesIO(await firehose.recv()) 25 - header = dag_cbor.decode(frame, allow_concat=True) 26 - if header['op'] != 1: 27 - continue 28 - 29 - redis_cnx.publish('bsky-tools:firehose:stream', frame.getvalue()) 30 - 31 - # checkpoint the seq 32 - now = datetime.now(timezone.utc) 33 - if now.time().minute != current_minute: 34 - current_minute = now.time().minute 35 - 36 - payload = dag_cbor.decode(frame) 37 - payload_seq = payload['seq'] 38 - payload_time = datetime.strptime(payload['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) 39 - payload_lag = now - payload_time 40 - 41 - redis_cnx.set('bsky-tools:firehose:subscribe-repos:seq', payload_seq) 42 - sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n') 43 - sys.stdout.flush() 44 - 45 - if __name__ == '__main__': 46 - asyncio.run(main())
+3
firehose.sh
··· 1 + #!/usr/bin/env bash 2 + 3 + websocat -v -bE ws-l:127.0.0.1:9060 wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
+28 -27
firehose_utils.py
··· 1 1 import dag_cbor 2 - import redis 2 + import sys 3 + import websockets 3 4 from atproto import CAR 4 5 from io import BytesIO 5 6 6 - def subscribe_commits(redis_cnx=None): 7 - if redis_cnx is None: 8 - redis_cnx = redis.Redis() 9 - redis_sub = redis_cnx.pubsub(ignore_subscribe_messages=True) 10 - redis_sub.subscribe('bsky-tools:firehose:stream') 7 + async def subscribe_commits(): 8 + relay_url = 'ws://localhost:9060' 9 + sys.stdout.write(f'opening websocket connection to {relay_url}\n') 10 + sys.stdout.flush() 11 11 12 - for event in redis_sub.listen(): 13 - frame = BytesIO(event['data']) 14 - header = dag_cbor.decode(frame, allow_concat=True) 15 - if header['op'] != 1 or header['t'] != '#commit': 16 - continue 12 + async with websockets.connect(relay_url, ping_timeout=None) as firehose: 13 + while True: 14 + frame = BytesIO(await firehose.recv()) 15 + header = dag_cbor.decode(frame, allow_concat=True) 16 + if header['op'] != 1 or header['t'] != '#commit': 17 + continue 17 18 18 - payload = dag_cbor.decode(frame) 19 - if payload['tooBig']: 20 - # TODO(ejd): figure out how to get blocks out-of-band 21 - continue 19 + payload = dag_cbor.decode(frame) 20 + if payload['tooBig']: 21 + # TODO(ejd): figure out how to get blocks out-of-band 22 + continue 22 23 23 - # TODO(ejd): figure out how to validate blocks 24 - blocks = payload.pop('blocks') 25 - car_parsed = CAR.from_bytes(blocks) 24 + # TODO(ejd): figure out how to validate blocks 25 + blocks = payload.pop('blocks') 26 + car_parsed = CAR.from_bytes(blocks) 26 27 27 - message = payload.copy() 28 - del message['ops'] 29 - message['commit'] = message['commit'].encode('base32') 28 + message = payload.copy() 29 + del message['ops'] 30 + message['commit'] = message['commit'].encode('base32') 30 31 31 - for commit_op in payload['ops']: 32 - op = commit_op.copy() 33 - if op['cid'] is not None: 34 - op['cid'] = op['cid'].encode('base32') 35 - op['record'] = car_parsed.blocks[op['cid']] 36 - yield message, op 32 + for commit_op in payload['ops']: 33 + op = commit_op.copy() 34 + if op['cid'] is not None: 35 + op['cid'] = op['cid'].encode('base32') 36 + op['record'] = car_parsed.blocks[op['cid']] 37 + yield message, op
+1 -1
service/bsky-firehose.service
··· 6 6 Type=simple 7 7 User=eric 8 8 WorkingDirectory=/home/eric/bsky-tools 9 - ExecStart=/home/eric/.local/bin/pipenv run ./firehose.py 9 + ExecStart=./firehose.sh 10 10 TimeoutSec=15 11 11 Restart=on-failure 12 12 RestartSec=1