···11-# jetstream
22-33-[](https://hex.pm/packages/jetstream)
44-[](https://hexdocs.pm/jetstream/)
55-66-```sh
77-gleam add jetstream@1
88-```
99-```gleam
1010-import jetstream
1111-1212-pub fn main() -> Nil {
1313- // TODO: An example of the project in use
1414-}
1515-```
1616-1717-Further documentation can be found at <https://hexdocs.pm/jetstream>.
1818-1919-## Development
2020-2121-```sh
2222-gleam run # Run the project
2323-gleam test # Run the tests
2424-```
-23
jetstream/gleam.toml
···11-name = "jetstream"
22-version = "1.0.0"
33-44-# Fill out these fields if you intend to generate HTML documentation or publish
55-# your project to the Hex package manager.
66-#
77-# description = ""
88-# licences = ["Apache-2.0"]
99-# repository = { type = "github", user = "", repo = "" }
1010-# links = [{ title = "Website", href = "" }]
1111-#
1212-# For a full reference of all the available options, you can have a look at
1313-# https://gleam.run/writing-gleam/gleam-toml/.
1414-1515-[dependencies]
1616-gleam_stdlib = ">= 0.44.0 and < 2.0.0"
1717-gleam_erlang = ">= 1.0.0 and < 2.0.0"
1818-gleam_http = ">= 4.0.0 and < 5.0.0"
1919-gleam_json = ">= 3.0.2 and < 4.0.0"
2020-gun = ">= 2.2.0 and < 3.0.0"
2121-2222-[dev-dependencies]
2323-gleeunit = ">= 1.0.0 and < 2.0.0"
-20
jetstream/manifest.toml
···11-# This file was generated by Gleam
22-# You typically do not need to edit this file
33-44-packages = [
55- { name = "cowlib", version = "2.16.0", build_tools = ["make", "rebar3"], requirements = [], otp_app = "cowlib", source = "hex", outer_checksum = "7F478D80D66B747344F0EA7708C187645CFCC08B11AA424632F78E25BF05DB51" },
66- { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" },
77- { name = "gleam_http", version = "4.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "82EA6A717C842456188C190AFB372665EA56CE13D8559BF3B1DD9E40F619EE0C" },
88- { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" },
99- { name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" },
1010- { name = "gleeunit", version = "1.6.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "FDC68A8C492B1E9B429249062CD9BAC9B5538C6FBF584817205D0998C42E1DAC" },
1111- { name = "gun", version = "2.2.0", build_tools = ["make", "rebar3"], requirements = ["cowlib"], otp_app = "gun", source = "hex", outer_checksum = "76022700C64287FEB4DF93A1795CFF6741B83FB37415C40C34C38D2A4645261A" },
1212-]
1313-1414-[requirements]
1515-gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" }
1616-gleam_http = { version = ">= 4.0.0 and < 5.0.0" }
1717-gleam_json = { version = ">= 3.0.2 and < 4.0.0" }
1818-gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" }
1919-gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
2020-gun = { version = ">= 2.2.0 and < 3.0.0" }
-233
jetstream/src/jetstream.gleam
···11-import gleam/dynamic.{type Dynamic}
22-import gleam/dynamic/decode
33-import gleam/erlang/process.{type Pid}
44-import gleam/io
55-import gleam/json
66-import gleam/list
77-import gleam/option.{type Option}
88-import gleam/string
99-1010-/// Jetstream event types
1111-pub type JetstreamEvent {
1212- CommitEvent(did: String, time_us: Int, commit: CommitData)
1313- IdentityEvent(did: String, time_us: Int, identity: IdentityData)
1414- AccountEvent(did: String, time_us: Int, account: AccountData)
1515- UnknownEvent(raw: String)
1616-}
1717-1818-pub type CommitData {
1919- CommitData(
2020- rev: String,
2121- operation: String,
2222- collection: String,
2323- rkey: String,
2424- record: Option(Dynamic),
2525- cid: Option(String),
2626- )
2727-}
2828-2929-pub type IdentityData {
3030- IdentityData(did: String, handle: String, seq: Int, time: String)
3131-}
3232-3333-pub type AccountData {
3434- AccountData(active: Bool, did: String, seq: Int, time: String)
3535-}
3636-3737-/// Configuration for Jetstream consumer
3838-pub type JetstreamConfig {
3939- JetstreamConfig(
4040- endpoint: String,
4141- wanted_collections: List(String),
4242- wanted_dids: List(String),
4343- )
4444-}
4545-4646-/// Create a default configuration for US East endpoint
4747-pub fn default_config() -> JetstreamConfig {
4848- JetstreamConfig(
4949- endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
5050- wanted_collections: [],
5151- wanted_dids: [],
5252- )
5353-}
5454-5555-/// Build the WebSocket URL with query parameters
5656-pub fn build_url(config: JetstreamConfig) -> String {
5757- let base = config.endpoint
5858- let mut_params = []
5959-6060- // Add wanted collections (each as a separate query parameter)
6161- let mut_params = case config.wanted_collections {
6262- [] -> mut_params
6363- collections -> {
6464- let collection_params =
6565- list.map(collections, fn(col) { "wantedCollections=" <> col })
6666- list.append(collection_params, mut_params)
6767- }
6868- }
6969-7070- // Add wanted DIDs (each as a separate query parameter)
7171- let mut_params = case config.wanted_dids {
7272- [] -> mut_params
7373- dids -> {
7474- let did_params = list.map(dids, fn(did) { "wantedDids=" <> did })
7575- list.append(did_params, mut_params)
7676- }
7777- }
7878-7979- case mut_params {
8080- [] -> base
8181- params -> base <> "?" <> string.join(list.reverse(params), "&")
8282- }
8383-}
8484-8585-/// Connect to Jetstream WebSocket using Erlang gun library
8686-@external(erlang, "jetstream_ws_ffi", "connect")
8787-pub fn connect(url: String, handler_pid: Pid) -> Result(Pid, Dynamic)
8888-8989-/// Start consuming the Jetstream feed
9090-pub fn start_consumer(
9191- config: JetstreamConfig,
9292- on_event: fn(String) -> Nil,
9393-) -> Nil {
9494- let url = build_url(config)
9595- io.println("🔗 Jetstream URL: " <> url)
9696- let self = process.self()
9797- let result = connect(url, self)
9898-9999- case result {
100100- Ok(_conn_pid) -> {
101101- receive_loop(on_event)
102102- }
103103- Error(err) -> {
104104- io.println("Failed to connect to Jetstream")
105105- io.println_error(string.inspect(err))
106106- }
107107- }
108108-}
109109-110110-/// Receive loop for WebSocket messages
111111-fn receive_loop(on_event: fn(String) -> Nil) -> Nil {
112112- // Call Erlang to receive one message
113113- case receive_ws_message() {
114114- Ok(text) -> {
115115- on_event(text)
116116- receive_loop(on_event)
117117- }
118118- Error(_) -> {
119119- // Timeout or error, continue loop
120120- receive_loop(on_event)
121121- }
122122- }
123123-}
124124-125125-/// Receive a WebSocket message from the message queue
126126-@external(erlang, "jetstream_ffi", "receive_ws_message")
127127-fn receive_ws_message() -> Result(String, Nil)
128128-129129-/// Parse a JSON event string into a JetstreamEvent
130130-pub fn parse_event(json_string: String) -> JetstreamEvent {
131131- // Try to parse as commit event first
132132- case json.parse(json_string, commit_event_decoder()) {
133133- Ok(event) -> event
134134- Error(_) -> {
135135- // Try identity event
136136- case json.parse(json_string, identity_event_decoder()) {
137137- Ok(event) -> event
138138- Error(_) -> {
139139- // Try account event
140140- case json.parse(json_string, account_event_decoder()) {
141141- Ok(event) -> event
142142- Error(_) -> UnknownEvent(json_string)
143143- }
144144- }
145145- }
146146- }
147147- }
148148-}
149149-150150-/// Decoder for commit events
151151-fn commit_event_decoder() {
152152- use did <- decode.field("did", decode.string)
153153- use time_us <- decode.field("time_us", decode.int)
154154- use commit <- decode.field("commit", commit_data_decoder())
155155- decode.success(CommitEvent(did: did, time_us: time_us, commit: commit))
156156-}
157157-158158-/// Decoder for commit data - handles both create/update (with record) and delete (without)
159159-fn commit_data_decoder() {
160160- // Try decoder with record and cid fields first (for create/update)
161161- // If that fails, try without (for delete)
162162- decode.one_of(commit_with_record_decoder(), or: [
163163- commit_without_record_decoder(),
164164- ])
165165-}
166166-167167-/// Decoder for commit with record (create/update operations)
168168-fn commit_with_record_decoder() {
169169- use rev <- decode.field("rev", decode.string)
170170- use operation <- decode.field("operation", decode.string)
171171- use collection <- decode.field("collection", decode.string)
172172- use rkey <- decode.field("rkey", decode.string)
173173- use record <- decode.field("record", decode.dynamic)
174174- use cid <- decode.field("cid", decode.string)
175175- decode.success(CommitData(
176176- rev: rev,
177177- operation: operation,
178178- collection: collection,
179179- rkey: rkey,
180180- record: option.Some(record),
181181- cid: option.Some(cid),
182182- ))
183183-}
184184-185185-/// Decoder for commit without record (delete operations)
186186-fn commit_without_record_decoder() {
187187- use rev <- decode.field("rev", decode.string)
188188- use operation <- decode.field("operation", decode.string)
189189- use collection <- decode.field("collection", decode.string)
190190- use rkey <- decode.field("rkey", decode.string)
191191- decode.success(CommitData(
192192- rev: rev,
193193- operation: operation,
194194- collection: collection,
195195- rkey: rkey,
196196- record: option.None,
197197- cid: option.None,
198198- ))
199199-}
200200-201201-/// Decoder for identity events
202202-fn identity_event_decoder() {
203203- use did <- decode.field("did", decode.string)
204204- use time_us <- decode.field("time_us", decode.int)
205205- use identity <- decode.field("identity", identity_data_decoder())
206206- decode.success(IdentityEvent(did: did, time_us: time_us, identity: identity))
207207-}
208208-209209-/// Decoder for identity data
210210-fn identity_data_decoder() {
211211- use did <- decode.field("did", decode.string)
212212- use handle <- decode.field("handle", decode.string)
213213- use seq <- decode.field("seq", decode.int)
214214- use time <- decode.field("time", decode.string)
215215- decode.success(IdentityData(did: did, handle: handle, seq: seq, time: time))
216216-}
217217-218218-/// Decoder for account events
219219-fn account_event_decoder() {
220220- use did <- decode.field("did", decode.string)
221221- use time_us <- decode.field("time_us", decode.int)
222222- use account <- decode.field("account", account_data_decoder())
223223- decode.success(AccountEvent(did: did, time_us: time_us, account: account))
224224-}
225225-226226-/// Decoder for account data
227227-fn account_data_decoder() {
228228- use active <- decode.field("active", decode.bool)
229229- use did <- decode.field("did", decode.string)
230230- use seq <- decode.field("seq", decode.int)
231231- use time <- decode.field("time", decode.string)
232232- decode.success(AccountData(active: active, did: did, seq: seq, time: time))
233233-}