···11+#!/usr/bin/env python3
22+33+import dag_cbor
44+import os
55+import redis
66+import sqlite3
77+import sys
88+from datetime import datetime, timezone
99+from firehose_utils import commit_ops
1010+from io import BytesIO
1111+1212+app_bsky_allowlist = set([
1313+ 'app.bsky.actor.profile',
1414+ 'app.bsky.feed.generator',
1515+ 'app.bsky.feed.like',
1616+ 'app.bsky.feed.post',
1717+ 'app.bsky.feed.repost',
1818+ 'app.bsky.feed.threadgate',
1919+ 'app.bsky.graph.block',
2020+ 'app.bsky.graph.follow',
2121+ 'app.bsky.graph.list',
2222+ 'app.bsky.graph.listblock',
2323+ 'app.bsky.graph.listitem',
2424+ 'app.bsky.labeler.service',
2525+])
2626+2727+def main():
2828+ redis_cnx = redis.Redis()
2929+ redis_pipe = redis_cnx.pipeline()
3030+ redis_sub = redis_cnx.pubsub(ignore_subscribe_messages=True)
3131+3232+ db_fname = '/opt/muninsky/users.db'
3333+ db_fname = 'users.db'
3434+3535+ db_cnx = sqlite3.connect(db_fname)
3636+ with db_cnx:
3737+ db_cnx.executescript("""
3838+ PRAGMA journal_mode = WAL;
3939+ PRAGMA synchronous = off;
4040+ CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
4141+ CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
4242+ CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
4343+ """)
4444+4545+ op_count = 0
4646+ redis_sub.subscribe('bsky-tools:firehose:stream')
4747+ for event in redis_sub.listen():
4848+ frame = BytesIO(event['data'])
4949+ header = dag_cbor.decode(frame, allow_concat=True)
5050+ if header['op'] != 1 or header['t'] != '#commit':
5151+ continue
5252+5353+ payload = dag_cbor.decode(frame)
5454+ if payload['tooBig']:
5555+ # TODO(ejd): how handle these?
5656+ continue
5757+5858+ for op in commit_ops(payload):
5959+ if op['action'] != 'create':
6060+ continue
6161+6262+ collection, _ = op['path'].split('/')
6363+ if collection not in app_bsky_allowlist:
6464+ continue
6565+6666+ repo_did = payload['repo']
6767+ ts = datetime.now(timezone.utc).timestamp()
6868+ db_cnx.execute(
6969+ 'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
7070+ {'did': repo_did, 'ts': ts}
7171+ )
7272+7373+ redis_pipe \
7474+ .incr(collection) \
7575+ .incr('dev.edavis.muninsky.ops')
7676+7777+ op_count += 1
7878+ if op_count % 500 == 0:
7979+ payload_seq = payload['seq']
8080+ sys.stdout.write(f'checkpoint: seq: {payload_seq}\n')
8181+ redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
8282+ redis_pipe.execute()
8383+ db_cnx.commit()
8484+ sys.stdout.flush()
8585+8686+if __name__ == '__main__':
8787+ main()
+11
firehose_utils.py
···11+from atproto import CAR
22+33+def commit_ops(payload):
44+ # TODO(ejd): figure out how to validate blocks
55+ car_parsed = CAR.from_bytes(payload['blocks'])
66+ for op in payload['ops']:
77+ repo_op = op.copy()
88+ if op['cid'] is not None:
99+ repo_op['cid'] = op['cid'].encode('base32')
1010+ repo_op['record'] = car_parsed.blocks[repo_op['cid']]
1111+ yield repo_op