Elixir ATProtocol ingestion and sync library.
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 100 lines 2.8 kB view raw
defmodule Drinkup.Jetstream.Event do
  @moduledoc """
  Event handling and dispatch for Jetstream events.

  Parses incoming JSON events from Jetstream and dispatches them to the
  configured consumer via Task.Supervisor.
  """

  require Logger
  alias Drinkup.Jetstream.{Event, Options}

  @type t() :: Event.Commit.t() | Event.Identity.t() | Event.Account.t()

  # Event kinds this module knows how to parse. For each of them the event data
  # lives under a payload key with the same name as the kind.
  @known_kinds ~w(commit identity account)

  @doc """
  Parse a JSON map into an event struct.

  Jetstream events have a top-level structure with a "kind" field that
  determines the event type, and a nested object with the event data stored
  under a key named after the kind.

  ## Example Event Structure

      %{
        "did" => "did:plc:...",
        "time_us" => 1726880765818347,
        "kind" => "commit",
        "commit" => %{...}
      }

  Returns whatever the matching `Event.Commit.from/3`, `Event.Identity.from/3`
  or `Event.Account.from/3` returns. Logs a warning and returns `nil` when:

    * the payload lacks any of the `"did"`, `"time_us"` or `"kind"` keys (or is
      not a map at all),
    * the `"kind"` value is not recognized, or
    * the nested data for a recognized kind is missing or `nil`.
  """
  @spec from(map()) :: t() | nil
  def from(%{"did" => did, "time_us" => time_us, "kind" => kind} = payload)
      when kind in @known_kinds do
    case Map.get(payload, kind) do
      nil ->
        # Produces e.g. "Commit event missing 'commit' field: ...".
        Logger.warning(
          "#{String.capitalize(kind)} event missing '#{kind}' field: #{inspect(payload)}"
        )

        nil

      data ->
        build(kind, did, time_us, data)
    end
  end

  def from(%{"did" => _did, "time_us" => _time_us, "kind" => kind}) do
    Logger.warning("Received unrecognized event kind from Jetstream: #{inspect(kind)}")
    nil
  end

  def from(payload) do
    Logger.warning("Received invalid event structure from Jetstream: #{inspect(payload)}")
    nil
  end

  @doc """
  Dispatch an event to the consumer via Task.Supervisor.

  Spawns a task that processes the event via the consumer's `handle_event/1`
  callback. Unlike Tap, Jetstream does not require acknowledgments.

  The task supervisor is looked up through `Drinkup.Registry` under the key
  `{name, JetstreamTasks}`, where `name` comes from the given options.

  Returns `:ok` once the task has been started. If the supervisor does not
  answer with `{:ok, pid}`, the match fails and a `MatchError` is raised in the
  calling process. Exceptions raised by the handler are logged inside the task
  and are not re-raised.
  """
  @spec dispatch(t(), Options.t()) :: :ok
  def dispatch(event, %Options{consumer: consumer, name: name}) do
    supervisor_name = {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}

    # Assertive match: anything other than a started task is treated as a bug
    # and crashes the caller rather than silently dropping the event.
    {:ok, _pid} =
      Task.Supervisor.start_child(supervisor_name, fn ->
        try do
          consumer.handle_event(event)
        rescue
          # Only exceptions are rescued here; a `throw` or `exit` from the
          # handler is not caught by this clause.
          e ->
            Logger.error(
              "Error in Jetstream event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
            )
        end
      end)

    :ok
  end

  # Hands the nested event data to the struct module that matches the kind.
  defp build("commit", did, time_us, commit), do: Event.Commit.from(did, time_us, commit)
  defp build("identity", did, time_us, identity), do: Event.Identity.from(did, time_us, identity)
  defp build("account", did, time_us, account), do: Event.Account.from(did, time_us, account)
end