this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove firehose-checkpoint.py

-41
-41
firehose-checkpoint.py
··· 1 - #!/usr/bin/env python3 2 - 3 - import asyncio 4 - import dag_cbor 5 - import redis 6 - import sys 7 - import websockets 8 - from datetime import datetime, timezone 9 - from io import BytesIO 10 - 11 - async def main(): 12 - redis_cnx = redis.Redis() 13 - relay_url = 'ws://127.0.0.1:9060' 14 - sys.stdout.write(f'opening websocket connection to {relay_url}\n') 15 - sys.stdout.flush() 16 - 17 - async with websockets.connect(relay_url, ping_timeout=None) as firehose: 18 - current_minute = None 19 - while True: 20 - frame = BytesIO(await firehose.recv()) 21 - header = dag_cbor.decode(frame, allow_concat=True) 22 - if header['op'] != 1: 23 - continue 24 - 25 - # checkpoint the seq 26 - now = datetime.now(timezone.utc) 27 - if now.time().minute != current_minute: 28 - current_minute = now.time().minute 29 - 30 - payload = dag_cbor.decode(frame) 31 - payload_seq = payload['seq'] 32 - payload_time = datetime.strptime(payload['time'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) 33 - payload_lag = now - payload_time 34 - 35 - redis_cnx.set('bsky-tools:firehose:subscribe-repos:seq', payload_seq) 36 - sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n') 37 - 38 - sys.stdout.flush() 39 - 40 - if __name__ == '__main__': 41 - asyncio.run(main())