···2929import gleam/uri
3030import sqlight
3131import woof
3232+import youid/uuid
3333+3434+/// log with module related structured data
3535+///
3636+fn log(
3737+ level: woof.Level,
3838+ message: String,
3939+ fields: List(#(String, String)),
4040+) -> Nil {
4141+ woof.new("BACKEND")
4242+ |> woof.log(level, message, fields)
4343+}
32443345// public stuff -----------------------------------------------------------------
34464747+/// get an instance of `Reference` to pass around
4848+///
4949+pub fn new_reference(name, database) {
5050+ Reference(name:, database:)
5151+}
5252+5353+/// reference for the backend
5454+///
5555+/// use with:
5656+/// - `new_subscription`
5757+/// - `remove_subscription`
5858+/// - `status`
5959+/// - `find_feed`
6060+/// - `restart_feed`
6161+/// - `refetch_feed`
6262+/// - `restart_user`
6363+/// - `fetch_users`
6464+/// - `subscriptions_for_user`
6565+/// - `...`
6666+///
6767+pub opaque type Reference {
6868+ Reference(name: Name, database: sqlight.Connection)
6969+}
7070+7171+pub type Name =
7272+ process.Name(Message)
7373+3574pub type Names {
3675 Names(
3776 backend: process.Name(Message),
···79118// main api ---------------------------------------------------------------------
8011981120/// add a new subscription for a user
121121+/// - save to database
122122+/// - notify backend processes
82123///
83124pub fn new_subscription(
8484- backend backend: process.Name(Message),
125125+ backend backend: Reference,
85126 user user: user.User,
86127 feed feed: rss.Location,
8787-) -> Nil {
8888- let backend = process.named_subject(backend)
128128+) -> Result(Nil, Nil) {
129129+ use _ <- result.try(
130130+ database.add_feed(backend.database, feed)
131131+ |> result.map_error(fn(error) {
132132+ log(woof.Error, "Failed to add user to database", [
133133+ woof.field("user-email", user.email),
134134+ woof.field("user-id", user.id |> uuid.to_string()),
135135+ woof.field("details", string.inspect(error)),
136136+ ])
137137+ }),
138138+ )
139139+140140+ use _ <- result.try(
141141+ database.add_subscription(backend.database, user, feed)
142142+ |> result.map_error(fn(error) {
143143+ log(woof.Error, "Failed to add user to database", [
144144+ woof.field("user-email", user.email),
145145+ woof.field("user-id", user.id |> uuid.to_string()),
146146+ woof.field("details", string.inspect(error)),
147147+ ])
148148+ }),
149149+ )
150150+151151+ let backend = process.named_subject(backend.name)
89152 process.send(backend, NewSubscription(user, feed))
153153+ |> Ok
90154}
9115592156/// remove a subscription from a user
157157+///
158158+/// - delete from database
159159+/// - notify backend processes
93160///
94161pub fn remove_subscription(
9595- backend backend: process.Name(Message),
162162+ backend backend: Reference,
96163 user user: user.User,
97164 feed feed: rss.Location,
9898-) -> Nil {
9999- let backend = process.named_subject(backend)
165165+) -> Result(Nil, Nil) {
166166+ use _ <- result.try(
167167+ database.delete_subscription(backend.database, user, feed)
168168+ |> result.map_error(fn(error) {
169169+ log(woof.Error, "Failed to add user to database", [
170170+ woof.field("user-email", user.email),
171171+ woof.field("user-id", user.id |> uuid.to_string()),
172172+ woof.field("details", string.inspect(error)),
173173+ ])
174174+ }),
175175+ )
176176+177177+ let backend = process.named_subject(backend.name)
100178 process.send(backend, RemoveSubscription(user, feed))
179179+ |> Ok
101180}
102181103182/// to be fleshed out as it becomes clearer what this entails
···107186///
108187/// waits for the response from the backend
109188///
110110-pub fn status(backend backend: process.Name(Message)) -> Result(Status, Nil) {
111111- let backend = process.named_subject(backend)
189189+pub fn status(backend backend: Reference) -> Result(Status, Nil) {
190190+ let backend = process.named_subject(backend.name)
112191 let send_to = process.new_subject()
113192114193 process.send(backend, Status(send_to:))
···118197119198/// restart the fetcher for this feed
120199///
121121-pub fn restart_feed(
122122- backend backend: process.Name(Message),
123123- feed feed: rss.Location,
124124-) -> Nil {
125125- let backend = process.named_subject(backend)
200200+pub fn restart_feed(backend backend: Reference, feed feed: rss.Location) -> Nil {
201201+ let backend = process.named_subject(backend.name)
126202 process.send(backend, RestartFeed(feed))
127203}
128204129205/// trigger an early refetch for this feed
130206///
131131-pub fn refetch(
132132- backend backend: process.Name(Message),
133133- feed feed: rss.Location,
134134-) -> Nil {
135135- let backend = process.named_subject(backend)
207207+pub fn refetch_feed(backend backend: Reference, feed feed: rss.Location) -> Nil {
208208+ let backend = process.named_subject(backend.name)
136209 process.send(backend, Refetch(feed))
137210}
138211139212/// restart the sender for this user
140213///
141141-pub fn restart_user(
142142- backend backend: process.Name(Message),
143143- user user: user.User,
144144-) -> Nil {
145145- let backend = process.named_subject(backend)
214214+pub fn restart_user(backend backend: Reference, user user: user.User) -> Nil {
215215+ let backend = process.named_subject(backend.name)
146216 process.send(backend, RestartUser(user))
147217}
148218149219/// find a feed using a uri
150220///
151221pub fn find_feed(
152152- backend backend: process.Name(Message),
222222+ backend backend: Reference,
153223 uri uri: uri.Uri,
154224) -> Result(rss.Location, Nil) {
155155- let backend = process.named_subject(backend)
225225+ case database.feed_by_link(uri, backend.database) {
226226+ Error(_) -> Error(Nil)
227227+ Ok([]) -> Ok(rss.new_location(uri))
228228+ Ok([feed, ..]) -> Ok(feed)
229229+ }
230230+}
231231+232232+/// find a user using an email
233233+///
234234+pub fn find_user(
235235+ backend backend: Reference,
236236+ email email: String,
237237+) -> Result(user.User, Nil) {
238238+ case database.user_by_email(backend.database, email) {
239239+ Error(_) -> Error(Nil)
240240+ Ok([]) -> Error(Nil)
241241+ Ok([user, ..]) -> Ok(user)
242242+ }
243243+}
156244157157- let send_to = process.new_subject()
245245+/// add a new user
246246+///
247247+/// Ok(Nil) -> added successfully
248248+/// Error(Nil) -> something went wrong
249249+///
250250+pub fn new_user(backend: Reference, user: user.User) -> Result(Nil, Nil) {
251251+ database.add_user(user, backend.database)
252252+ |> result.map_error(fn(error) {
253253+ log(woof.Error, "Failed to add user to database", [
254254+ woof.field("user-email", user.email),
255255+ woof.field("user-id", user.id |> uuid.to_string()),
256256+ woof.field("details", string.inspect(error)),
257257+ ])
258258+ })
259259+}
158260159159- process.send(backend, FindFeed(uri, send_to:))
261261+/// get a list of all currently existing users
262262+///
263263+pub fn fetch_users(backend: Reference) -> Result(List(user.User), Nil) {
264264+ database.all_users(backend.database)
265265+ |> result.map_error(fn(error) {
266266+ log(woof.Error, "Failed to get users from database", [
267267+ woof.field("details", string.inspect(error)),
268268+ ])
269269+ })
270270+}
160271161161- process.receive(send_to, within: 1000)
162162- |> result.flatten()
272272+/// get all subscriptions for a user
273273+///
274274+pub fn subscriptions_for_user(
275275+ backend: Reference,
276276+ user: user.User,
277277+) -> Result(List(rss.Location), Nil) {
278278+ database.feeds_for_user(backend.database, user)
279279+ |> result.map_error(fn(error) {
280280+ log(woof.Error, "Failed to get feeds for user", [
281281+ woof.field("user-email", user.email),
282282+ woof.field("user-id", user.id |> uuid.to_string()),
283283+ woof.field("details", string.inspect(error)),
284284+ ])
285285+ })
163286}
164287165288pub opaque type Message {
···206329 /// restart the sender for this user
207330 ///
208331 RestartUser(user.User)
332332+}
209333210210- /// find a feed using a uri
211211- ///
212212- /// - check the database
213213- /// - found -> send the associated `rss.Location` to `send_to`
214214- /// - not found -> `rss.new_location` and send to `send_to`
215215- ///
216216- FindFeed(uri.Uri, send_to: process.Subject(Result(rss.Location, Nil)))
217217- // internal messages ----------------------------------------------------------
334334+fn describe_message(message: Message) -> List(#(String, String)) {
335335+ case message {
336336+ NewSubscription(user, feed) -> [
337337+ woof.field("message", "NewSubscription"),
338338+ woof.field("user-id", user.id |> uuid.to_string()),
339339+ woof.field("user-email", user.email),
340340+ woof.field("feed-id", feed.id |> uuid.to_string()),
341341+ woof.field("feed-url", feed.link |> uri.to_string()),
342342+ ]
343343+ RemoveSubscription(user, feed) -> [
344344+ woof.field("message", "RemoveSubscription"),
345345+ woof.field("user-id", user.id |> uuid.to_string()),
346346+ woof.field("user-email", user.email),
347347+ woof.field("feed-id", feed.id |> uuid.to_string()),
348348+ woof.field("feed-url", feed.link |> uri.to_string()),
349349+ ]
350350+ Status(send_to: _) -> [
351351+ woof.field("message", "Status"),
352352+ ]
353353+ RestartFeed(feed) -> [
354354+ woof.field("message", "RestartFeed"),
355355+ woof.field("feed-id", feed.id |> uuid.to_string()),
356356+ woof.field("feed-url", feed.link |> uri.to_string()),
357357+ ]
358358+ Refetch(feed) -> [
359359+ woof.field("message", "Refetch"),
360360+ woof.field("feed-id", feed.id |> uuid.to_string()),
361361+ woof.field("feed-url", feed.link |> uri.to_string()),
362362+ ]
363363+ RestartUser(user) -> [
364364+ woof.field("message", "RestartUser"),
365365+ woof.field("user-id", user.id |> uuid.to_string()),
366366+ woof.field("user-email", user.email),
367367+ ]
368368+ }
218369}
219370220371type State {
···252403}
253404254405fn on_message(state: State, message: Message) -> actor.Next(State, Message) {
406406+ case woof.is_enabled(woof.Debug) {
407407+ True ->
408408+ woof.log(
409409+ state.logger,
410410+ woof.Debug,
411411+ "New message",
412412+ describe_message(message),
413413+ )
414414+ False -> Nil
415415+ }
416416+255417 case message {
256418 // external messages --------------------------------------------------------
257419 NewSubscription(user, feed) -> handle_new_subscription(state, user, feed)
···270432 sender.restart_for(state.names.sender_manager, user)
271433 actor.continue(state)
272434 }
273273- FindFeed(uri, send_to:) -> {
274274- case database.feed_by_link(uri, state.database) {
275275- Error(_) -> Error(Nil)
276276- Ok([]) -> Ok(rss.new_location(uri))
277277- Ok([feed, ..]) -> Ok(feed)
278278- }
279279- |> process.send(send_to, _)
280280- actor.continue(state)
281281- }
282282- // internal messages --------------------------------------------------------
283435 }
284436}
285437286438/// add a subscription for a user
287439///
288288-/// - save to database
289440/// - notify sender
290441/// - start sender if not running
291442/// - start fetcher if not running
···295446 user: user.User,
296447 feed: rss.Location,
297448) -> actor.Next(State, Message) {
298298- // - save to database
299299- use _ <- try_twice(
300300- fn() { database.add_feed(feed, state.database) },
301301- otherwise: log_and_stop(state.logger, "Failed to add feed to database", [
302302- woof.field("feed", feed.link |> uri.to_string()),
303303- ]),
304304- )
305305-306449 // - notify sender
307307- pubsub.publish_subscribed_to_feed(user, feed, state.names.registry)
450450+ sender.add_subscription(state.names.sender_manager, user, feed)
308451309452 // - start sender if not running
310453 sender.start_for(state.names.sender_manager, user)
···316459317460/// remove a subscription from a user
318461///
319319-/// - delete from database
320462/// - notify sender
321463/// - if noone is subscribed
322464/// - stop fetcher
···327469 user: user.User,
328470 feed: rss.Location,
329471) -> actor.Next(State, Message) {
330330- // - delete from database
331331- use _ <- try_twice(
332332- fn() { database.delete_subscription(user, feed, state.database) },
333333- otherwise: log_and_stop(state.logger, "Failed delete subscription", [
334334- woof.field("feed", feed.link |> uri.to_string()),
335335- woof.field("user", user.email),
336336- ]),
337337- )
338338-339472 // - notify sender
340340- pubsub.publish_unsubscribed_from_feed(user, feed, state.names.registry)
473473+ sender.remove_subscription(state.names.sender_manager, user, feed)
341474342475 use subscribers <- try_twice(
343476 fn() { database.subscription_count(feed, state.database) },
+20-25
src/eater/database.gleam
···3333/// returns the feed for convenience
3434///
3535pub fn add_feed(
3636- feed feed: rss.Location,
3736 into on: sqlight.Connection,
3737+ feed feed: rss.Location,
3838) -> Result(rss.Location, sqlight.Error) {
3939 let #(sql, with) =
4040 sql.add_feed(
···142142pub fn add_user(
143143 user user: user.User,
144144 into on: sqlight.Connection,
145145-) -> Result(user.User, sqlight.Error) {
145145+) -> Result(Nil, sqlight.Error) {
146146 let #(sql, with) =
147147 sql.add_user(
148148 id: user.id |> uuid.to_bit_array,
···157157 let with = list.map(with, parrot_to_sqlight)
158158159159 sqlight.query(sql, on:, with:, expecting: decode.success(""))
160160- |> result.replace(user)
160160+ |> result.replace(Nil)
161161}
162162163163/// gets a list of all users from the database
···186186}
187187188188/// gets a specific user using the associated email
189189-/// the nested error contains the provided email
190189///
191190pub fn user_by_email(
192191 in on: sqlight.Connection,
193192 email email: String,
194194-) -> Result(Result(user.User, String), sqlight.Error) {
193193+) -> Result(List(user.User), sqlight.Error) {
195194 let #(sql, with, expecting) = sql.user_by_email(email:)
196195197196 let with = list.map(with, parrot_to_sqlight)
198197199198 use user <- result.try(sqlight.query(sql, on:, with:, expecting:))
200199201201- case user {
202202- [] -> Error(email)
203203- [user, ..] -> {
204204- let assert Ok(id) = uuid.from_bit_array(user.id)
205205- as "invalid UUID from db UUID column?!"
200200+ list.map(user, fn(user) {
201201+ let assert Ok(id) = uuid.from_bit_array(user.id)
202202+ as "invalid UUID from db UUID column?!"
206203207207- let is_admin = case user.is_admin {
208208- 1 -> True
209209- _ -> False
210210- }
204204+ let is_admin = case user.is_admin {
205205+ 1 -> True
206206+ _ -> False
207207+ }
211208212212- user.User(id, user.email, user.password_hash, is_admin:)
213213- |> Ok
214214- }
215215- }
209209+ user.User(id, user.email, user.password_hash, is_admin:)
210210+ })
216211 |> Ok
217212}
218213···269264/// returns the feed on success for convenience
270265///
271266pub fn add_subscription(
272272- user: user.User,
273273- feed: rss.Location,
274274- into on: sqlight.Connection,
267267+ on: sqlight.Connection,
268268+ user user: user.User,
269269+ feed feed: rss.Location,
275270) -> Result(rss.Location, sqlight.Error) {
276271 let #(sql, with) =
277272 sql.add_subscription(
···289284/// returns the feed in the `Ok()` for convenience
290285///
291286pub fn delete_subscription(
292292- user: user.User,
293293- feed: rss.Location,
294294- database on: sqlight.Connection,
287287+ on: sqlight.Connection,
288288+ user user: user.User,
289289+ feed feed: rss.Location,
295290) -> Result(rss.Location, sqlight.Error) {
296291 let #(sql, with) =
297292 sql.delete_subsciption(
···308303/// feeds a given user is subscribed to
309304///
310305pub fn feeds_for_user(
311311- for user: user.User,
312306 in on: sqlight.Connection,
307307+ for user: user.User,
313308) -> Result(List(rss.Location), sqlight.Error) {
314309 let #(sql, with, expecting) = sql.feeds_for_user(user.id |> uuid.to_bit_array)
315310
+14-18
src/eater/fetcher.gleam
···8080 factory: FactoryName,
8181 registry: pubsub.Registry,
8282 database: sqlight.Connection,
8383- fetchers: dict.Dict(uuid.Uuid, #(process.Pid, process.Subject(Message))),
8383+ fetchers: dict.Dict(uuid.Uuid, actor.Started(process.Subject(Message))),
8484 )
8585}
8686···179179 // if there is an existing sender for this
180180 // tell it to shut down
181181 case dict.get(state.fetchers, feed.id) {
182182- Ok(#(pid, _)) ->
183183- case process.is_alive(pid) {
184184- True -> process.send_exit(pid)
182182+ Ok(actor) ->
183183+ case process.is_alive(actor.pid) {
184184+ True -> process.send_exit(actor.pid)
185185 False -> Nil
186186 }
187187 Error(_) -> Nil
···190190 let state =
191191 ManagerState(
192192 ..state,
193193- fetchers: dict.insert(state.fetchers, feed.id, #(
194194- started.pid,
195195- started.data,
196196- )),
193193+ fetchers: dict.insert(state.fetchers, feed.id, started),
197194 )
198195199196 log(woof.Info, "Fetcher has registered itself", [
···225222 }
226223227224 case dict.get(state.fetchers, feed.id) {
228228- Ok(#(pid, _)) -> {
229229- case process.is_alive(pid) {
225225+ Ok(actor) -> {
226226+ case process.is_alive(actor.pid) {
230227 True -> Nil
231228 False -> start_new()
232229 }
···238235 }
239236 StopFor(feed) -> {
240237 case dict.get(state.fetchers, feed.id) {
241241- Ok(#(pid, _)) -> {
242242- case process.is_alive(pid) {
243243- True -> process.send_exit(pid)
238238+ Ok(actor) -> {
239239+ case process.is_alive(actor.pid) {
240240+ True -> process.send_exit(actor.pid)
244241 False -> Nil
245242 }
246243 }
···257254 }
258255 RecheckFor(feed) -> {
259256 case dict.get(state.fetchers, feed.id) {
260260- Ok(#(pid, subject)) -> {
261261- case process.is_alive(pid) {
257257+ Ok(actor) -> {
258258+ case process.is_alive(actor.pid) {
262259 True -> {
263263- process.send(subject, CheckFeeds)
260260+ process.send(actor.data, CheckFeeds)
264261 }
265262 False -> Nil
266263 }
···398395 FailedToPersistFailure(sqlight.Error, FetchError)
399396}
400397398398+// TODO: clean up
401399/// tries to fetch a given feed and publish it to the group registry
402400///
403401/// handles the failure cases and incrementsthe feed's cooldown accordingly
404402///
405405-// TODO: clean up
406406-407403fn handle_feed(
408404 location: rss.Location,
409405 state: State,
-39
src/eater/pubsub.gleam
···60606161pub type Message {
6262 FeedUpdate(update: rss.FeedUpdate)
6363- SubscriptionRemoved(rss.Location)
6464- SubscriptionAdded(rss.Location)
6563}
66646765/// get the string 'channel' for a given `rss.Location`
···7775}
78767977// publishing -------------------------------------------------------------------
8080-8181-/// publish a user unsubscribing from a feed
8282-///
8383-pub fn publish_unsubscribed_from_feed(
8484- user user: user.User,
8585- from feed: rss.Location,
8686- in registry: Registry,
8787-) -> Nil {
8888- let registry = group_registry.get_registry(registry)
8989-9090- let members = group_registry.members(registry, user_channel(user))
9191-9292- list.map(members, process.send(_, SubscriptionRemoved(feed)))
9393-9494- log(woof.Info, "Published unsubscribe", [
9595- woof.field("user", user.email),
9696- woof.field("feed", feed.link |> uri.to_string()),
9797- ])
9898-}
9999-100100-/// publish a user subscribing to a feed
101101-///
102102-pub fn publish_subscribed_to_feed(
103103- user user: user.User,
104104- from feed: rss.Location,
105105- in registry: Registry,
106106-) -> Nil {
107107- let registry = group_registry.get_registry(registry)
108108-109109- let members = group_registry.members(registry, user_channel(user))
110110-111111- list.map(members, process.send(_, SubscriptionAdded(feed)))
112112- log(woof.Info, "Published subscription", [
113113- woof.field("user", user.email),
114114- woof.field("feed", feed.link |> uri.to_string()),
115115- ])
116116-}
1177811879/// publish an `rss.FeedUpdate` for a given `rss.Location` in a given registry
11980///
+74-21
src/eater/sender.gleam
···5353/// reexport of the underlying `process.Name` for convenience
5454///
5555pub type FactoryName =
5656- process.Name(factory.Message(Start, Nil))
5656+ process.Name(factory.Message(Start, process.Subject(Message)))
57575858pub fn factory(
5959 name factory: FactoryName,
···9090 registry: pubsub.Registry,
9191 database: sqlight.Connection,
9292 smtp_environment: smtp.SmtpEnvironment,
9393- senders: dict.Dict(uuid.Uuid, process.Pid),
9393+ senders: dict.Dict(uuid.Uuid, actor.Started(process.Subject(Message))),
9494 )
9595}
9696···9999 /// start all senders
100100 StartAll
101101 /// a sender has started and is registering itself
102102- SenderStarted(Result(actor.Started(Nil), actor.StartError), user.User)
102102+ SenderStarted(
103103+ Result(actor.Started(process.Subject(Message)), actor.StartError),
104104+ user.User,
105105+ )
103106104107 // external commands ----------------------------------------------------------
105108 /// start a sender for a given user
···108111 StopFor(user.User)
109112 /// restart the sender for a given user
110113 RestartFor(user.User)
114114+ /// add a subscription for this user
115115+ AddSubscription(user.User, rss.Location)
116116+ /// remove a subscription for this user
117117+ RemoveSubscription(user.User, rss.Location)
111118}
112119113120/// start a sender for a given user
···131138 |> process.send(RestartFor(user))
132139}
133140141141+/// add a subscription for this user
142142+///
143143+pub fn add_subscription(
144144+ manager: ManagerName,
145145+ user: user.User,
146146+ feed: rss.Location,
147147+) {
148148+ process.named_subject(manager)
149149+ |> process.send(AddSubscription(user, feed))
150150+}
151151+152152+/// remove a subscription for this user
153153+///
154154+pub fn remove_subscription(
155155+ manager: ManagerName,
156156+ user: user.User,
157157+ feed: rss.Location,
158158+) {
159159+ process.named_subject(manager)
160160+ |> process.send(RemoveSubscription(user, feed))
161161+}
162162+134163/// starts senders for all existing users on startup
135164/// then also tracks every started sender and handles starting / stopping and restarting
136165///
···183212 // if there is an existing sender for this
184213 // tell it to shut down
185214 case dict.get(state.senders, user.id) {
186186- Ok(pid) ->
187187- case process.is_alive(pid) {
188188- True -> process.send_exit(pid)
215215+ Ok(actor) ->
216216+ case process.is_alive(actor.pid) {
217217+ True -> process.send_exit(actor.pid)
189218 False -> Nil
190219 }
191220 Error(_) -> Nil
···194223 let state =
195224 ManagerState(
196225 ..state,
197197- senders: dict.insert(state.senders, user.id, started.pid),
226226+ senders: dict.insert(state.senders, user.id, started),
198227 )
199228200229 log(woof.Info, "Sender has registered itself", [
···233262 }
234263235264 case dict.get(state.senders, user.id) {
236236- Ok(pid) -> {
237237- case process.is_alive(pid) {
265265+ Ok(actor) -> {
266266+ case process.is_alive(actor.pid) {
238267 True -> Nil
239268 False -> start_new()
240269 }
···246275 }
247276 StopFor(user) -> {
248277 case dict.get(state.senders, user.id) {
249249- Ok(pid) -> {
250250- case process.is_alive(pid) {
251251- True -> process.send_exit(pid)
278278+ Ok(actor) -> {
279279+ case process.is_alive(actor.pid) {
280280+ True -> process.send_exit(actor.pid)
252281 False -> Nil
253282 }
254283 }
···263292264293 actor.continue(state)
265294 }
295295+ AddSubscription(user, feed) -> {
296296+ case dict.get(state.senders, user.id) {
297297+ Ok(actor) -> {
298298+ process.send(actor.data, NewSubscription(feed))
299299+ }
300300+ Error(_) -> Nil
301301+ }
302302+303303+ actor.continue(state)
304304+ }
305305+ RemoveSubscription(user, feed) -> {
306306+ case dict.get(state.senders, user.id) {
307307+ Ok(actor) -> {
308308+ process.send(actor.data, DropSubscription(feed))
309309+ }
310310+ Error(_) -> Nil
311311+ }
312312+313313+ actor.continue(state)
314314+ }
266315 }
267316 })
268317 |> actor.named(name)
···309358310359// actor ------------------------------------------------------------------------
311360312312-fn sender(args: Start) -> Result(actor.Started(Nil), actor.StartError) {
361361+fn sender(
362362+ args: Start,
363363+) -> Result(actor.Started(process.Subject(Message)), actor.StartError) {
313364 let Start(database:, registry:, user:, smtp_environment:, manager:) = args
314365315366 let actor_started =
···331382332383 actor.initialised(state)
333384 |> actor.selecting(state.selector)
385385+ |> actor.returning(self)
334386 |> Ok
335387 })
336388 |> actor.on_message(on_message)
···345397346398// message ----------------------------------------------------------------------
347399348348-type Message {
400400+pub opaque type Message {
349401 /// sent once at the start to fetch all feeds
350402 ///
351403 GetFeeds
352404 PubSubMessage(pubsub.Message)
353405 Retry(message: pubsub.Message, attempt: Int)
406406+407407+ NewSubscription(rss.Location)
408408+ DropSubscription(rss.Location)
354409}
355410356411// message handling -------------------------------------------------------------
···417472 }
418473 }
419474 }
420420- PubSubMessage(pubsub.SubscriptionRemoved(feed)) -> {
421421- let state = drop_feed(state, feed)
422422- actor.continue(state)
423423- }
424424- PubSubMessage(pubsub.SubscriptionAdded(feed)) -> {
475475+ NewSubscription(feed) -> {
425476 let state = add_feed(state, feed)
426477427478 actor.continue(state)
428479 |> actor.with_selector(state.selector)
480480+ }
481481+ DropSubscription(feed) -> {
482482+ let state = drop_feed(state, feed)
483483+ actor.continue(state)
429484 }
430485 Retry(message: pubsub.FeedUpdate(update:), attempt:) if attempt < 5 -> {
431486 let handled = handle_feed_update(state, update)
···464519 ])
465520 actor.continue(state)
466521 }
467467- Retry(message:, ..) ->
468468- actor.stop_abnormal(string.inspect(message) <> "cannot be retried")
469522 }
470523}
471524