···8899## [Unreleased]
10101111-### Breaking Change
1111+### Breaking Changes
12121313- Existing behaviour moved to `Drinkup.Firehose` namespace, to make way for
1414 alternate sync systems.
1515+1616+### Changed
1717+1818+- Refactor core connection logic for websockets into `Drinkup.Socket` to make it
1919+ easy to use across multiple different services.
15201621## [0.1.0] - 2025-05-26
1722
+63-8
lib/firehose/options.ex
···11defmodule Drinkup.Firehose.Options do
22+ @moduledoc """
33+ Configuration options for ATProto Firehose relay subscriptions.
44+55+ This module defines the configuration structure for connecting to and
66+ consuming events from an ATProto Firehose relay. The Firehose streams
77+ real-time repository events from the AT Protocol network.
88+99+ ## Options
1010+1111+ - `:consumer` (required) - Module implementing `Drinkup.Firehose.Consumer` behaviour
1212+ - `:name` - Unique name for this Firehose instance in the supervision tree (default: `Drinkup.Firehose`)
1313+ - `:host` - Firehose relay URL (default: `"https://bsky.network"`)
1414+ - `:cursor` - Optional sequence number to resume streaming from
1515+1616+ ## Example
1717+1818+ %{
1919+ consumer: MyFirehoseConsumer,
2020+ name: MyFirehose,
2121+ host: "https://bsky.network",
2222+ cursor: 12345
2323+ }
2424+ """
2525+226 use TypedStruct
327428 @default_host "https://bsky.network"
5293030+ @typedoc """
3131+ Map of configuration options accepted by `Drinkup.Firehose.child_spec/1`.
3232+ """
633 @type options() :: %{
77- required(:consumer) => module(),
88- optional(:name) => atom(),
99- optional(:host) => String.t(),
1010- optional(:cursor) => pos_integer()
3434+ required(:consumer) => consumer(),
3535+ optional(:name) => name(),
3636+ optional(:host) => host(),
3737+ optional(:cursor) => cursor()
1138 }
12394040+ @typedoc """
4141+ Module implementing the `Drinkup.Firehose.Consumer` behaviour.
4242+ """
4343+ @type consumer() :: module()
4444+4545+ @typedoc """
4646+ Unique identifier for this Firehose instance in the supervision tree.
4747+4848+ Used for Registry lookups and naming child processes.
4949+ """
5050+ @type name() :: atom()
5151+5252+ @typedoc """
5353+ HTTP/HTTPS URL of the ATProto Firehose relay.
5454+5555+ Defaults to `"https://bsky.network"` which is the public Bluesky relay.
5656+ """
5757+ @type host() :: String.t()
5858+5959+ @typedoc """
6060+ Optional sequence number to resume streaming from.
6161+6262+ When provided, the Firehose will replay events starting from this sequence
6363+ number. Useful for resuming after a restart without missing events. The
6464+ cursor is automatically tracked and updated as events are received.
6565+ """
6666+ @type cursor() :: pos_integer() | nil
6767+1368 typedstruct do
1414- field :consumer, module(), enforce: true
1515- field :name, atom(), default: Drinkup
1616- field :host, String.t(), default: @default_host
1717- field :cursor, pos_integer() | nil
6969+ field :consumer, consumer(), enforce: true
7070+ field :name, name(), default: Drinkup.Firehose
7171+ field :host, host(), default: @default_host
7272+ field :cursor, cursor()
1873 end
19742075 @spec from(options()) :: t()
+40-119
lib/firehose/socket.ex
···11defmodule Drinkup.Firehose.Socket do
22 @moduledoc """
33- gen_statem process for managing the websocket connection to an ATProto relay.
33+ WebSocket connection handler for ATProto relay subscriptions.
44+55+ Implements the Drinkup.Socket behaviour to manage connections to an ATProto
66+ Firehose relay, handling CAR/CBOR-encoded frames and dispatching events to
77+ the configured consumer.
48 """
99+1010+ use Drinkup.Socket
511612 require Logger
713 alias Drinkup.Firehose.{Event, Options}
81499- @behaviour :gen_statem
1010- @timeout :timer.seconds(5)
1111- # TODO: `flow` determines messages in buffer. Determine ideal value?
1212- @flow 10
1313-1415 @op_regular 1
1516 @op_error -1
1616-1717- defstruct [:options, :seq, :conn, :stream]
18171918 @impl true
2020- def callback_mode, do: [:state_functions, :state_enter]
2121-2222- def child_spec(opts) do
2323- %{
2424- id: __MODULE__,
2525- start: {__MODULE__, :start_link, [opts, []]},
2626- type: :worker,
2727- restart: :permanent,
2828- shutdown: 500
2929- }
1919+ def init(opts) do
2020+ options = Keyword.fetch!(opts, :options)
2121+ {:ok, %{seq: options.cursor, options: options, host: options.host}}
3022 end
31233224 def start_link(%Options{} = options, statem_opts) do
3333- :gen_statem.start_link(__MODULE__, options, statem_opts)
3434- end
3535-3636- @impl true
3737- def init(%{cursor: seq} = options) do
3838- data = %__MODULE__{seq: seq, options: options}
3939- {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
4040- end
4141-4242- def disconnected(:enter, _from, data) do
4343- Logger.debug("Initial connection")
4444- # TODO: differentiate between initial & reconnects, probably stuff to do with seq
4545- {:next_state, :disconnected, data}
4646- end
2525+ # Build opts for Drinkup.Socket from Options struct
2626+ socket_opts = [
2727+ host: options.host,
2828+ cursor: options.cursor,
2929+ options: options
3030+ ]
47314848- def disconnected(:internal, :connect, data) do
4949- {:next_state, :connecting_http, data}
3232+ Drinkup.Socket.start_link(__MODULE__, socket_opts, statem_opts)
5033 end
51345252- def connecting_http(:enter, _from, %{options: options} = data) do
5353- Logger.debug("Connecting to http")
5454-5555- %{host: host, port: port} = URI.new!(options.host)
5656-5757- {:ok, conn} =
5858- :gun.open(:binary.bin_to_list(host), port, %{
5959- retry: 0,
6060- protocols: [:http],
6161- connect_timeout: @timeout,
6262- domain_lookup_timeout: @timeout,
6363- tls_handshake_timeout: @timeout,
6464- tls_opts: [
6565- verify: :verify_peer,
6666- cacerts: :certifi.cacerts(),
6767- depth: 3,
6868- customize_hostname_check: [
6969- match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
7070- ]
7171- ]
7272- })
7373-7474- {:keep_state, %{data | conn: conn}, [{:state_timeout, @timeout, :connect_timeout}]}
3535+ @impl true
3636+ def build_path(%{seq: seq}) do
3737+ cursor_param = if seq, do: %{cursor: seq}, else: %{}
3838+ "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(cursor_param)
7539 end
76407777- def connecting_http(:info, {:gun_up, _conn, :http}, data) do
7878- {:next_state, :connecting_ws, data}
7979- end
8080-8181- def connecting_http(:state_timeout, :connect_timeout, _data) do
8282- {:stop, :connect_http_timeout}
8383- end
8484-8585- def connecting_ws(:enter, _from, %{conn: conn, seq: seq} = data) do
8686- Logger.debug("Upgrading connection to websocket")
8787- path = "/xrpc/com.atproto.sync.subscribeRepos?" <> URI.encode_query(%{cursor: seq})
8888- stream = :gun.ws_upgrade(conn, path, [], %{flow: @flow})
8989- {:keep_state, %{data | stream: stream}, [{:state_timeout, @timeout, :upgrade_timeout}]}
9090- end
9191-9292- def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
9393- {:next_state, :connected, data}
9494- end
9595-9696- def connecting_ws(:state_timeout, :upgrade_timeout, _data) do
9797- {:stop, :connect_ws_timeout}
9898- end
9999-100100- def connected(:enter, _from, _data) do
101101- Logger.debug("Connected to websocket")
102102- :keep_state_and_data
103103- end
104104-105105- def connected(:info, {:gun_ws, conn, stream, {:binary, frame}}, %{options: options} = data) do
106106- # TODO: let clients specify a handler for raw* (*decoded) packets to support any atproto subscription
107107- # Will also need support for JSON frames
4141+ @impl true
4242+ def handle_frame({:binary, frame}, %{seq: seq, options: options} = data) do
10843 with {:ok, header, next} <- CAR.DagCbor.decode(frame),
10944 {:ok, payload, _} <- CAR.DagCbor.decode(next),
11045 {%{"op" => @op_regular, "t" => type}, _} <- {header, payload},
111111- true <- Event.valid_seq?(data.seq, payload["seq"]) do
112112- data = %{data | seq: payload["seq"] || data.seq}
113113- message = Event.from(type, payload)
114114- :ok = :gun.update_flow(conn, stream, @flow)
4646+ true <- Event.valid_seq?(seq, payload["seq"]) do
4747+ new_seq = payload["seq"] || seq
11548116116- case message do
4949+ case Event.from(type, payload) do
11750 nil ->
11851 Logger.warning("Received unrecognised event from firehose: #{inspect({type, payload})}")
11952···12154 Event.dispatch(message, options)
12255 end
12356124124- {:keep_state, data}
5757+ {:ok, %{data | seq: new_seq}}
12558 else
12659 false ->
12760 Logger.error("Got out of sequence or invalid `seq` from Firehose")
128128- {:keep_state, data}
6161+ :noop
1296213063 {%{"op" => @op_error, "t" => type}, payload} ->
13164 Logger.error("Got error from Firehose: #{inspect({type, payload})}")
132132- {:keep_state, data}
6565+ :noop
1336613467 {:error, reason} ->
13568 Logger.warning("Failed to decode frame from Firehose: #{inspect(reason)}")
136136- {:keep_state, data}
6969+ :noop
13770 end
13871 end
13972140140- def connected(:info, {:gun_ws, _conn, _stream, :close}, _data) do
7373+ @impl true
7474+ def handle_frame(:close, _data) do
14175 Logger.info("Websocket closed, reason unknown")
142142- {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
7676+ nil
14377 end
14478145145- def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, _data) do
7979+ @impl true
8080+ def handle_frame({:close, errno, reason}, _data) do
14681 Logger.info("Websocket closed, errno: #{errno}, reason: #{inspect(reason)}")
147147- {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
8282+ nil
14883 end
14984150150- def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
151151- when old_conn != new_conn do
152152- Logger.debug("Ignoring received :gun_down for a previous connection.")
153153- :keep_state_and_data
154154- end
155155-156156- def connected(:info, {:gun_down, _conn, _proto, _reason, _killed_streams}, _data) do
157157- Logger.info("Websocket connection killed. Attempting to reconnect")
158158- {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
159159- end
160160-161161- def connected(:internal, :reconnect, %{conn: conn} = data) do
162162- :ok = :gun.close(conn)
163163- :ok = :gun.flush(conn)
164164-165165- # TODO: reconnect backoff
166166- {:next_state, :disconnected, %{data | conn: nil, stream: nil},
167167- [{:next_event, :internal, :connect}]}
8585+ @impl true
8686+ def handle_frame({:text, _text}, _data) do
8787+ Logger.warning("Received unexpected text frame from Firehose")
8888+ :noop
16889 end
16990end
+341
lib/socket.ex
···11+defmodule Drinkup.Socket do
22+ # TODO: talk about how to implment, but that it's for internal use
33+ @moduledoc false
44+55+ require Logger
66+77+ @behaviour :gen_statem
88+99+ @type frame ::
1010+ {:binary, binary()}
1111+ | {:text, String.t()}
1212+ | :close
1313+ | {:close, errno :: integer(), reason :: binary()}
1414+1515+ @type user_data :: term()
1616+1717+ @type reconnect_strategy ::
1818+ :exponential
1919+ | {:exponential, max_backoff :: pos_integer()}
2020+ | {:custom, (attempt :: pos_integer() -> delay_ms :: pos_integer())}
2121+2222+ @type option ::
2323+ {:host, String.t()}
2424+ | {:flow, pos_integer()}
2525+ | {:timeout, pos_integer()}
2626+ | {:tls_opts, keyword()}
2727+ | {:gun_opts, map()}
2828+ | {:reconnect_strategy, reconnect_strategy()}
2929+ | {atom(), term()}
3030+3131+ @callback init(opts :: keyword()) :: {:ok, user_data()} | {:error, reason :: term()}
3232+3333+ @callback build_path(data :: user_data()) :: String.t()
3434+3535+ @callback handle_frame(frame :: frame(), data :: user_data()) ::
3636+ {:ok, new_data :: user_data()} | :noop | nil | {:error, reason :: term()}
3737+3838+ @callback handle_connected(data :: user_data()) :: {:ok, new_data :: user_data()}
3939+4040+ @callback handle_disconnected(reason :: term(), data :: user_data()) ::
4141+ {:ok, new_data :: user_data()}
4242+4343+ @optional_callbacks handle_connected: 1, handle_disconnected: 2
4444+4545+ defstruct [
4646+ :module,
4747+ :user_data,
4848+ :options,
4949+ :conn,
5050+ :stream,
5151+ reconnect_attempts: 0
5252+ ]
5353+5454+ defmacro __using__(_opts) do
5555+ quote do
5656+ @behaviour Drinkup.Socket
5757+5858+ def start_link(opts, statem_opts \\ [])
5959+6060+ def start_link(opts, statem_opts) do
6161+ Drinkup.Socket.start_link(__MODULE__, opts, statem_opts)
6262+ end
6363+6464+ defoverridable start_link: 2
6565+6666+ def child_spec(opts) do
6767+ %{
6868+ id: __MODULE__,
6969+ start: {__MODULE__, :start_link, [opts, []]},
7070+ type: :worker,
7171+ restart: :permanent,
7272+ shutdown: 500
7373+ }
7474+ end
7575+7676+ defoverridable child_spec: 1
7777+7878+ @impl true
7979+ def handle_connected(data), do: {:ok, data}
8080+8181+ @impl true
8282+ def handle_disconnected(_reason, data), do: {:ok, data}
8383+8484+ defoverridable handle_connected: 1, handle_disconnected: 2
8585+ end
8686+ end
8787+8888+ @impl true
8989+ def callback_mode, do: [:state_functions, :state_enter]
9090+9191+ @doc """
9292+ Start a WebSocket connection process.
9393+9494+ ## Parameters
9595+9696+ * `module` - The module implementing the Drinkup.Socket behaviour
9797+ * `opts` - Keyword list of options (see module documentation)
9898+ * `statem_opts` - Options passed to `:gen_statem.start_link/3`
9999+ """
100100+ def start_link(module, opts, statem_opts) do
101101+ :gen_statem.start_link(__MODULE__, {module, opts}, statem_opts)
102102+ end
103103+104104+ @impl true
105105+ def init({module, opts}) do
106106+ case module.init(opts) do
107107+ {:ok, user_data} ->
108108+ options = parse_options(opts)
109109+110110+ data = %__MODULE__{
111111+ module: module,
112112+ user_data: user_data,
113113+ options: options,
114114+ reconnect_attempts: 0
115115+ }
116116+117117+ {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
118118+119119+ {:error, reason} ->
120120+ {:stop, {:init_failed, reason}}
121121+ end
122122+ end
123123+124124+ # :disconnected state - waiting to connect or reconnect
125125+126126+ def disconnected(:enter, _from, _data) do
127127+ Logger.debug("[Drinkup.Socket] Entering disconnected state")
128128+ :keep_state_and_data
129129+ end
130130+131131+ def disconnected(:internal, :connect, data) do
132132+ {:next_state, :connecting_http, data}
133133+ end
134134+135135+ def disconnected(:timeout, :reconnect, data) do
136136+ {:next_state, :connecting_http, data}
137137+ end
138138+139139+ # :connecting_http state - establishing HTTP connection with TLS
140140+141141+ def connecting_http(:enter, _from, %{options: options} = data) do
142142+ Logger.debug("[Drinkup.Socket] Connecting to HTTP")
143143+144144+ %{host: host, port: port} = URI.new!(options.host)
145145+146146+ gun_opts =
147147+ Map.merge(
148148+ %{
149149+ retry: 0,
150150+ protocols: [:http],
151151+ connect_timeout: options.timeout,
152152+ domain_lookup_timeout: options.timeout,
153153+ tls_handshake_timeout: options.timeout,
154154+ tls_opts: options.tls_opts
155155+ },
156156+ options.gun_opts
157157+ )
158158+159159+ case :gun.open(:binary.bin_to_list(host), port, gun_opts) do
160160+ {:ok, conn} ->
161161+ {:keep_state, %{data | conn: conn}, [{:state_timeout, options.timeout, :connect_timeout}]}
162162+163163+ {:error, reason} ->
164164+ Logger.error("[Drinkup.Socket] Failed to open connection: #{inspect(reason)}")
165165+ {:stop, {:connect_failed, reason}}
166166+ end
167167+ end
168168+169169+ def connecting_http(:info, {:gun_up, _conn, :http}, data) do
170170+ {:next_state, :connecting_ws, data}
171171+ end
172172+173173+ def connecting_http(:state_timeout, :connect_timeout, data) do
174174+ Logger.error("[Drinkup.Socket] HTTP connection timeout")
175175+ trigger_reconnect(data)
176176+ end
177177+178178+ # :connecting_ws state - upgrading to WebSocket
179179+180180+ def connecting_ws(
181181+ :enter,
182182+ _from,
183183+ %{module: module, user_data: user_data, options: options} = data
184184+ ) do
185185+ Logger.debug("[Drinkup.Socket] Upgrading connection to WebSocket")
186186+187187+ path = module.build_path(user_data)
188188+ stream = :gun.ws_upgrade(data.conn, path, [], %{flow: options.flow})
189189+190190+ {:keep_state, %{data | stream: stream}, [{:state_timeout, options.timeout, :upgrade_timeout}]}
191191+ end
192192+193193+ def connecting_ws(:info, {:gun_upgrade, _conn, _stream, ["websocket"], _headers}, data) do
194194+ {:next_state, :connected, data}
195195+ end
196196+197197+ def connecting_ws(:info, {:gun_response, _conn, _stream, _fin, status, _headers}, data) do
198198+ Logger.error("[Drinkup.Socket] WebSocket upgrade failed with status: #{status}")
199199+ trigger_reconnect(data)
200200+ end
201201+202202+ def connecting_ws(:info, {:gun_error, _conn, _stream, reason}, data) do
203203+ Logger.error("[Drinkup.Socket] WebSocket upgrade error: #{inspect(reason)}")
204204+ trigger_reconnect(data)
205205+ end
206206+207207+ def connecting_ws(:state_timeout, :upgrade_timeout, data) do
208208+ Logger.error("[Drinkup.Socket] WebSocket upgrade timeout")
209209+ trigger_reconnect(data)
210210+ end
211211+212212+ # :connected state - active WebSocket connection
213213+214214+ def connected(:enter, _from, %{module: module, user_data: user_data} = data) do
215215+ Logger.debug("[Drinkup.Socket] WebSocket connected")
216216+217217+ case module.handle_connected(user_data) do
218218+ {:ok, new_user_data} ->
219219+ {:keep_state, %{data | user_data: new_user_data, reconnect_attempts: 0}}
220220+221221+ _ ->
222222+ {:keep_state, %{data | reconnect_attempts: 0}}
223223+ end
224224+ end
225225+226226+ def connected(
227227+ :info,
228228+ {:gun_ws, conn, _stream, frame},
229229+ %{module: module, user_data: user_data, options: options} = data
230230+ ) do
231231+ result = module.handle_frame(frame, user_data)
232232+233233+ :ok = :gun.update_flow(conn, frame, options.flow)
234234+235235+ case result do
236236+ {:ok, new_user_data} ->
237237+ {:keep_state, %{data | user_data: new_user_data}}
238238+239239+ result when result in [:noop, nil] ->
240240+ :keep_state_and_data
241241+242242+ {:error, reason} ->
243243+ Logger.error("[Drinkup.Socket] Frame handler error: #{inspect(reason)}")
244244+ :keep_state_and_data
245245+ end
246246+ end
247247+248248+ def connected(:info, {:gun_ws, _conn, _stream, :close}, data) do
249249+ Logger.info("[Drinkup.Socket] WebSocket closed by remote")
250250+ trigger_reconnect(data, :remote_close)
251251+ end
252252+253253+ def connected(:info, {:gun_ws, _conn, _stream, {:close, errno, reason}}, data) do
254254+ Logger.info("[Drinkup.Socket] WebSocket closed: #{errno} - #{inspect(reason)}")
255255+ trigger_reconnect(data, {:remote_close, errno, reason})
256256+ end
257257+258258+ def connected(:info, {:gun_down, old_conn, _proto, _reason, _killed_streams}, %{conn: new_conn})
259259+ when old_conn != new_conn do
260260+ Logger.debug("[Drinkup.Socket] Ignoring :gun_down for old connection")
261261+ :keep_state_and_data
262262+ end
263263+264264+ def connected(:info, {:gun_down, _conn, _proto, reason, _killed_streams}, data) do
265265+ Logger.info("[Drinkup.Socket] Connection down: #{inspect(reason)}")
266266+ trigger_reconnect(data, {:connection_down, reason})
267267+ end
268268+269269+ def connected(
270270+ :internal,
271271+ :reconnect,
272272+ %{conn: conn, options: options, reconnect_attempts: attempts} = data
273273+ ) do
274274+ :ok = :gun.close(conn)
275275+ :ok = :gun.flush(conn)
276276+277277+ backoff = calculate_backoff(attempts, options.reconnect_strategy)
278278+279279+ Logger.info("[Drinkup.Socket] Reconnecting in #{backoff}ms (attempt #{attempts + 1})")
280280+281281+ {:next_state, :disconnected,
282282+ %{data | conn: nil, stream: nil, reconnect_attempts: attempts + 1},
283283+ [{{:timeout, :reconnect}, backoff, :reconnect}]}
284284+ end
285285+286286+ # Helper functions
287287+288288+ defp trigger_reconnect(data, reason \\ :unknown) do
289289+ %{module: module, user_data: user_data} = data
290290+291291+ case module.handle_disconnected(reason, user_data) do
292292+ {:ok, new_user_data} ->
293293+ {:keep_state, %{data | user_data: new_user_data}, [{:next_event, :internal, :reconnect}]}
294294+295295+ _ ->
296296+ {:keep_state_and_data, [{:next_event, :internal, :reconnect}]}
297297+ end
298298+ end
299299+300300+ defp parse_options(opts) do
301301+ %{
302302+ host: Keyword.fetch!(opts, :host),
303303+ flow: Keyword.get(opts, :flow, 10),
304304+ timeout: Keyword.get(opts, :timeout, :timer.seconds(5)),
305305+ tls_opts: Keyword.get(opts, :tls_opts, default_tls_opts()),
306306+ gun_opts: Keyword.get(opts, :gun_opts, %{}),
307307+ reconnect_strategy: Keyword.get(opts, :reconnect_strategy, :exponential)
308308+ }
309309+ end
310310+311311+ defp default_tls_opts do
312312+ [
313313+ verify: :verify_peer,
314314+ cacerts: :certifi.cacerts(),
315315+ depth: 3,
316316+ customize_hostname_check: [
317317+ match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
318318+ ]
319319+ ]
320320+ end
321321+322322+ defp calculate_backoff(attempt, strategy) do
323323+ case strategy do
324324+ :exponential ->
325325+ exponential_backoff(attempt, :timer.seconds(60))
326326+327327+ {:exponential, max_backoff} ->
328328+ exponential_backoff(attempt, max_backoff)
329329+330330+ {:custom, func} when is_function(func, 1) ->
331331+ func.(attempt)
332332+ end
333333+ end
334334+335335+ defp exponential_backoff(attempt, max_backoff) do
336336+ base = :timer.seconds(1)
337337+ delay = min(base * :math.pow(2, attempt), max_backoff)
338338+ jitter = :rand.uniform(trunc(delay * 0.1))
339339+ trunc(delay) + jitter
340340+ end
341341+end