Elixir ATProtocol ingestion and sync library.

refactor: namespace existing functionality to Drinkup.Firehose

+71 -283
+32 -252
AGENTS.md
···
-# AGENTS.md
-
-This file provides guidance for agentic coding assistants working with the
-Drinkup codebase.
-
-## Project Overview
-
-Drinkup is an Elixir library for consuming events from an ATProtocol relay
-(firehose/`com.atproto.sync.subscribeRepos`). It uses OTP principles with
-GenStatem for managing WebSocket connections and Task.Supervisor for concurrent
-event processing.
-
-## Build, Lint, and Test Commands
-
-### Running Tests
-
-```bash
-# Run all tests
-mix test
-
-# Run a single test file
-mix test test/drinkup_test.exs
-
-# Run a specific test by line number
-mix test test/drinkup_test.exs:5
-
-# Run tests with coverage
-mix test --cover
-
-# Run tests matching a pattern
-mix test --only [tag_name]
-```
-
-### Formatting and Linting
-
-```bash
-# Format code (uses .formatter.exs config)
-mix format
-
-# Check if code is formatted
-mix format --check-formatted
-
-# Run Credo for static code analysis
-mix credo
-
-# Run Credo strictly
-mix credo --strict
-```
-
-### Compilation and Documentation
-
-```bash
-# Compile the project
-mix compile
-
-# Clean build artifacts
-mix clean
-
-# Generate documentation
-mix docs
-
-# Run Dialyzer for type checking (if configured)
-mix dialyzer
-```
-
-## Code Style Guidelines
-
-### Module Structure
-
-- Use `defmodule` with clear, descriptive names following `Drinkup.<Component>`
-  namespace
-- Place module documentation (`@moduledoc`) immediately after `defmodule`
-- Group related functionality within nested modules (e.g.,
-  `Drinkup.Event.Commit.RepoOp`)
-- Order module contents: module attributes, types, public functions, private
-  functions
-
-### Imports and Aliases
-
-- Use `require` for macros (e.g., `require Logger`)
-- Use `alias` to shorten module names, prefer explicit aliases over `import`
-- Group in order: `require`, `alias`, `import`
-- Example:
-  ```elixir
-  require Logger
-  alias Drinkup.{Event, Options}
-  ```
-
-### Type Specifications
-
-- Use TypedStruct for structs with typed fields (dependency:
-  `{:typedstruct, "~> 0.5"}`)
-- Define `@type` specs for complex types, unions, and public APIs
-- Use `@spec` for all public functions
-- Use `enforce: true` for required TypedStruct fields
-- Example:
-
-  ```elixir
-  use TypedStruct
-
-  typedstruct enforce: true do
-    field :consumer, module()
-    field :name, atom(), default: Drinkup
-    field :cursor, pos_integer() | nil, enforce: false
-  end
-  ```
-
-### Naming Conventions
-
-- Modules: PascalCase (`Drinkup.Event.Commit`)
-- Functions: snake_case (`handle_event/1`, `from/1`)
-- Variables: snake_case (`repo_op`, `last_seq`)
-- Private functions: prefix with `defp`, mark with `@spec` if complex
-- Atoms: lowercase with underscores (`:ok`, `:connect_timeout`)
-- Behaviours: use `@behaviour` (not `@behavior`)
-
-### Function Definitions
-
-- Pattern match in function heads when possible
-- Use guard clauses for simple type/value checks
-- Prefer multiple function heads over large case statements
-- Example:
-  ```elixir
-  def valid_seq?(nil, seq) when is_integer(seq), do: true
-  def valid_seq?(last_seq, nil) when is_integer(last_seq), do: true
-  def valid_seq?(last_seq, seq) when is_integer(last_seq) and is_integer(seq),
-    do: seq > last_seq
-  def valid_seq?(_last_seq, _seq), do: false
-  ```
-
-### Error Handling
-
-- Use `try/rescue` for expected errors, catch and log appropriately
-- Use Logger for errors:
-  `Logger.error("Message: #{Exception.format(:error, e, __STACKTRACE__)}")`
-- Return tagged tuples: `{:ok, result}` or `{:error, reason}`
-- Use `with` for chaining operations that may fail
-- Example from Socket module:
-  ```elixir
-  with {:ok, header, next} <- CAR.DagCbor.decode(frame),
-       {:ok, payload, _} <- CAR.DagCbor.decode(next),
-       {%{"op" => @op_regular}, _} <- {header, payload} do
-    # happy path
-  else
-    {:error, reason} -> Logger.warning("Failed to decode: #{inspect(reason)}")
-  end
-  ```
-
-### OTP and Concurrency Patterns
-
-- Use `child_spec/1` for custom supervisor specifications
-- Prefer `GenServer` for stateful processes, `:gen_statem` for state machines
-- Use `Task.Supervisor` for concurrent, fire-and-forget work (see
-  `Event.dispatch/2`)
-- Register processes via Registry for named lookups
-- Define proper restart strategies (`:permanent`, `:transient`, `:temporary`)
-
-### Comments
-
-- Avoid obvious comments; prefer self-documenting code
-- Use `# TODO:` for future improvements (see existing TODOs in codebase)
-- Use `# DEPRECATED` for deprecated fields (see Commit struct)
-- Document complex algorithms or non-obvious business logic
-- Use module-level `@moduledoc` and function-level `@doc` for public APIs
-
-### Formatting
-
-- Use `mix format` (configured in `.formatter.exs`)
-- Import deps for formatting: `import_deps: [:typedstruct]`
-- Line length: default Elixir formatter settings
-- Use 2-space indentation (enforced by formatter)
-
-### Testing
-
-- Use ExUnit for tests (files in `test/` with `_test.exs` suffix)
-- Use `use ExUnit.Case` in test modules
-- Use `doctest Module` for testing documentation examples
-- Tag tests for selective running: `@tag :integration`
-- Use descriptive test names: `test "validates sequence numbers correctly"`
-
-## Project-Specific Patterns
-
-### Consumer Behaviour Pattern
-
-- Implement `@behaviour Drinkup.Consumer` with `handle_event/1` callback
-- Use pattern matching to handle different event types
-- Return any value; errors are caught by Task.Supervisor wrapper
-
-### RecordConsumer Macro Pattern
-
-- Use `use Drinkup.RecordConsumer` with `collections:` opt for filtering
-- Override `handle_create/1`, `handle_update/1`, `handle_delete/1` as needed
-- Collections can be exact strings or Regex patterns: `~r/app\.bsky\.graph\..+/`
-
-### WebSocket State Machine
-
-- Socket module uses `:gen_statem` with states: `:disconnected`,
-  `:connecting_http`, `:connecting_ws`, `:connected`
-- State functions match on events: `state_name(:enter, from, data)` or
-  `state_name(:info, msg, data)`
-- Use `{:next_event, :internal, event}` for internal state transitions
-
-## Dependencies
-
-- `{:gun, "~> 2.2"}` - HTTP/WebSocket client
-- `{:car, "~> 0.1.0"}` - CAR (Content Addressable aRchive) format
-- `{:cbor, "~> 1.0.0"}` - CBOR encoding/decoding
-- `{:typedstruct, "~> 0.5"}` - Typed structs
-- `{:credo, "~> 1.7"}` - Static analysis (dev/test only)
-
-## Common Tasks
-
-### Adding a New Event Type
+# Agent Guidelines for Drinkup

-1. Create `lib/event/your_event.ex` with TypedStruct definition
-2. Add `from/1` function to parse payload
-3. Add pattern match in `Drinkup.Event.from/2`
-4. Add to `@type t()` union in `Drinkup.Event`
-5. Update `CHANGELOG.md` under `[Unreleased]` section with the new feature
+## Commands

-### Debugging Connection Issues
+- **Test**: `mix test` (all), `mix test test/path/to/file_test.exs` (single file), `mix test test/path/to/file_test.exs:42` (single test at line)
+- **Format**: `mix format` (auto-formats all code)
+- **Lint**: `mix credo` (static analysis), `mix credo --strict` (strict mode)
+- **Compile**: `mix compile`
+- **Docs**: `mix docs`
+- **Type Check**: `mix dialyzer` (if configured)

-- Check `:gun` connection logs in Socket module
-- Verify sequence tracking with `Event.valid_seq?/2`
-- Monitor state transitions: `:disconnected` → `:connecting_http` →
-  `:connecting_ws` → `:connected`
+## Code Style

-## Changelog Management
+- **Imports**: Use `alias` for modules (e.g., `alias Drinkup.Firehose.{Event, Options}`), `require` for macros (e.g., `require Logger`)
+- **Formatting**: Elixir 1.18+, auto-formatted via `.formatter.exs` with `import_deps: [:typedstruct]`
+- **Naming**: snake_case for functions/variables, PascalCase for modules, `:lowercase_atoms` for atoms, `@behaviour` (not `@behavior`)
+- **Types**: Use `@type` and `@spec` for all functions; use TypedStruct for structs with `enforce: true` for required fields
+- **Moduledocs**: Public modules need `@moduledoc`, public functions need `@doc` with examples
+- **Error Handling**: Return `{:ok, result}` or `{:error, reason}` tuples; use `with` for chaining operations; log errors with `Logger.error("#{Exception.format(:error, e, __STACKTRACE__)}")`
+- **Pattern Matching**: Prefer pattern matching in function heads over conditionals; use guard clauses when appropriate
+- **OTP**: Use `child_spec/1` for custom supervisor specs; `:gen_statem` for state machines; `Task.Supervisor` for concurrent tasks; Registry for named lookups
+- **Tests**: Use ExUnit with `use ExUnit.Case`; use `doctest Module` for documentation examples
+- **Dependencies**: Core deps include gun (WebSocket), car (CAR format), cbor (encoding), TypedStruct (typed structs), Credo (linting)

-**IMPORTANT**: After completing any feature or fixing a bug from a previous
-release, you MUST update `CHANGELOG.md`.
+## Project Structure

-### Changelog Format
-
-- Follows [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format
-- Uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
-- Group changes under appropriate sections: `Added`, `Changed`, `Deprecated`,
-  `Removed`, `Fixed`, `Security`
+- **Namespace**: All firehose functionality under `Drinkup.Firehose.*`
+  - `Drinkup.Firehose` - Main supervisor
+  - `Drinkup.Firehose.Consumer` - Behaviour for handling all events
+  - `Drinkup.Firehose.RecordConsumer` - Macro for handling commit record events with filtering
+  - `Drinkup.Firehose.Event` - Event types (`Commit`, `Sync`, `Identity`, `Account`, `Info`)
+  - `Drinkup.Firehose.Socket` - `:gen_statem` WebSocket connection manager
+- **Consumer Pattern**: Implement `@behaviour Drinkup.Firehose.Consumer` with `handle_event/1`
+- **RecordConsumer Pattern**: `use Drinkup.Firehose.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]` with `handle_create/1`, `handle_update/1`, `handle_delete/1` overrides

-### When to Update
+## Important Notes

-- **New features**: Add under `## [Unreleased]` → `### Added`
-- **Bug fixes**: Add under `## [Unreleased]` → `### Fixed`
-- **Breaking changes**: Add under `## [Unreleased]` → `### Breaking Changes`
-- **Deprecations**: Add under `## [Unreleased]` → `### Deprecated`
-- **Security fixes**: Add under `## [Unreleased]` → `### Security`
-
-### Example Entry
-
-```markdown
-## [Unreleased]
-
-### Added
-
-- Support for `#handle` event type in firehose consumer
-
-### Fixed
-
-- Sequence validation now correctly handles nil cursor on initial connection
-```
+- **Update CHANGELOG.md** when adding features, changes, or fixes under `## [Unreleased]` with appropriate sections (`Added`, `Changed`, `Fixed`, `Deprecated`, `Removed`, `Security`)
+- **WebSocket States**: Socket uses `:disconnected` → `:connecting_http` → `:connecting_ws` → `:connected` flow
+- **Sequence Tracking**: Use `Event.valid_seq?/2` to validate sequence numbers from firehose
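For reviewers unfamiliar with the sequence check that AGENTS.md refers to, the clauses can be exercised in isolation. The function heads below are copied from the `valid_seq?/2` example in the removed guidelines; the `SeqSketch` module name is hypothetical, the comments are an interpretation, and the real `Event.valid_seq?/2` may differ.

```elixir
defmodule SeqSketch do
  @moduledoc false

  # No previous sequence recorded (assumed: a fresh connection): any integer is accepted.
  def valid_seq?(nil, seq) when is_integer(seq), do: true

  # The incoming event carries no sequence number: accepted as-is.
  def valid_seq?(last_seq, nil) when is_integer(last_seq), do: true

  # Both known: the new sequence must be strictly greater than the last one.
  def valid_seq?(last_seq, seq) when is_integer(last_seq) and is_integer(seq),
    do: seq > last_seq

  # Anything else (including nil/nil or non-integers) is rejected.
  def valid_seq?(_last_seq, _seq), do: false
end

IO.inspect(SeqSketch.valid_seq?(nil, 10), label: "first event")
IO.inspect(SeqSketch.valid_seq?(10, 11), label: "next event")
IO.inspect(SeqSketch.valid_seq?(11, 11), label: "replayed event")
```

Expressing each case as its own function head keeps the rule for every input shape visible at a glance, which matches the "pattern matching in function heads" guidance in both versions of the file.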
+7
CHANGELOG.md
···
 and this project adheres to
 [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## [Unreleased]
+
+### Breaking Change
+
+- Existing behaviour moved to `Drinkup.Firehose` namespace, to make way for
+  alternate sync systems.
+
 ## [0.1.0] - 2025-05-26

 Initial release.
+3 -3
examples/basic_consumer.ex
···
 defmodule BasicConsumer do
-  @behaviour Drinkup.Consumer
+  @behaviour Drinkup.Firehose.Consumer

-  def handle_event(%Drinkup.Event.Commit{} = event) do
+  def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
     IO.inspect(event, label: "Got commit event")
   end

···
   @impl true
   def init(_) do
     children = [
-      {Drinkup, %{consumer: BasicConsumer}}
+      {Drinkup.Firehose, %{consumer: BasicConsumer}}
     ]

     Supervisor.init(children, strategy: :one_for_one)
+5 -5
examples/multiple_consumers.ex
···
 defmodule PostDeleteConsumer do
-  use Drinkup.RecordConsumer, collections: ["app.bsky.feed.post"]
+  use Drinkup.Firehose.RecordConsumer, collections: ["app.bsky.feed.post"]

   def handle_delete(record) do
     IO.inspect(record, label: "update")
···
 end

 defmodule IdentityConsumer do
-  @behaviour Drinkup.Consumer
+  @behaviour Drinkup.Firehose.Consumer

-  def handle_event(%Drinkup.Event.Identity{} = event) do
+  def handle_event(%Drinkup.Firehose.Event.Identity{} = event) do
     IO.inspect(event, label: "identity event")
   end

···
   @impl true
   def init(_) do
     children = [
-      {Drinkup, %{consumer: PostDeleteConsumer}},
-      {Drinkup, %{consumer: IdentityConsumer, name: :identities}}
+      {Drinkup.Firehose, %{consumer: PostDeleteConsumer}},
+      {Drinkup.Firehose, %{consumer: IdentityConsumer, name: :identities}}
     ]

     Supervisor.init(children, strategy: :one_for_one)
+3 -2
examples/record_consumer.ex
···
 defmodule ExampleRecordConsumer do
-  use Drinkup.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]
+  use Drinkup.Firehose.RecordConsumer,
+    collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]

   def handle_create(record) do
     IO.inspect(record, label: "create")
···
   @impl true
   def init(_) do
     children = [
-      {Drinkup, %{consumer: ExampleRecordConsumer}}
+      {Drinkup.Firehose, %{consumer: ExampleRecordConsumer}}
     ]

     Supervisor.init(children, strategy: :one_for_one)
+2 -2
lib/consumer.ex lib/firehose/consumer.ex
···
-defmodule Drinkup.Consumer do
+defmodule Drinkup.Firehose.Consumer do
   @moduledoc """
   An unopinionated consumer of the Firehose. Will receive all events, not just commits.
   """

-  alias Drinkup.Event
+  alias Drinkup.Firehose.Event

   @callback handle_event(Event.t()) :: any()
 end
+3 -3
lib/drinkup.ex lib/firehose.ex
···
-defmodule Drinkup do
+defmodule Drinkup.Firehose do
   use Supervisor
-  alias Drinkup.Options
+  alias Drinkup.Firehose.Options

   @dialyzer nowarn_function: {:init, 1}
   @impl true
   def init({%Options{name: name} = drinkup_options, supervisor_options}) do
     children = [
       {Task.Supervisor, name: {:via, Registry, {Drinkup.Registry, {name, Tasks}}}},
-      {Drinkup.Socket, drinkup_options}
+      {Drinkup.Firehose.Socket, drinkup_options}
     ]

     Supervisor.start_link(
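The `{:via, Registry, {Drinkup.Registry, {name, Tasks}}}` tuple in the hunk above is what lets several firehose instances coexist, each with its own task supervisor keyed by `name`. The following is a minimal standalone sketch of that naming scheme using only the standard library; `SketchRegistry` and the `via` helper are hypothetical stand-ins, not part of Drinkup.

```elixir
# Hypothetical registry standing in for Drinkup.Registry.
{:ok, _registry} = Registry.start_link(keys: :unique, name: SketchRegistry)

# Builds the same shape of via tuple as the diff: the key is {name, Tasks}.
via = fn name -> {:via, Registry, {SketchRegistry, {name, Tasks}}} end

# Two independent task supervisors, one per instance name.
{:ok, _default} = Task.Supervisor.start_link(name: via.(:default))
{:ok, _identities} = Task.Supervisor.start_link(name: via.(:identities))

# Work is dispatched to a specific instance by rebuilding its via tuple.
task = Task.Supervisor.async(via.(:identities), fn -> :handled end)
result = Task.await(task)
IO.inspect(result, label: "task result")
```

Because the registry key includes the instance name, a second child such as `%{consumer: IdentityConsumer, name: :identities}` does not collide with the default one.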
+2 -2
lib/event.ex lib/firehose/event.ex
···
-defmodule Drinkup.Event do
+defmodule Drinkup.Firehose.Event do
   require Logger
-  alias Drinkup.{Event, Options}
+  alias Drinkup.Firehose.{Event, Options}

   @type t() ::
           Event.Commit.t()
+1 -1
lib/event/account.ex lib/firehose/event/account.ex
···
-defmodule Drinkup.Event.Account do
+defmodule Drinkup.Firehose.Event.Account do
   @moduledoc """
   Struct for account events from the ATProto Firehose.
   """
+1 -1
lib/event/commit.ex lib/firehose/event/commit.ex
···
-defmodule Drinkup.Event.Commit do
+defmodule Drinkup.Firehose.Event.Commit do
   @moduledoc """
   Struct for commit events from the ATProto Firehose.
   """
+1 -1
lib/event/identity.ex lib/firehose/event/identity.ex
···
-defmodule Drinkup.Event.Identity do
+defmodule Drinkup.Firehose.Event.Identity do
   @moduledoc """
   Struct for identity events from the ATProto Firehose.
   """
+1 -1
lib/event/info.ex lib/firehose/event/info.ex
···
-defmodule Drinkup.Event.Info do
+defmodule Drinkup.Firehose.Event.Info do
   @moduledoc """
   Struct for info events from the ATProto Firehose.
   """
+1 -1
lib/event/sync.ex lib/firehose/event/sync.ex
···
-defmodule Drinkup.Event.Sync do
+defmodule Drinkup.Firehose.Event.Sync do
   @moduledoc """
   Struct for sync events from the ATProto Firehose.
   """
+1 -1
lib/options.ex lib/firehose/options.ex
···
-defmodule Drinkup.Options do
+defmodule Drinkup.Firehose.Options do
   use TypedStruct

   @default_host "https://bsky.network"
+6 -6
lib/record_consumer.ex lib/firehose/record_consumer.ex
···
-defmodule Drinkup.RecordConsumer do
+defmodule Drinkup.Firehose.RecordConsumer do
   @moduledoc """
   An opinionated consumer of the Firehose that eats consumers
   """
···
     {collections, _opts} = Keyword.pop(opts, :collections, [])

     quote location: :keep do
-      @behaviour Drinkup.Consumer
-      @behaviour Drinkup.RecordConsumer
+      @behaviour Drinkup.Firehose.Consumer
+      @behaviour Drinkup.Firehose.RecordConsumer

-      def handle_event(%Drinkup.Event.Commit{} = event) do
+      def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
         event.ops
         |> Enum.filter(fn %{path: path} ->
           path |> String.split("/") |> Enum.at(0) |> matches_collections?()
         end)
-        |> Enum.map(&Drinkup.RecordConsumer.Record.from(&1, event.repo))
+        |> Enum.map(&Drinkup.Firehose.RecordConsumer.Record.from(&1, event.repo))
         |> Enum.each(&apply(__MODULE__, :"handle_#{&1.action}", [&1]))
       end

···
   end

   defmodule Record do
-    alias Drinkup.Firehose.Event.Commit.RepoOp
+    alias Drinkup.Firehose.Event.Commit.RepoOp
     use TypedStruct

     typedstruct do
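The filter step in `handle_event/1` takes the first segment of each op's `path` and checks it against the configured collections. The body of `matches_collections?/1` is not part of this diff, so the sketch below is an assumption about how such a check could work for the two documented forms (exact strings and Regex patterns); `CollectionFilterSketch` and the record keys in the sample paths are hypothetical.

```elixir
defmodule CollectionFilterSketch do
  @moduledoc false

  # Extracts the collection from an op path, mirroring the
  # `String.split("/") |> Enum.at(0)` step shown in the diff.
  def collection(path), do: path |> String.split("/") |> Enum.at(0)

  # Assumed matching rule: a Regex entry matches via Regex.match?/2,
  # a string entry must equal the collection exactly.
  def matches?(path, collections) do
    name = collection(path)

    Enum.any?(collections, fn
      %Regex{} = pattern -> Regex.match?(pattern, name)
      exact when is_binary(exact) -> exact == name
    end)
  end
end

collections = [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]

IO.inspect(CollectionFilterSketch.matches?("app.bsky.feed.post/3kabc", collections))
IO.inspect(CollectionFilterSketch.matches?("app.bsky.graph.follow/3kdef", collections))
IO.inspect(CollectionFilterSketch.matches?("app.bsky.feed.like/3kghi", collections))
```

Note that the example regex is unanchored, so it matches anywhere in the collection name; a stricter filter would use `^` and `$`.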
+2 -2
lib/socket.ex lib/firehose/socket.ex
···
-defmodule Drinkup.Socket do
+defmodule Drinkup.Firehose.Socket do
   @moduledoc """
   gen_statem process for managing the websocket connection to an ATProto relay.
   """

   require Logger
-  alias Drinkup.{Event, Options}
+  alias Drinkup.Firehose.{Event, Options}

   @behaviour :gen_statem
   @timeout :timer.seconds(5)