this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add checkpointer

+62 -2
+40
firehose-checkpoint.py
··· 1 + #!/usr/bin/env python3 2 + 3 + import asyncio 4 + import dag_cbor 5 + import redis 6 + import sys 7 + import websockets 8 + from datetime import datetime, timezone 9 + from io import BytesIO 10 + 11 + async def main(): 12 + redis_cnx = redis.Redis() 13 + relay_url = 'ws://127.0.0.1:9060' 14 + sys.stdout.write(f'opening websocket connection to {relay_url}\n') 15 + sys.stdout.flush() 16 + 17 + async with websockets.connect(relay_url, ping_timeout=None) as firehose: 18 + current_minute = None 19 + while True: 20 + frame = BytesIO(await firehose.recv()) 21 + header = dag_cbor.decode(frame, allow_concat=True) 22 + if header['op'] != 1: 23 + continue 24 + 25 + # checkpoint the seq 26 + now = datetime.now(timezone.utc) 27 + if now.time().minute != current_minute: 28 + current_minute = now.time().minute 29 + 30 + payload = dag_cbor.decode(frame) 31 + payload_seq = payload['seq'] 32 + payload_time = datetime.strptime(payload['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) 33 + payload_lag = now - payload_time 34 + 35 + redis_cnx.set('bsky-tools:firehose:subscribe-repos:seq', payload_seq) 36 + sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n') 37 + sys.stdout.flush() 38 + 39 + if __name__ == '__main__': 40 + asyncio.run(main())
+5 -1
firehose.sh
··· 1 1 #!/usr/bin/env bash 2 2 3 - websocat -v -bE ws-l:127.0.0.1:9060 wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos 3 + cursor="$(redis-cli <<<'get bsky-tools:firehose:subscribe-repos:seq')" 4 + 5 + websocat -v -bE \ 6 + ws-l:127.0.0.1:9060 \ 7 + "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=${cursor}"
+1 -1
firehose_utils.py
··· 5 5 from io import BytesIO 6 6 7 7 async def subscribe_commits(): 8 - relay_url = 'ws://localhost:9060' 8 + relay_url = 'ws://127.0.0.1:9060' 9 9 sys.stdout.write(f'opening websocket connection to {relay_url}\n') 10 10 sys.stdout.flush() 11 11
+16
service/firehose-checkpoint.service
··· 1 + [Unit] 2 + Description=Bsky Firehose Checkpoint 3 + After=network.target syslog.target 4 + 5 + [Service] 6 + Type=simple 7 + User=eric 8 + WorkingDirectory=/home/eric/bsky-tools 9 + ExecStart=/home/eric/.local/bin/pipenv run ./firehose-checkpoint.py 10 + TimeoutSec=15 11 + Restart=on-failure 12 + RestartSec=1 13 + StandardOutput=journal 14 + 15 + [Install] 16 + WantedBy=multi-user.target