Code and data for arewedecentralizedyet.online and related projects
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

More robust error handling

+18 -3
+18 -3
data-fetchers/at-mau-watcher/async-track-accounts-firehose.py
··· 457 457 458 458 async with aiohttp.ClientSession() as session: 459 459 if args.relay: 460 - client = AsyncFirehoseSubscribeReposClient(base_uri=args.relay) 461 460 print(f"Using relay: {args.relay}") 462 461 else: 463 - client = AsyncFirehoseSubscribeReposClient() 464 462 print("Using default relay from atproto library") 465 463 466 464 # Start resolution workers ··· 474 472 # Background task: periodic snapshot (no stdout prints) 475 473 asyncio.create_task(periodic_snapshot(snapshot_path, snapshot_interval)) 476 474 475 + reconnect_delay = 1 476 + max_reconnect_delay = 60 477 477 try: 478 - await client.start(on_message) 478 + while True: 479 + try: 480 + if args.relay: 481 + client = AsyncFirehoseSubscribeReposClient(base_uri=args.relay) 482 + else: 483 + client = AsyncFirehoseSubscribeReposClient() 484 + 485 + await client.start(on_message) 486 + print("Firehose client stopped; reconnecting...") 487 + except asyncio.CancelledError: 488 + raise 489 + except Exception as exc: 490 + print(f"Firehose client error: {exc}; reconnecting in {reconnect_delay}s") 491 + await async_save_snapshot(snapshot_path, verbose=True) 492 + await asyncio.sleep(reconnect_delay) 493 + reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) 479 494 finally: 480 495 # On exit, save one last snapshot 481 496 await async_save_snapshot(snapshot_path, verbose=True)