···11+package main
22+33+import (
44+ "context"
55+ "database/sql"
66+ _ "embed"
77+ "encoding/json"
88+ "log"
99+ "os/signal"
1010+ "syscall"
1111+ "time"
1212+1313+ jetstream "github.com/bluesky-social/jetstream/pkg/models"
1414+ "github.com/gorilla/websocket"
1515+ _ "github.com/mattn/go-sqlite3"
1616+)
// CheckpointResults receives the single result row of SQLite's
// "PRAGMA wal_checkpoint"; handler scans the row's three columns into the
// fields in declaration order.
type CheckpointResults struct {
	// Blocked is the first column. handler treats the value 1 as "the
	// checkpoint was blocked".
	Blocked int
	// Pages is the second column: per the SQLite documentation, the number of
	// modified pages that have been written to the write-ahead log.
	Pages int
	// Transferred is the third column: per the SQLite documentation, the
	// number of WAL pages moved back into the database file.
	Transferred int
}
// AppBskyAllowlist is the set of record collections whose create events are
// recorded as user activity. Only key membership is consulted by handler; every
// listed collection maps to true.
var AppBskyAllowlist = func() map[string]bool {
	collections := [...]string{
		"app.bsky.actor.profile",
		"app.bsky.feed.generator",
		"app.bsky.feed.like",
		"app.bsky.feed.post",
		"app.bsky.feed.postgate",
		"app.bsky.feed.repost",
		"app.bsky.feed.threadgate",
		"app.bsky.graph.block",
		"app.bsky.graph.follow",
		"app.bsky.graph.list",
		"app.bsky.graph.listblock",
		"app.bsky.graph.listitem",
		"app.bsky.graph.starterpack",
		"app.bsky.labeler.service",
		"chat.bsky.actor.declaration",
	}

	allowed := make(map[string]bool, len(collections))
	for _, nsid := range collections {
		allowed[nsid] = true
	}
	return allowed
}()
const (
	// JetstreamUrl is the websocket endpoint main dials to receive events.
	// Alternative endpoint, currently disabled:
	//   wss://jetstream1.us-west.bsky.network/subscribe
	// TODO(ejd): attach a reconnect cursor
	JetstreamUrl = "ws://localhost:6008/subscribe"

	// userTimestampUpdate upserts a users row keyed by did. It takes three
	// parameters: did, ts for the insert, and ts again for the update branch
	// taken when the did already exists.
	userTimestampUpdate = `insert into users (did, ts) values (?, ?) on conflict (did) do update set ts = ?`
)
4747+4848+//go:embed schema.sql
4949+var ddl string
5050+5151+func handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) {
5252+ if _, err := dbCnx.ExecContext(ctx, ddl); err != nil {
5353+ log.Printf("could not create tables: %v\n", err)
5454+ }
5555+ if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil {
5656+ log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
5757+ }
5858+5959+ var (
6060+ dbTx *sql.Tx
6161+ err error
6262+ eventCount int
6363+ )
6464+6565+ for evt := range events {
6666+ if dbTx == nil {
6767+ dbTx, err = dbCnx.BeginTx(ctx, nil)
6868+ if err != nil {
6969+ log.Printf("failed to begin transaction: %v\n", err)
7070+ }
7171+ }
7272+7373+ var event jetstream.Event
7474+ if err := json.Unmarshal(evt, &event); err != nil {
7575+ continue
7676+ }
7777+7878+ if event.Kind != jetstream.EventKindCommit {
7979+ continue
8080+ }
8181+ if event.Commit.Operation != jetstream.CommitOperationCreate {
8282+ // we're missing deletes and updates but this matches how bsky-activity
8383+ // does it so we stay consistent
8484+ continue
8585+ }
8686+8787+ did := event.Did
8888+ commit := *event.Commit
8989+ ts := time.Now().UTC().Unix()
9090+9191+ if _, ok := AppBskyAllowlist[commit.Collection]; !ok {
9292+ continue
9393+ }
9494+9595+ dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts)
9696+9797+ eventCount += 1
9898+ if eventCount%1000 == 0 {
9999+ if err := dbTx.Commit(); err != nil {
100100+ log.Printf("commit failed: %v\n")
101101+ }
102102+103103+ var results CheckpointResults
104104+ err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred)
105105+ switch {
106106+ case err != nil:
107107+ log.Printf("failed checkpoint: %v\n", err)
108108+ case results.Blocked == 1:
109109+ log.Printf("checkpoint: blocked\n")
110110+ case results.Pages == results.Transferred:
111111+ log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
112112+ case results.Pages != results.Transferred:
113113+ log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
114114+ }
115115+116116+ dbTx, err = dbCnx.BeginTx(ctx, nil)
117117+ if err != nil {
118118+ log.Printf("failed to begin transaction: %v\n", err)
119119+ }
120120+ }
121121+ }
122122+}
123123+124124+func main() {
125125+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
126126+ defer stop()
127127+128128+ conn, _, err := websocket.DefaultDialer.Dial(JetstreamUrl, nil)
129129+ if err != nil {
130130+ log.Fatalf("failed to open websocket: %v\n", err)
131131+ }
132132+ defer func() {
133133+ if err := conn.Close(); err != nil {
134134+ log.Printf("failed to close websocket: %v\n", err)
135135+ }
136136+ log.Printf("websocket closed\n")
137137+ }()
138138+139139+ // TODO(ejd): use more readable URL params for this
140140+ dbCnx, err := sql.Open("sqlite3", "data/bsky-users.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate")
141141+ if err != nil {
142142+ log.Fatalf("failed to open database: %v\n", err)
143143+ }
144144+ defer func() {
145145+ if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
146146+ log.Printf("error doing final WAL checkpoint: %v\n", err)
147147+ }
148148+ if err := dbCnx.Close(); err != nil {
149149+ log.Printf("failed to close db: %v\n", err)
150150+ }
151151+ log.Printf("db closed\n")
152152+ }()
153153+154154+ jetstreamEvents := make(chan []byte)
155155+ go handler(ctx, jetstreamEvents, dbCnx)
156156+157157+ log.Printf("starting up\n")
158158+ go func() {
159159+ for {
160160+ _, message, err := conn.ReadMessage()
161161+ if err != nil {
162162+ stop()
163163+ }
164164+ jetstreamEvents <- message
165165+ }
166166+ }()
167167+168168+ <-ctx.Done()
169169+ log.Printf("shutting down\n")
170170+}
+3
cmd/bsky-users/schema.sql
-- One row per DID; ts is overwritten by the upsert each time a matching event is recorded.
CREATE TABLE IF NOT EXISTS users (
    did TEXT,
    ts  TIMESTAMP
);

-- Uniqueness on did is the conflict target of the upsert's "on conflict (did)" clause.
CREATE UNIQUE INDEX IF NOT EXISTS did_idx ON users (did);

-- Secondary index on ts (no query using it is visible here; presumably for time-range lookups).
CREATE INDEX IF NOT EXISTS ts_idx ON users (ts);