AT Protocol Terminal Interface Explorer
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

initial jetstream view

+297 -36
+73
at/jetsream.go
··· 1 + package at 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + 7 + jetstream "github.com/bluesky-social/jetstream/pkg/client" 8 + "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 9 + "github.com/bluesky-social/jetstream/pkg/models" 10 + ) 11 + 12 + type JetStreamClient struct { 13 + sched jetstream.Scheduler 14 + log *slog.Logger 15 + out chan *models.Event 16 + err chan error 17 + } 18 + 19 + func NewJetstreamClient() *JetStreamClient { 20 + log := slog.Default() 21 + c := &JetStreamClient{ 22 + log: log, 23 + out: make(chan *models.Event, 512), 24 + err: make(chan error, 1), 25 + } 26 + scheduler := sequential.NewScheduler("jetstream", slog.Default(), c.handleEvent) 27 + c.sched = scheduler 28 + return c 29 + 30 + } 31 + 32 + func (c *JetStreamClient) Start(ctx context.Context, cxs, dids []string, cursor *int64) { 33 + config := &jetstream.ClientConfig{ 34 + WebsocketURL: "wss://jetstream1.us-west.bsky.network/subscribe", 35 + Compress: false, 36 + WantedDids: dids, 37 + WantedCollections: cxs, 38 + ExtraHeaders: map[string]string{ 39 + "User-Agent": "attie/0.0.1", 40 + }, 41 + } 42 + jc, err := jetstream.NewClient(config, c.log, c.sched) 43 + if err != nil { 44 + c.err <- err 45 + return 46 + } 47 + 48 + c.err <- jc.ConnectAndRead(ctx, cursor) 49 + } 50 + 51 + func (c *JetStreamClient) Out() <-chan *models.Event { 52 + return c.out 53 + } 54 + func (c *JetStreamClient) Err() <-chan error { 55 + return c.err 56 + } 57 + 58 + func (c *JetStreamClient) handleEvent(ctx context.Context, ev *models.Event) error { 59 + slog.Info("Received event", "did", ev.Did, "kind", ev.Kind) 60 + if ev.Commit == nil { 61 + slog.Info("skipping non commit event ", "did", ev.Did, "kind", ev.Kind) 62 + return nil 63 + } 64 + select { 65 + case <-ctx.Done(): 66 + return ctx.Err() 67 + case c.out <- ev: 68 + return nil 69 + default: 70 + slog.Warn("deopped event", "did", ev.Did, "kind", ev.Kind) 71 + } 72 + return nil 73 + }
+15 -12
go.mod
··· 4 4 5 5 require ( 6 6 github.com/bluesky-social/indigo v0.0.0-20260213232405-1286ca7a7cb2 7 + github.com/bluesky-social/jetstream v0.0.0-20260121001058-f4e39a4b5bbc 7 8 github.com/charmbracelet/bubbles v1.0.0 8 9 github.com/charmbracelet/bubbletea v1.3.10 9 10 github.com/charmbracelet/lipgloss v1.1.0 10 - github.com/sirupsen/logrus v1.9.4 11 11 ) 12 12 13 13 require ( 14 14 github.com/atotto/clipboard v0.1.4 // indirect 15 15 github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect 16 16 github.com/beorn7/perks v1.0.1 // indirect 17 - github.com/cespare/xxhash/v2 v2.2.0 // indirect 17 + github.com/cespare/xxhash/v2 v2.3.0 // indirect 18 18 github.com/charmbracelet/colorprofile v0.4.1 // indirect 19 19 github.com/charmbracelet/x/ansi v0.11.6 // indirect 20 20 github.com/charmbracelet/x/cellbuf v0.0.15 // indirect ··· 24 24 github.com/clipperhouse/uax29/v2 v2.5.0 // indirect 25 25 github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect 26 26 github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect 27 - github.com/google/go-cmp v0.6.0 // indirect 27 + github.com/goccy/go-json v0.10.2 // indirect 28 + github.com/gorilla/websocket v1.5.1 // indirect 28 29 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 29 30 github.com/ipfs/go-cid v0.4.1 // indirect 31 + github.com/klauspost/compress v1.17.9 // indirect 30 32 github.com/klauspost/cpuid/v2 v2.2.7 // indirect 31 33 github.com/lucasb-eyer/go-colorful v1.3.0 // indirect 32 34 github.com/mattn/go-isatty v0.0.20 // indirect 33 35 github.com/mattn/go-localereader v0.0.1 // indirect 34 36 github.com/mattn/go-runewidth v0.0.19 // indirect 35 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect 36 37 github.com/minio/sha256-simd v1.0.1 // indirect 37 38 github.com/mr-tron/base58 v1.2.0 // indirect 38 39 github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect ··· 43 44 github.com/multiformats/go-multibase v0.2.0 // indirect 44 45 github.com/multiformats/go-multihash v0.2.3 // indirect 45 46 github.com/multiformats/go-varint v0.0.7 // indirect 46 - github.com/prometheus/client_golang v1.17.0 // indirect 47 - github.com/prometheus/client_model v0.5.0 // indirect 48 - github.com/prometheus/common v0.45.0 // indirect 49 - github.com/prometheus/procfs v0.12.0 // indirect 47 + github.com/prometheus/client_golang v1.19.1 // indirect 48 + github.com/prometheus/client_model v0.6.1 // indirect 49 + github.com/prometheus/common v0.54.0 // indirect 50 + github.com/prometheus/procfs v0.15.1 // indirect 50 51 github.com/rivo/uniseg v0.4.7 // indirect 51 52 github.com/sahilm/fuzzy v0.1.1 // indirect 52 53 github.com/spaolacci/murmur3 v1.1.0 // indirect ··· 54 55 github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect 55 56 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect 56 57 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect 57 - golang.org/x/crypto v0.21.0 // indirect 58 + go.uber.org/atomic v1.11.0 // indirect 59 + golang.org/x/crypto v0.22.0 // indirect 60 + golang.org/x/net v0.24.0 // indirect 58 61 golang.org/x/sys v0.38.0 // indirect 59 - golang.org/x/text v0.14.0 // indirect 60 - golang.org/x/time v0.3.0 // indirect 62 + golang.org/x/text v0.16.0 // indirect 63 + golang.org/x/time v0.5.0 // indirect 61 64 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect 62 - google.golang.org/protobuf v1.33.0 // indirect 65 + google.golang.org/protobuf v1.34.2 // indirect 63 66 lukechampine.com/blake3 v1.2.1 // indirect 64 67 )
+32 -24
go.sum
··· 8 8 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 9 9 github.com/bluesky-social/indigo v0.0.0-20260213232405-1286ca7a7cb2 h1:q/dijVJ+cA17e2qmJZPNuB7anByq1W6+uYJr1D9gfto= 10 10 github.com/bluesky-social/indigo v0.0.0-20260213232405-1286ca7a7cb2/go.mod h1:VG/LeqLGNI3Ew7lsYixajnZGFfWPv144qbUddh+Oyag= 11 - github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= 12 - github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 11 + github.com/bluesky-social/jetstream v0.0.0-20260121001058-f4e39a4b5bbc h1:PhAg2bDYjOGxMTgBe4VgwrTyeEVRX+8R997TSoT15QM= 12 + github.com/bluesky-social/jetstream v0.0.0-20260121001058-f4e39a4b5bbc/go.mod h1:vt8kVRKtvrBspt9G38wDD8+BotjIMO8u8IYoVnyE4zY= 13 + github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 14 + github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 13 15 github.com/charmbracelet/bubbles v1.0.0 h1:12J8/ak/uCZEMQ6KU7pcfwceyjLlWsDLAxB5fXonfvc= 14 16 github.com/charmbracelet/bubbles v1.0.0/go.mod h1:9d/Zd5GdnauMI5ivUIVisuEm3ave1XwXtD1ckyV6r3E= 15 17 github.com/charmbracelet/bubbletea v1.3.10 h1:otUDHWMMzQSB0Pkc87rm691KZ3SWa4KUlvF9nRvCICw= ··· 38 40 github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw= 39 41 github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= 40 42 github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= 43 + github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= 44 + github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= 41 45 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= 42 46 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 47 + github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= 48 + github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= 43 49 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= 44 50 github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= 45 51 github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= 46 52 github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= 53 + github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= 54 + github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 47 55 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 48 56 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 49 57 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= ··· 56 64 github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= 57 65 github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw= 58 66 github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= 59 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= 60 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= 61 67 github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= 62 68 github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= 63 69 github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= ··· 80 86 github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= 81 87 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 82 88 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 83 - github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= 84 - github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= 85 - github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= 86 - github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= 87 - github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= 88 - github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= 89 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 90 - github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 89 + github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= 90 + github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= 91 + github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 92 + github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= 93 + github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= 94 + github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= 95 + github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= 96 + github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 91 97 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= 92 98 github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= 93 99 github.com/sahilm/fuzzy v0.1.1 h1:ceu5RHF8DGgoi+/dR5PsECjCDH1BE3Fnmpo7aVXOdRA= 94 100 github.com/sahilm/fuzzy v0.1.1/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= 95 - github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= 96 - github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= 97 101 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= 98 102 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= 99 103 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= ··· 106 110 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b/go.mod h1:/y/V339mxv2sZmYYR64O07VuCpdNZqCTwO8ZcouTMI8= 107 111 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 h1:qwDnMxjkyLmAFgcfgTnfJrmYKWhHnci3GjDqcZp1M3Q= 108 112 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02/go.mod h1:JTnUj0mpYiAsuZLmKjTx/ex3AtMowcCgnE7YNyCEP0I= 109 - golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= 110 - golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= 111 - golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= 112 - golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= 113 + go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= 114 + go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= 115 + golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= 116 + golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= 117 + golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 h1:LoYXNGAShUG3m/ehNk4iFctuhGX/+R1ZpfJ4/ia80JM= 118 + golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= 119 + golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= 120 + golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= 113 121 golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 114 122 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 115 123 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 116 124 golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= 117 125 golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= 118 - golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= 119 - golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= 120 - golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= 121 - golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= 126 + golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= 127 + golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= 128 + golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= 129 + golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= 122 130 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= 123 131 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= 124 - google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= 125 - google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= 132 + google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= 133 + google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= 126 134 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 127 135 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 128 136 lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
+23
ui/app.go
··· 32 32 spinner spinner.Model 33 33 loading bool 34 34 actx *AppContext 35 + 36 + jetstream *JetStreamView 35 37 } 36 38 37 39 func NewApp(query string) *App { ··· 39 41 repoView := NewRepoView() 40 42 spin := spinner.New() 41 43 spin.Spinner = spinner.Dot 44 + 45 + jc := at.NewJetstreamClient() 46 + jv := NewJetStreamView(jc) 42 47 return &App{ 43 48 query: query, 44 49 client: at.NewClient(""), ··· 50 55 spinner: spin, 51 56 loading: false, 52 57 actx: &AppContext{}, 58 + jetstream: jv, 53 59 } 54 60 } 55 61 ··· 113 119 a.active = a.search 114 120 a.search.loading = false 115 121 return a, a.search.Init() 122 + case "ctrl+j": 123 + a.active = a.jetstream 124 + if a.jetstream.Running() { 125 + return a, a.jetstream.Stop() 126 + } else { 127 + cxs := []string{} 128 + dids := []string{} 129 + if a.actx.collection != "" { 130 + cxs = append(cxs, a.actx.collection) 131 + } 132 + if a.actx.identity != nil { 133 + dids = append(dids, a.actx.identity.DID.String()) 134 + } 135 + return a, a.jetstream.Start(cxs, dids, nil) 136 + } 116 137 case "esc": 117 138 switch a.active { 118 139 case a.repoView: ··· 134 155 } 135 156 a.active = a.rlist 136 157 return a, nil 158 + case a.jetstream: 159 + return a, a.jetstream.Stop() 137 160 } 138 161 } 139 162
+154
ui/jetstream.go
··· 1 + package ui 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + 8 + "github.com/bluesky-social/jetstream/pkg/models" 9 + "github.com/charmbracelet/bubbles/list" 10 + tea "github.com/charmbracelet/bubbletea" 11 + "github.com/charmbracelet/lipgloss" 12 + "github.com/treethought/attie/at" 13 + ) 14 + 15 + var ( 16 + opStyle = lipgloss.NewStyle().Foreground(lipgloss.Color("205")) 17 + didStyle = lipgloss.NewStyle().Foreground(lipgloss.Color("42")) 18 + ) 19 + 20 + type jetEventItem struct { 21 + evt *models.Event 22 + } 23 + 24 + func (j jetEventItem) FilterValue() string { 25 + return "" 26 + } 27 + func (j jetEventItem) Title() string { 28 + return fmt.Sprintf("%s %s %s", 29 + j.evt.Commit.Operation, j.evt.Commit.Collection, j.evt.Commit.RKey, 30 + ) 31 + } 32 + 33 + func (j jetEventItem) Description() string { 34 + return fmt.Sprintf("%s - %s", j.evt.Did, j.evt.TimeUS) 35 + } 36 + 37 + type eventMsg struct { 38 + evt *models.Event 39 + } 40 + 41 + type jetStreamErrorMsg struct { 42 + err error 43 + } 44 + 45 + type JetStreamView struct { 46 + list list.Model 47 + jc *at.JetStreamClient 48 + ctx context.Context 49 + cancel context.CancelFunc 50 + } 51 + 52 + func NewJetStreamView(jc *at.JetStreamClient) *JetStreamView { 53 + del := list.DefaultDelegate{ 54 + ShowDescription: true, 55 + Styles: list.NewDefaultItemStyles(), 56 + } 57 + del.SetHeight(2) 58 + 59 + l := list.New(nil, del, 80, 20) 60 + l.SetShowTitle(false) 61 + l.SetShowStatusBar(false) 62 + l.SetFilteringEnabled(false) 63 + return &JetStreamView{ 64 + list: l, 65 + jc: jc, 66 + } 67 + } 68 + 69 + func (m *JetStreamView) Listen() tea.Cmd { 70 + return func() tea.Msg { 71 + slog.Info("Listening for JetStream events...") 72 + select { 73 + case err := <-m.jc.Err(): 74 + slog.Error("JetStream client error", "error", err) 75 + return jetStreamErrorMsg{err: err} 76 + case evt := <-m.jc.Out(): 77 + slog.Info("Received JetStream event", "did", evt.Did, "kind", evt.Kind) 78 + return eventMsg{evt: evt} 79 + } 80 + } 81 + } 82 + 83 + func (m *JetStreamView) AddEvent(evt *models.Event) tea.Cmd { 84 + item := jetEventItem{evt: evt} 85 + return m.list.InsertItem(0, item) 86 + } 87 + 88 + func (m *JetStreamView) Running() bool { 89 + return m.ctx != nil 90 + } 91 + func (m *JetStreamView) Clear() tea.Cmd { 92 + return func() tea.Msg { 93 + m.list.SetItems(nil) 94 + return nil 95 + } 96 + } 97 + 98 + func (m *JetStreamView) Start(cxs, dids []string, cursor *int64) tea.Cmd { 99 + if m.ctx != nil { 100 + slog.Warn("JetStream client already running") 101 + return nil 102 + } 103 + m.ctx, m.cancel = context.WithCancel(context.Background()) 104 + slog.Info("Starting JetStream client", "collections", cxs, "dids", dids, "cursor", cursor) 105 + go m.jc.Start(m.ctx, cxs, dids, cursor) 106 + return m.Listen() 107 + } 108 + func (m *JetStreamView) Stop() tea.Cmd { 109 + if m.cancel != nil { 110 + slog.Info("Stopping JetStream client") 111 + m.cancel() 112 + m.ctx = nil 113 + } 114 + return nil 115 + } 116 + 117 + func (m *JetStreamView) Init() tea.Cmd { 118 + return nil 119 + } 120 + func (m *JetStreamView) SetSize(w, h int) { 121 + m.list.SetSize(w, h) 122 + } 123 + 124 + func (m *JetStreamView) Update(msg tea.Msg) (tea.Model, tea.Cmd) { 125 + switch msg := msg.(type) { 126 + 127 + // case jetStreamStartMsg: 128 + // return m, m.Start(msg.cxs, msg.dids, msg.cur) 129 + // 130 + // case jetStreamStopMsg: 131 + // m.Stop() 132 + // return m, nil 133 + 134 + case jetStreamErrorMsg: 135 + slog.Error("JetStream client error", "error", msg.err) 136 + return m, nil 137 + 138 + case eventMsg: 139 + return m, tea.Batch( 140 + m.AddEvent(msg.evt), 141 + m.Listen(), 142 + ) 143 + } 144 + l, cmd := m.list.Update(msg) 145 + m.list = l 146 + return m, cmd 147 + } 148 + 149 + func (m *JetStreamView) View() string { 150 + if m.ctx == nil { 151 + return dimStyle.Render("JetStream client not running") 152 + } 153 + return m.list.View() 154 + }