···4545 sys.stdout.flush()
46464747 op_count = 0
4848- for commit, op in subscribe_commits(redis_cnx):
4848+ for commit, op in subscribe_commits(redis_cnx, parse_car_blocks=False):
4949 if op['action'] != 'create':
5050 continue
5151
+5-3
firehose_utils.py
···33from atproto import CAR
44from io import BytesIO
5566-def subscribe_commits(redis_cnx=None):
66+def subscribe_commits(redis_cnx=None, parse_car_blocks=False):
77 if redis_cnx is None:
88 redis_cnx = redis.Redis()
99 redis_sub = redis_cnx.pubsub(ignore_subscribe_messages=True)
···22222323 # TODO(ejd): figure out how to validate blocks
2424 blocks = payload.pop('blocks')
2525- car_parsed = CAR.from_bytes(blocks)
2525+ if parse_car_blocks:
2626+ car_parsed = CAR.from_bytes(blocks)
26272728 message = payload.copy()
2829 del message['ops']
···3233 op = commit_op.copy()
3334 if op['cid'] is not None:
3435 op['cid'] = op['cid'].encode('base32')
3535- op['record'] = car_parsed.blocks[op['cid']]
3636+ if parse_car_blocks:
3737+ op['record'] = car_parsed.blocks[op['cid']]
3638 yield message, op