Elixir ATProtocol ingestion and sync library.
1defmodule Drinkup.Jetstream.Event do
2 @moduledoc """
3 Event handling and dispatch for Jetstream events.
4
5 Parses incoming JSON events from Jetstream and dispatches them to the
6 configured consumer via Task.Supervisor.
7 """
8
9 require Logger
10 alias Drinkup.Jetstream.{Event, Options}
11
12 @type t() :: Event.Commit.t() | Event.Identity.t() | Event.Account.t()
13
14 @doc """
15 Parse a JSON map into an event struct.
16
17 Jetstream events have a top-level structure with a "kind" field that
18 determines the event type, and a nested object with the event data.
19
20 ## Example Event Structure
21
22 %{
23 "did" => "did:plc:...",
24 "time_us" => 1726880765818347,
25 "kind" => "commit",
26 "commit" => %{...}
27 }
28
29 Returns the appropriate event struct based on the "kind" field, or `nil`
30 if the event type is not recognized.
31 """
32 @spec from(map()) :: t() | nil
33 def from(%{"did" => did, "time_us" => time_us, "kind" => kind} = payload) do
34 case kind do
35 "commit" ->
36 case Map.get(payload, "commit") do
37 nil ->
38 Logger.warning("Commit event missing 'commit' field: #{inspect(payload)}")
39 nil
40
41 commit ->
42 Event.Commit.from(did, time_us, commit)
43 end
44
45 "identity" ->
46 case Map.get(payload, "identity") do
47 nil ->
48 Logger.warning("Identity event missing 'identity' field: #{inspect(payload)}")
49 nil
50
51 identity ->
52 Event.Identity.from(did, time_us, identity)
53 end
54
55 "account" ->
56 case Map.get(payload, "account") do
57 nil ->
58 Logger.warning("Account event missing 'account' field: #{inspect(payload)}")
59 nil
60
61 account ->
62 Event.Account.from(did, time_us, account)
63 end
64
65 _ ->
66 Logger.warning("Received unrecognized event kind from Jetstream: #{inspect(kind)}")
67 nil
68 end
69 end
70
71 def from(payload) do
72 Logger.warning("Received invalid event structure from Jetstream: #{inspect(payload)}")
73 nil
74 end
75
76 @doc """
77 Dispatch an event to the consumer via Task.Supervisor.
78
79 Spawns a task that processes the event via the consumer's `handle_event/1`
80 callback. Unlike Tap, Jetstream does not require acknowledgments.
81 """
82 @spec dispatch(t(), Options.t()) :: :ok
83 def dispatch(event, %Options{consumer: consumer, name: name}) do
84 supervisor_name = {:via, Registry, {Drinkup.Registry, {name, JetstreamTasks}}}
85
86 {:ok, _pid} =
87 Task.Supervisor.start_child(supervisor_name, fn ->
88 try do
89 consumer.handle_event(event)
90 rescue
91 e ->
92 Logger.error(
93 "Error in Jetstream event handler: #{Exception.format(:error, e, __STACKTRACE__)}"
94 )
95 end
96 end)
97
98 :ok
99 end
100end