Select the types of activity you want to include in your feed.
add `backend.gleam`, turn `sender.starter` and `fetcher.starter` into
`*.manager`
implemented most of the features in the readme, do still need to test em
···3939 - [ ] ewe logs
40404141- [ ] backend manager
4242- - [ ] `backend.new_subscription(backend_name, user, feed)`
4343- - [ ] save new subscription to database
4444- - [ ] notify responsible sender
4545- - [ ] start sender if not running
4646- - [ ] start fetcher if not running
4747- - [ ] `backend.remove_subscription(backend_name, user, feed)`
4848- - [ ] delete from database
4949- - [ ] notify sender
5050- - [ ] if noone is subscribed anymore
5151- - [ ] remove feed from database
5252- - [ ] stop fetcher
4242+ - [ ] TESTING THIS
4343+ - [x] `backend.new_subscription(backend_name, user, feed)`
4444+ - [x] save new subscription to database
4545+ - [x] notify responsible sender
4646+ - [x] start sender if not running
4747+ - [x] start fetcher if not running
4848+ - [x] `backend.remove_subscription(backend_name, user, feed)`
4949+ - [x] delete from database
5050+ - [x] notify sender
5151+ - [x] if noone is subscribed anymore
5252+ - [x] remove feed from database
5353+ - [x] stop fetcher
5354 - [ ] `backend.status(backend_name)`
5455 - [ ] fetcher data
5556 - [ ] time till next fetch
···5758 - [ ] sender data
5859 - [ ] status ok / issues
5960 - [ ] maybe mails sent? (we do have this data)
6060- - [ ] `backend.restart_feed(backend_name, feed)`
6161+ - [x] `backend.restart_feed(backend_name, feed)`
6162 - restart the fetcher for this feed
6262- - [ ] `backend.refetch(backend_name, feed)`
6363+ - [x] `backend.refetch(backend_name, feed)`
6364 - trigger early fetching of a given feed
6464- - [ ] `backend.restart_user(backend_name, user)`
6565+ - [x] `backend.restart_user(backend_name, user)`
6566 - restart sender for this user
6666- - [ ] `backend.find_feed(backend_name, uri) -> Result(rss.Location, Nil)`
6767- - [ ] check database
6868- - [ ] found -> found `rss.Location`
6969- - [ ] not found -> `rss.new_location`
7070- - [ ] db_error -> Nil
6767+ - [x] `backend.find_feed(backend_name, uri) -> Result(rss.Location, Nil)`
6868+ - [x] check database
6969+ - [x] found -> found `rss.Location`
7070+ - [x] not found -> `rss.new_location`
7171+ - [x] db_error -> Nil
717272737374
+14-14
src/eater.gleam
···1616// This software is provided "AS IS", WITHOUT WARRANTY OF ANY KIND. [cite: 5]
1717// See the Licence for the specific language governing permissions and limitations. [cite: 6]
18181919+import eater/backend
1920import eater/configuration
2021import eater/database
2122import eater/fetcher
···5051 let assert Ok(_) =
5152 ensure_default_admin_exists(database, smtp_environment, configuration)
52535454+ let backend = process.new_name("backend")
5355 let registry = process.new_name("registry")
5656+ let sender_factory = process.new_name("sender_factory")
5757+ let sender_manager = process.new_name("sender_starter")
5458 let fetcher_factory = process.new_name("fetcher_factory")
5555- let fetcher_starter = process.new_name("fetcher_starter")
5656- let sender_factory = process.new_name("sender_factory")
5757- let sender_starter = process.new_name("sender_starter")
5959+ let fetcher_manager = process.new_name("fetcher_starter")
58605961 let assert Ok(_supervisor) =
6062 supervisor.new(supervisor.RestForOne)
6161- |> supervisor.add(group_registry.supervised(registry))
6262- |> supervisor.add(fetcher.factory(
6363- name: fetcher_factory,
6464- starter: fetcher_starter,
6565- registry:,
6666- database:,
6767- ))
6868- |> supervisor.add(sender.factory(
6969- name: sender_factory,
7070- starter: sender_starter,
7171- registry:,
6363+ |> supervisor.add(backend.supervised(
6464+ names: backend.Names(
6565+ backend:,
6666+ registry:,
6767+ sender_factory:,
6868+ sender_manager:,
6969+ fetcher_factory:,
7070+ fetcher_manager:,
7171+ ),
7272 database:,
7373 smtp_environment:,
7474 ))
+409
src/eater/backend.gleam
···11+// eater
22+// Copyright (C) 2026 Olivia Streun and contributors. [cite: 4]
33+//
44+// This software is licensed under the European Union Public Licence (EUPL) v1.2.
55+// You may not use this work except in compliance with the Licence.
66+// You may obtain a copy of the Licence at: https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
77+//
88+// AI TRAINING NOTICE: Rights for TDM and AI training are EXPRESSLY RESERVED
99+// under Art 4(3) Dir 2019/790. AI training constitutes a Derivative Work.
1010+// See LICENSE file in the repository root for full details.
1111+//
1212+//
1313+// This software is provided "AS IS", WITHOUT WARRANTY OF ANY KIND. [cite: 5]
1414+// See the Licence for the specific language governing permissions and limitations. [cite: 6]
1515+1616+import eater/database
1717+import eater/feed/rss
1818+import eater/fetcher
1919+import eater/pubsub
2020+import eater/sender
2121+import eater/smtp
2222+import eater/user
2323+import gleam/erlang/process
2424+import gleam/otp/actor
2525+import gleam/otp/static_supervisor as supervisor
2626+import gleam/otp/supervision
2727+import gleam/result
2828+import gleam/string
2929+import gleam/uri
3030+import sqlight
3131+import woof
3232+3333+// public stuff -----------------------------------------------------------------
3434+3535+pub type Names {
3636+ Names(
3737+ backend: process.Name(Message),
3838+ registry: pubsub.Registry,
3939+ sender_factory: sender.FactoryName,
4040+ sender_manager: sender.ManagerName,
4141+ fetcher_factory: fetcher.FactoryName,
4242+ fetcher_manager: fetcher.ManagerName,
4343+ )
4444+}
4545+4646+pub fn supervised(
4747+ names names: Names,
4848+ database database: sqlight.Connection,
4949+ smtp_environment smtp_environment: smtp.SmtpEnvironment,
5050+) -> supervision.ChildSpecification(supervisor.Supervisor) {
5151+ let Names(
5252+ backend: _,
5353+ registry:,
5454+ sender_factory:,
5555+ sender_manager:,
5656+ fetcher_factory:,
5757+ fetcher_manager:,
5858+ ) = names
5959+6060+ supervisor.new(supervisor.RestForOne)
6161+ |> supervisor.add(pubsub.supervised(registry))
6262+ |> supervisor.add(fetcher.factory(
6363+ name: fetcher_factory,
6464+ starter: fetcher_manager,
6565+ registry:,
6666+ database:,
6767+ ))
6868+ |> supervisor.add(sender.factory(
6969+ name: sender_factory,
7070+ starter: sender_manager,
7171+ registry:,
7272+ database:,
7373+ smtp_environment:,
7474+ ))
7575+ |> supervisor.add(backend_manager(names, database, smtp_environment))
7676+ |> supervisor.supervised()
7777+}
7878+7979+// main api ---------------------------------------------------------------------
8080+8181+/// add a new subscription for a user
8282+///
8383+pub fn new_subscription(
8484+ backend backend: process.Name(Message),
8585+ user user: user.User,
8686+ feed feed: rss.Location,
8787+) -> Nil {
8888+ let backend = process.named_subject(backend)
8989+ process.send(backend, NewSubscription(user, feed))
9090+}
9191+9292+/// remove a subscription from a user
9393+///
9494+pub fn remove_subscription(
9595+ backend backend: process.Name(Message),
9696+ user user: user.User,
9797+ feed feed: rss.Location,
9898+) -> Nil {
9999+ let backend = process.named_subject(backend)
100100+ process.send(backend, RemoveSubscription(user, feed))
101101+}
102102+103103+/// to be fleshed out as it becomes clearer what this entails
104104+pub type Status
105105+106106+/// get the status of the backend
107107+///
108108+/// waits for the response from the backend
109109+///
110110+pub fn status(backend backend: process.Name(Message)) -> Result(Status, Nil) {
111111+ let backend = process.named_subject(backend)
112112+ let send_to = process.new_subject()
113113+114114+ process.send(backend, Status(send_to:))
115115+116116+ process.receive(send_to, within: 1000)
117117+}
118118+119119+/// restart the fetcher for this feed
120120+///
121121+pub fn restart_feed(
122122+ backend backend: process.Name(Message),
123123+ feed feed: rss.Location,
124124+) -> Nil {
125125+ let backend = process.named_subject(backend)
126126+ process.send(backend, RestartFeed(feed))
127127+}
128128+129129+/// trigger an early refetch for this feed
130130+///
131131+pub fn refetch(
132132+ backend backend: process.Name(Message),
133133+ feed feed: rss.Location,
134134+) -> Nil {
135135+ let backend = process.named_subject(backend)
136136+ process.send(backend, Refetch(feed))
137137+}
138138+139139+/// restart the sender for this user
140140+///
141141+pub fn restart_user(
142142+ backend backend: process.Name(Message),
143143+ user user: user.User,
144144+) -> Nil {
145145+ let backend = process.named_subject(backend)
146146+ process.send(backend, RestartUser(user))
147147+}
148148+149149+/// find a feed using a uri
150150+///
151151+pub fn find_feed(
152152+ backend backend: process.Name(Message),
153153+ uri uri: uri.Uri,
154154+) -> Result(rss.Location, Nil) {
155155+ let backend = process.named_subject(backend)
156156+157157+ let send_to = process.new_subject()
158158+159159+ process.send(backend, FindFeed(uri, send_to:))
160160+161161+ process.receive(send_to, within: 1000)
162162+ |> result.flatten()
163163+}
164164+165165+pub opaque type Message {
166166+167167+ // external messages ----------------------------------------------------------
168168+ /// add a subscription for a user
169169+ ///
170170+ /// - save to database
171171+ /// - notify sender
172172+ /// - start sender if not running
173173+ /// - start fetcher if not running
174174+ ///
175175+ NewSubscription(user.User, rss.Location)
176176+177177+ /// remove a subscription from a user
178178+ ///
179179+ /// - delete from database
180180+ /// - notify sender
181181+ /// - if noone is subscribed
182182+ /// - stop fetcher
183183+ /// - remove feed from database
184184+ ///
185185+ RemoveSubscription(user.User, rss.Location)
186186+187187+ /// get the status of the backend
188188+ ///
189189+ /// - fetcher data
190190+ /// - time till next fetch
191191+ /// - status ok / issue
192192+ /// - sender data
193193+ /// - status ok / issue
194194+ /// - total mails sent (maybe)
195195+ ///
196196+ Status(send_to: process.Subject(Status))
197197+198198+ /// restart the fetcher for this feed
199199+ ///
200200+ RestartFeed(rss.Location)
201201+202202+ /// trigger an early refetch for this feed
203203+ ///
204204+ Refetch(rss.Location)
205205+206206+ /// restart the sender for this user
207207+ ///
208208+ RestartUser(user.User)
209209+210210+ /// find a feed using a uri
211211+ ///
212212+ /// - check the database
213213+ /// - found -> send the associated `rss.Location` to `send_to`
214214+ /// - not found -> `rss.new_location` and send to `send_to`
215215+ ///
216216+ FindFeed(uri.Uri, send_to: process.Subject(Result(rss.Location, Nil)))
217217+ // internal messages ----------------------------------------------------------
218218+}
219219+220220+type State {
221221+ State(
222222+ self: process.Subject(Message),
223223+ names: Names,
224224+ database: sqlight.Connection,
225225+ smtp_environment: smtp.SmtpEnvironment,
226226+ logger: woof.Logger,
227227+ )
228228+}
229229+230230+// actor ------------------------------------------------------------------------
231231+232232+fn backend_manager(
233233+ names: Names,
234234+ database: sqlight.Connection,
235235+ smtp_environment: smtp.SmtpEnvironment,
236236+) -> supervision.ChildSpecification(Nil) {
237237+ supervision.worker(fn() {
238238+ actor.new_with_initialiser(200, fn(self) {
239239+ actor.initialised(State(
240240+ self:,
241241+ names:,
242242+ database:,
243243+ smtp_environment:,
244244+ logger: woof.new("BACKEND-MANAGER"),
245245+ ))
246246+ |> Ok
247247+ })
248248+ |> actor.named(names.backend)
249249+ |> actor.on_message(on_message)
250250+ |> actor.start()
251251+ })
252252+}
253253+254254+fn on_message(state: State, message: Message) -> actor.Next(State, Message) {
255255+ case message {
256256+ // external messages --------------------------------------------------------
257257+ NewSubscription(user, feed) -> handle_new_subscription(state, user, feed)
258258+ RemoveSubscription(user, feed) ->
259259+ handle_remove_subscription(state, user, feed)
260260+ Status(send_to: _) -> todo as "get status of system"
261261+ RestartFeed(feed) -> {
262262+ fetcher.restart_for(state.names.fetcher_manager, feed)
263263+ actor.continue(state)
264264+ }
265265+ Refetch(feed) -> {
266266+ fetcher.refetch_for(state.names.fetcher_manager, feed)
267267+ actor.continue(state)
268268+ }
269269+ RestartUser(user) -> {
270270+ sender.restart_for(state.names.sender_manager, user)
271271+ actor.continue(state)
272272+ }
273273+ FindFeed(uri, send_to:) -> {
274274+ case database.feed_by_link(uri, state.database) {
275275+ Error(_) -> Error(Nil)
276276+ Ok([]) -> Ok(rss.new_location(uri))
277277+ Ok([feed, ..]) -> Ok(feed)
278278+ }
279279+ |> process.send(send_to, _)
280280+ actor.continue(state)
281281+ }
282282+ // internal messages --------------------------------------------------------
283283+ }
284284+}
285285+286286+/// add a subscription for a user
287287+///
288288+/// - save to database
289289+/// - notify sender
290290+/// - start sender if not running
291291+/// - start fetcher if not running
292292+///
293293+fn handle_new_subscription(
294294+ state: State,
295295+ user: user.User,
296296+ feed: rss.Location,
297297+) -> actor.Next(State, Message) {
298298+ // - save to database
299299+ use _ <- try_twice(
300300+ fn() { database.add_feed(feed, state.database) },
301301+ otherwise: log_and_stop(state.logger, "Failed to add feed to database", [
302302+ woof.field("feed", feed.link |> uri.to_string()),
303303+ ]),
304304+ )
305305+306306+ // - notify sender
307307+ pubsub.publish_subscribed_to_feed(user, feed, state.names.registry)
308308+309309+ // - start sender if not running
310310+ sender.start_for(state.names.sender_manager, user)
311311+ // - start fetcher if not running
312312+ fetcher.start_for(state.names.fetcher_manager, feed)
313313+314314+ actor.continue(state)
315315+}
316316+317317+/// remove a subscription from a user
318318+///
319319+/// - delete from database
320320+/// - notify sender
321321+/// - if noone is subscribed
322322+/// - stop fetcher
323323+/// - remove feed from database
324324+///
325325+fn handle_remove_subscription(
326326+ state: State,
327327+ user: user.User,
328328+ feed: rss.Location,
329329+) -> actor.Next(State, Message) {
330330+ // - delete from database
331331+ use _ <- try_twice(
332332+ fn() { database.delete_subscription(user, feed, state.database) },
333333+ otherwise: log_and_stop(state.logger, "Failed delete subscription", [
334334+ woof.field("feed", feed.link |> uri.to_string()),
335335+ woof.field("user", user.email),
336336+ ]),
337337+ )
338338+339339+ // - notify sender
340340+ pubsub.publish_unsubscribed_from_feed(user, feed, state.names.registry)
341341+342342+ use subscribers <- try_twice(
343343+ fn() { database.subscription_count(feed, state.database) },
344344+ otherwise: log_and_stop(
345345+ state.logger,
346346+ "Failed get subscriber count from database",
347347+ [
348348+ woof.field("feed", feed.link |> uri.to_string()),
349349+ ],
350350+ ),
351351+ )
352352+353353+ case subscribers {
354354+ // - if noone is subscribed
355355+ 0 -> {
356356+ // - stop fetcher
357357+ fetcher.stop_for(state.names.fetcher_manager, feed)
358358+ // - remove feed from database
359359+ use _ <- try_twice(
360360+ fn() { database.delete_feed(feed, state.database) },
361361+ otherwise: log_and_stop(
362362+ state.logger,
363363+ "Failed to delete feed from database",
364364+ [
365365+ woof.field("feed", feed.link |> uri.to_string()),
366366+ ],
367367+ ),
368368+ )
369369+370370+ actor.continue(state)
371371+ }
372372+ _ -> actor.continue(state)
373373+ }
374374+}
375375+376376+// helpers ----------------------------------------------------------------------
377377+378378+/// returns a function that logs an error
379379+/// to the supplied logger with the supplied message
380380+/// and then returns `actor.stop_abnormal` with that same message
381381+///
382382+fn log_and_stop(
383383+ logger logger: woof.Logger,
384384+ message message: String,
385385+ fields fields: List(#(String, String)),
386386+) -> fn(a) -> actor.Next(b, c) {
387387+ fn(error) {
388388+ woof.log(logger, woof.Error, message, [
389389+ woof.field("details", string.inspect(error)),
390390+ ..fields
391391+ ])
392392+ actor.stop_abnormal(message)
393393+ }
394394+}
395395+396396+fn try_twice(
397397+ try try: fn() -> Result(a, b),
398398+ otherwise otherwise: fn(b) -> c,
399399+ continue continue: fn(a) -> c,
400400+) -> c {
401401+ case try() {
402402+ Ok(value) -> continue(value)
403403+ Error(_) ->
404404+ case try() {
405405+ Ok(value) -> continue(value)
406406+ Error(error) -> otherwise(error)
407407+ }
408408+ }
409409+}
+1-1
src/eater/database.gleam
···337337 |> Ok
338338}
339339340340-/// feeds a given user is subscribed to
340340+/// how many users are subscribed to this feed
341341///
342342pub fn subscription_count(
343343 for feed: rss.Location,
+160-90
src/eater/fetcher.gleam
···1919import eater/database
2020import eater/feed/rss
2121import eater/pubsub
2222+import gleam/dict
2223import gleam/erlang/process
2324import gleam/http
2425import gleam/http/request
···3536import parsed_it/xml
3637import sqlight
3738import woof
3939+import youid/uuid
38403941const interval_in_ms: Int = 3_600_000
4042···5254/// reexport of the underlying `process.Name` for convenience
5355///
5456pub type FactoryName =
5555- process.Name(factory.Message(Start, Nil))
5757+ process.Name(factory.Message(Start, process.Subject(Message)))
56585759/// the factory for the fetcher actors
5860///
5961pub fn factory(
6062 name factory: FactoryName,
6161- starter starter_name: StarterName,
6363+ starter starter_name: ManagerName,
6264 registry registry: pubsub.Registry,
6365 database database: sqlight.Connection,
6466) -> supervision.ChildSpecification(_) {
···7274 |> static_supervisor.supervised()
7375}
74767575-type StarterState {
7676- StarterState(
7777- self: process.Subject(StarterMessage),
7777+type ManagerState {
7878+ ManagerState(
7979+ self: process.Subject(ManagerMessage),
7880 factory: FactoryName,
7981 registry: pubsub.Registry,
8082 database: sqlight.Connection,
8383+ fetchers: dict.Dict(uuid.Uuid, #(process.Pid, process.Subject(Message))),
8184 )
8285}
83868484-pub opaque type StarterMessage {
8787+pub opaque type ManagerMessage {
8588 // initial startup ------------------------------------------------------------
8689 /// start the process
8790 StartAll
8888- /// fetch feeds from db (attempt 1)
8989- DatabaseReturnedFeeds(Result(List(rss.Location), sqlight.Error))
9090- /// fetch feeds from db (attempt 2)
9191- FetchAgain
9191+ /// fetch feeds from db
9292+ DatabaseReturnedFeeds(List(rss.Location))
92939393- /// start fetcher (attempt 1)
9494- Started(Result(actor.Started(Nil), actor.StartError), rss.Location)
9595- /// start fetcher (attempt 2)
9696- StartAgain(rss.Location)
9494+ /// start fetcher
9595+ Started(
9696+ Result(actor.Started(process.Subject(Message)), actor.StartError),
9797+ rss.Location,
9898+ )
979998100 // runtime additions ----------------------------------------------------------
99101 /// start a sender for a given user at runtime
100102 StartFor(rss.Location)
103103+ /// stop the fetcher for a given feed at runtime
104104+ StopFor(rss.Location)
105105+ /// restart the fetcher for a given feed at runtime
106106+ RestartFor(rss.Location)
107107+ /// trigger an early refetching of this feed
108108+ RecheckFor(rss.Location)
101109}
102110103103-pub type StarterName =
104104- process.Name(StarterMessage)
111111+pub type ManagerName =
112112+ process.Name(ManagerMessage)
105113106106-pub fn start_for(starter: StarterName, feed: rss.Location) {
114114+pub fn start_for(starter: ManagerName, feed: rss.Location) {
107115 process.named_subject(starter)
108116 |> process.send(StartFor(feed))
109117}
110118119119+pub fn stop_for(manager: ManagerName, feed: rss.Location) {
120120+ process.named_subject(manager)
121121+ |> process.send(StopFor(feed))
122122+}
123123+124124+pub fn restart_for(manager: ManagerName, feed: rss.Location) {
125125+ process.named_subject(manager)
126126+ |> process.send(RestartFor(feed))
127127+}
128128+129129+pub fn refetch_for(manager: ManagerName, feed: rss.Location) {
130130+ process.named_subject(manager)
131131+ |> process.send(RecheckFor(feed))
132132+}
133133+111134fn starter(
112112- name: StarterName,
135135+ name: ManagerName,
113136 factory: FactoryName,
114137 registry: pubsub.Registry,
115138 database: sqlight.Connection,
···123146 actor.new_with_initialiser(1000, fn(self) {
124147 process.send_after(self, 100, StartAll)
125148126126- actor.initialised(StarterState(self:, factory:, registry:, database:))
149149+ actor.initialised(ManagerState(
150150+ self:,
151151+ factory:,
152152+ registry:,
153153+ database:,
154154+ fetchers: dict.new(),
155155+ ))
127156 |> Ok
128157 })
129158 |> actor.on_message(fn(state, message) {
130159 case message {
131160 StartAll -> {
132132- database.all_feeds(database)
133133- |> DatabaseReturnedFeeds
134134- |> actor.send(state.self, _)
161161+ case database.all_feeds(database) {
162162+ Ok(feeds) -> {
163163+ DatabaseReturnedFeeds(feeds) |> actor.send(state.self, _)
164164+ actor.continue(state)
165165+ }
166166+ // have the supervision handle retrying
167167+ Error(_) -> actor.stop_abnormal("Failed to get feeds from database")
168168+ }
135169136170 actor.continue(state)
137171 }
138138- DatabaseReturnedFeeds(Ok(feeds)) -> {
172172+ DatabaseReturnedFeeds(feeds) -> {
139173 feeds
140174 |> list.map(fn(feed) {
141141- start_new(factory, Start(database:, registry:, feed:))
175175+ start_new(
176176+ factory,
177177+ Start(database:, registry:, feed:, manager: name),
178178+ )
142179 |> Started(feed)
143180 |> actor.send(state.self, _)
144181 })
145182146183 actor.continue(state)
147184 }
148148- DatabaseReturnedFeeds(Error(db_error)) -> {
149149- log(woof.Warning, "Failed to get feeds from database once", [
150150- woof.field("details", string.inspect(db_error)),
151151- ])
152185153153- actor.send(state.self, FetchAgain)
154154- actor.continue(state)
155155- }
156156- FetchAgain -> {
157157- case database.all_feeds(database) {
158158- Ok(feeds) -> {
159159- DatabaseReturnedFeeds(Ok(feeds))
160160- |> actor.send(state.self, _)
186186+ // all starts go through this
187187+ Started(Ok(started), feed) -> {
188188+ // if there is an existing sender for this
189189+ // tell it to shut down
190190+ case dict.get(state.fetchers, feed.id) {
191191+ Ok(#(pid, _)) ->
192192+ case process.is_alive(pid) {
193193+ True -> process.send_exit(pid)
194194+ False -> Nil
195195+ }
196196+ Error(_) -> Nil
197197+ }
161198162162- actor.continue(state)
163163- }
164164- Error(error) -> {
165165- log(woof.Warning, "Failed to get feeds from database twice", [
166166- woof.field("details", string.inspect(error)),
167167- ])
199199+ let state =
200200+ ManagerState(
201201+ ..state,
202202+ fetchers: dict.insert(state.fetchers, feed.id, #(
203203+ started.pid,
204204+ started.data,
205205+ )),
206206+ )
168207169169- actor.stop_abnormal("Failed to get feeds from database twice")
170170- }
171171- }
208208+ actor.continue(state)
172209 }
173210174174- // all starts go through this
175175- // TODO: track started subjects and add StopFor
176176- Started(Ok(_), _user) -> actor.continue(state)
177177-178211 Started(Error(start_error), feed) -> {
179179- log(woof.Warning, "Failed to start fetcher once", [
212212+ log(woof.Error, "Failed to start fetcher", [
180213 woof.field("feed", feed.link |> uri.to_string()),
181214 woof.field("details", string.inspect(start_error)),
182215 ])
183216184184- StartAgain(feed)
185185- |> actor.send(state.self, _)
217217+ actor.stop_abnormal(
218218+ "Fetcher failed to start, i dont know how this would happen",
219219+ )
220220+ }
221221+ // runtime additions ----------------------------------------------------
222222+ StartFor(feed) -> {
223223+ let start_new = fn() {
224224+ let _ =
225225+ start_new(
226226+ factory,
227227+ Start(database:, registry:, feed:, manager: name),
228228+ )
229229+ Nil
230230+ }
231231+232232+ case dict.get(state.fetchers, feed.id) {
233233+ Ok(#(pid, _)) -> {
234234+ case process.is_alive(pid) {
235235+ True -> Nil
236236+ False -> start_new()
237237+ }
238238+ }
239239+ Error(_) -> start_new()
240240+ }
241241+186242 actor.continue(state)
187243 }
188188- StartAgain(feed) -> {
189189- case start_new(factory, Start(database:, registry:, feed:)) {
190190- Ok(started) -> {
191191- actor.send(state.self, Started(Ok(started), feed))
192192- actor.continue(state)
244244+ StopFor(feed) -> {
245245+ case dict.get(state.fetchers, feed.id) {
246246+ Ok(#(pid, _)) -> {
247247+ case process.is_alive(pid) {
248248+ True -> process.send_exit(pid)
249249+ False -> Nil
250250+ }
193251 }
194194- Error(error) -> {
195195- log(woof.Warning, "Failed to start fetcher twice", [
196196- woof.field("feed", feed.link |> uri.to_string()),
197197- woof.field("details", string.inspect(error)),
198198- ])
252252+ Error(_) -> Nil
253253+ }
254254+255255+ actor.continue(state)
256256+ }
257257+ RestartFor(user) -> {
258258+ process.send(state.self, StopFor(user))
259259+ process.send(state.self, StartFor(user))
199260200200- actor.stop_abnormal(
201201- "Failed to start fetcher for feed "
202202- <> feed.link |> uri.to_string()
203203- <> " twice",
204204- )
261261+ actor.continue(state)
262262+ }
263263+ RecheckFor(feed) -> {
264264+ case dict.get(state.fetchers, feed.id) {
265265+ Ok(#(pid, subject)) -> {
266266+ case process.is_alive(pid) {
267267+ True -> {
268268+ process.send(subject, CheckFeeds)
269269+ }
270270+ False -> Nil
271271+ }
205272 }
273273+ Error(_) -> Nil
206274 }
207207- }
208208- // runtime additions ----------------------------------------------------
209209- StartFor(feed) -> {
210210- start_new(factory, Start(database:, registry:, feed:))
211211- |> Started(feed)
212212- |> actor.send(state.self, _)
213275214276 actor.continue(state)
215277 }
···240302 feed: rss.Location,
241303 registry: pubsub.Registry,
242304 database: sqlight.Connection,
305305+ manager: ManagerName,
243306 )
244307}
245308···264327/// starts a new fetcher child
265328///
266329fn start(args: Start) -> Result(actor.Started(_), actor.StartError) {
267267- let Start(feed:, registry:, database:) = args
330330+ let Start(feed:, registry:, database:, manager:) = args
268331269332 log(woof.Info, "Starting", [woof.field("feed", feed.link |> uri.to_string())])
270333271271- actor.new_with_initialiser(100, fn(subject) {
272272- // TODO: persist the next timestamp to check at to the database
273273- // so the actor restarting doesnt preeptively recheck the feed
274274- let fetch_timer = process.send_after(subject, 150, CheckFeeds)
334334+ let started =
335335+ actor.new_with_initialiser(100, fn(subject) {
336336+ // TODO: persist the next timestamp to check at to the database
337337+ // so the actor restarting doesnt preeptively recheck the feed
338338+ let fetch_timer = process.send_after(subject, 150, CheckFeeds)
275339276276- actor.initialised(State(
277277- self: subject,
278278- fetch_timer:,
279279- feed:,
280280- registry:,
281281- database:,
282282- ))
283283- |> actor.returning(Nil)
284284- |> Ok
285285- })
286286- |> actor.on_message(on_message)
287287- |> actor.start()
340340+ actor.initialised(State(
341341+ self: subject,
342342+ fetch_timer:,
343343+ feed:,
344344+ registry:,
345345+ database:,
346346+ ))
347347+ |> actor.returning(subject)
348348+ |> Ok
349349+ })
350350+ |> actor.on_message(on_message)
351351+ |> actor.start()
352352+353353+ // let the manager know we started
354354+ let manager = process.named_subject(manager)
355355+ process.send(manager, Started(started, feed))
356356+357357+ started
288358}
289359290360/// handle messages
+196-146
src/eater/sender.gleam
···2323import eater/user
2424import gcourier/smtp as gsmtp
2525import gleam/bool
2626+import gleam/dict
2627import gleam/erlang/process
2728import gleam/list
2829import gleam/otp/actor
···3435import gleam/uri
3536import sqlight
3637import woof
3838+import youid/uuid
37393840/// log with module related structured data
3941///
···55575658pub fn factory(
5759 name factory: FactoryName,
5858- starter starter_name: StarterName,
6060+ starter starter_name: ManagerName,
5961 registry registry: pubsub.Registry,
6062 database database: sqlight.Connection,
6163 smtp_environment smtp_environment: smtp.SmtpEnvironment,
···6668 |> factory.named(factory)
6769 |> factory.supervised(),
6870 )
6969- |> static_supervisor.add(starter(
7171+ |> static_supervisor.add(manager(
7072 starter_name,
7173 factory,
7274 registry,
···78807981// starting ---------------------------------------------------------------------
80828181-pub type StarterName =
8282- process.Name(StarterMessage)
8383+pub type ManagerName =
8484+ process.Name(ManagerMessage)
83858484-type StarterState {
8585- StarterState(
8686- self: process.Subject(StarterMessage),
8686+type ManagerState {
8787+ ManagerState(
8888+ self: process.Subject(ManagerMessage),
8789 factory: FactoryName,
8890 registry: pubsub.Registry,
8991 database: sqlight.Connection,
9092 smtp_environment: smtp.SmtpEnvironment,
9393+ senders: dict.Dict(uuid.Uuid, process.Pid),
9194 )
9295}
93969494-pub opaque type StarterMessage {
9797+pub opaque type ManagerMessage {
9598 // initial startup ------------------------------------------------------------
9699 /// start the process
97100 StartAll
9898- /// fetch users from db (attempt 1)
9999- DatabaseReturnedUsers(Result(List(user.User), sqlight.Error))
100100- /// fetch users from db (attempt 2)
101101- FetchAgain
102102-103103- /// start sender (attempt 1)
101101+ /// fetch users from db
102102+ DatabaseReturnedUsers(List(user.User))
103103+ /// start sender
104104 Started(Result(actor.Started(Nil), actor.StartError), user.User)
105105- /// start sender (attempt 2)
106106- StartAgain(user.User)
107105108106 // runtime additions ------------------------------------------------------------
109107 /// start a sender for a given user at runtime
110108 StartFor(user.User)
109109+ /// stop the sender for a given user at runtime
110110+ StopFor(user.User)
111111+ /// restart the sender for a given user at runtime
112112+ RestartFor(user.User)
111113}
112114113113-pub fn start_for(starter: StarterName, user: user.User) {
114114- process.named_subject(starter)
115115+pub fn start_for(manager: ManagerName, user: user.User) {
116116+ process.named_subject(manager)
115117 |> process.send(StartFor(user))
116118}
117119120120+pub fn stop_for(manager: ManagerName, user: user.User) {
121121+ process.named_subject(manager)
122122+ |> process.send(StopFor(user))
123123+}
124124+125125+pub fn restart_for(manager: ManagerName, user: user.User) {
126126+ process.named_subject(manager)
127127+ |> process.send(RestartFor(user))
128128+}
129129+118130/// handles starting all senders on startup and then shuts down
119131///
120120-fn starter(
121121- name: StarterName,
132132+fn manager(
133133+ name: ManagerName,
122134 factory: FactoryName,
123135 registry: pubsub.Registry,
124136 database: sqlight.Connection,
···133145 actor.new_with_initialiser(1000, fn(self) {
134146 process.send_after(self, 100, StartAll)
135147136136- actor.initialised(StarterState(
148148+ actor.initialised(ManagerState(
137149 self:,
138150 factory:,
139151 registry:,
140152 database:,
141153 smtp_environment:,
154154+ senders: dict.new(),
142155 ))
143156 |> Ok
144157 })
···146159 case message {
147160 // initial start --------------------------------------------------------
148161 StartAll -> {
149149- database.all_users(database)
150150- |> DatabaseReturnedUsers
151151- |> actor.send(state.self, _)
162162+ case database.all_users(database) {
163163+ Ok(users) -> {
164164+ DatabaseReturnedUsers(users)
165165+ |> actor.send(state.self, _)
166166+167167+ actor.continue(state)
168168+ }
169169+ Error(_) -> actor.stop_abnormal("Failed to get users from database")
170170+ }
152171153172 actor.continue(state)
154173 }
155155- DatabaseReturnedUsers(Ok(users)) -> {
174174+ DatabaseReturnedUsers(users) -> {
156175 users
157176 |> list.map(fn(user) {
158177 start_new(
159178 factory,
160160- Start(database:, registry:, user:, smtp_environment:),
179179+ Start(
180180+ database:,
181181+ registry:,
182182+ user:,
183183+ smtp_environment:,
184184+ manager: name,
185185+ ),
161186 )
162187 |> Started(user)
163188 |> actor.send(state.self, _)
···165190166191 actor.continue(state)
167192 }
168168- DatabaseReturnedUsers(Error(db_error)) -> {
169169- log(woof.Warning, "Failed to get users from database once", [
170170- woof.field("details", string.inspect(db_error)),
171171- ])
193193+ // all starts go through this
194194+ Started(Ok(started), user) -> {
195195+ // if there is an existing sender for this
196196+ // tell it to shut down
197197+ case dict.get(state.senders, user.id) {
198198+ Ok(pid) ->
199199+ case process.is_alive(pid) {
200200+ True -> process.send_exit(pid)
201201+ False -> Nil
202202+ }
203203+ Error(_) -> Nil
204204+ }
172205173173- actor.send(state.self, FetchAgain)
206206+ let state =
207207+ ManagerState(
208208+ ..state,
209209+ senders: dict.insert(state.senders, user.id, started.pid),
210210+ )
211211+174212 actor.continue(state)
175213 }
176176- FetchAgain -> {
177177- case database.all_users(database) {
178178- Ok(users) -> {
179179- DatabaseReturnedUsers(Ok(users))
180180- |> actor.send(state.self, _)
181181-182182- actor.continue(state)
183183- }
184184- Error(error) -> {
185185- log(woof.Warning, "Failed to get users from database twice", [
186186- woof.field("details", string.inspect(error)),
187187- ])
188188-189189- actor.stop_abnormal("Failed to get users from database twice")
190190- }
191191- }
192192- }
193193- // all starts go through this
194194- // TODO: track started subjects and add StopFor
195195- Started(Ok(_), _user) -> actor.continue(state)
196214197215 Started(Error(start_error), user) -> {
198198- log(woof.Warning, "Failed to start sender once", [
216216+ log(woof.Error, "Failed to start sender", [
199217 woof.field("user", user.email),
200218 woof.field("details", string.inspect(start_error)),
201219 ])
202220203203- StartAgain(user)
204204- |> actor.send(state.self, _)
205205- actor.continue(state)
221221+ actor.stop_abnormal(
222222+ "Sender failed to start, i dont know how this would happen",
223223+ )
206224 }
207207- StartAgain(user) -> {
208208- case
209209- start_new(
210210- factory,
211211- Start(database:, registry:, user:, smtp_environment:),
212212- )
213213- {
214214- Ok(started) -> {
215215- actor.send(state.self, Started(Ok(started), user))
216216- actor.continue(state)
217217- }
218218- Error(error) -> {
219219- log(woof.Warning, "Failed to start sender twice", [
220220- woof.field("user", user.email),
221221- woof.field("details", string.inspect(error)),
222222- ])
223225224224- actor.stop_abnormal(
225225- "Failed to start sender for user " <> user.email <> " twice",
226226+ // runtime additions ----------------------------------------------------
227227+ StartFor(user) -> {
228228+ let start_new = fn() {
229229+ let _ =
230230+ start_new(
231231+ factory,
232232+ Start(
233233+ database:,
234234+ registry:,
235235+ user:,
236236+ smtp_environment:,
237237+ manager: name,
238238+ ),
226239 )
240240+ Nil
241241+ }
242242+243243+ case dict.get(state.senders, user.id) {
244244+ Ok(pid) -> {
245245+ case process.is_alive(pid) {
246246+ True -> Nil
247247+ False -> start_new()
248248+ }
227249 }
250250+ Error(_) -> start_new()
228251 }
252252+253253+ actor.continue(state)
229254 }
255255+ StopFor(user) -> {
256256+ case dict.get(state.senders, user.id) {
257257+ Ok(pid) -> {
258258+ case process.is_alive(pid) {
259259+ True -> process.send_exit(pid)
260260+ False -> Nil
261261+ }
262262+ }
263263+ Error(_) -> Nil
264264+ }
230265231231- // runtime additions ----------------------------------------------------
232232- StartFor(user) -> {
233233- start_new(
234234- factory,
235235- Start(database:, registry:, user:, smtp_environment:),
236236- )
237237- |> Started(user)
238238- |> actor.send(state.self, _)
266266+ actor.continue(state)
267267+ }
268268+ RestartFor(user) -> {
269269+ process.send(state.self, StopFor(user))
270270+ process.send(state.self, StartFor(user))
239271240272 actor.continue(state)
241273 }
···252284 registry: pubsub.Registry,
253285 user: user.User,
254286 smtp_environment: smtp.SmtpEnvironment,
287287+ manager: ManagerName,
255288 )
256289}
257290258291/// start a new sender in the sender factory
259292///
260260-/// make sure the user you are starting this sender for, is actually subscribed to some feeds
293293+/// the sender will register itself with the manager
261294///
262262-pub fn start_new(
263263- factory_name: process.Name(_),
264264- with: Start,
265265-) -> Result(actor.Started(Nil), actor.StartError) {
295295+fn start_new(factory factory_name: FactoryName, with with: Start) {
266296 log(woof.Info, "Starting", [woof.field("user", with.user.email)])
297297+267298 let factory = factory.get_by_name(factory_name)
299299+268300 factory.start_child(factory, with)
269301}
270302···286318// actor ------------------------------------------------------------------------
287319288320fn sender(args: Start) -> Result(actor.Started(Nil), actor.StartError) {
289289- let initial_selector = fn(state: State) -> State {
290290- log(woof.Info, "Initializing selector", [
291291- woof.field("user", state.user.email),
292292- ])
293293-294294- let self = process.self()
295295-296296- let selector =
297297- state.selector
298298- |> pubsub.select_user(
299299- user: state.user,
300300- in: state.registry,
301301- self:,
302302- handler: PubSubMessage,
303303- )
321321+ let Start(database:, registry:, user:, smtp_environment:, manager:) = args
304322305305- let selector =
306306- list.fold(state.feeds, selector, fn(selector, feed) {
307307- log(woof.Info, "User is subscribed to feed", [
308308- woof.field("user", state.user.email),
309309- woof.field("feed", feed.link |> uri.to_string()),
310310- ])
311311-312312- pubsub.select_feed(
313313- selector:,
314314- feed:,
315315- in: state.registry,
316316- self:,
317317- handler: PubSubMessage,
323323+ let actor_started =
324324+ actor.new_with_initialiser(300, fn(subject) {
325325+ let state =
326326+ State(
327327+ database:,
328328+ registry:,
329329+ user:,
330330+ feeds: [],
331331+ smtp_environment:,
332332+ self: subject,
333333+ sending_failed_n_times: 0,
334334+ selector: process.new_selector() |> process.select(subject),
318335 )
319319- })
320336321321- State(..state, selector:)
322322- }
337337+ actor.initialised(state)
338338+ |> actor.selecting(state.selector)
339339+ |> Ok
340340+ })
341341+ |> actor.on_message(on_message)
342342+ |> actor.start()
323343324324- let Start(database:, registry:, user:, smtp_environment:) = args
344344+ // let the manager know we started
345345+ let manager = process.named_subject(manager)
346346+ process.send(manager, Started(actor_started, user))
325347326326- actor.new_with_initialiser(100, fn(subject) {
327327- use feeds <- result.try(
328328- database.feeds_for_user(for: user, in: database)
329329- |> result.map_error(fn(error) {
330330- "Failed to get feeds from database with: " <> string.inspect(error)
331331- }),
332332- )
333333-334334- let state =
335335- State(
336336- database:,
337337- registry:,
338338- user:,
339339- feeds:,
340340- smtp_environment:,
341341- self: subject,
342342- sending_failed_n_times: 0,
343343- selector: process.new_selector() |> process.select(subject),
344344- )
345345- |> initial_selector
346346-347347- actor.initialised(state)
348348- |> actor.selecting(state.selector)
349349- |> Ok
350350- })
351351- |> actor.on_message(on_message)
352352- |> actor.start()
348348+ actor_started
353349}
354350355351// message ----------------------------------------------------------------------
356352357353type Message {
354354+ /// sent once at the start to fetch all feeds
355355+ ///
356356+ GetFeeds
358357 PubSubMessage(pubsub.Message)
359358 Retry(message: pubsub.Message, attempt: Int)
360359}
···374373 )
375374376375 case message {
376376+ GetFeeds -> {
377377+ case database.feeds_for_user(for: state.user, in: state.database) {
378378+ Ok(feeds) -> {
379379+ let state =
380380+ State(..state, feeds:)
381381+ |> initial_selector()
382382+383383+ actor.continue(state)
384384+ |> actor.with_selector(state.selector)
385385+ }
386386+ Error(_) -> actor.stop_abnormal("Failed to get feeds from the database")
387387+ }
388388+ }
377389 PubSubMessage(pubsub.FeedUpdate(update:)) -> {
378390 let handled = handle_feed_update(state, update)
379391 case handled {
···462474 }
463475}
464476477477+/// initializes the selector from the supplied state
478478+///
479479+/// > !! should only be called once on startup !!
480480+///
481481+fn initial_selector(state: State) {
482482+ log(woof.Info, "Initializing selector", [
483483+ woof.field("user", state.user.email),
484484+ ])
485485+486486+ let self = process.self()
487487+488488+ let selector =
489489+ state.selector
490490+ |> pubsub.select_user(
491491+ user: state.user,
492492+ in: state.registry,
493493+ self:,
494494+ handler: PubSubMessage,
495495+ )
496496+497497+ let selector =
498498+ list.fold(state.feeds, selector, fn(selector, feed) {
499499+ log(woof.Info, "User is subscribed to feed", [
500500+ woof.field("user", state.user.email),
501501+ woof.field("feed", feed.link |> uri.to_string()),
502502+ ])
503503+504504+ pubsub.select_feed(
505505+ selector:,
506506+ feed:,
507507+ in: state.registry,
508508+ self:,
509509+ handler: PubSubMessage,
510510+ )
511511+ })
512512+513513+ State(..state, selector:)
514514+}
515515+465516/// add a given feed
466517///
467518fn add_feed(state: State, feed: rss.Location) -> State {
···551602 smtp.feed_update_to_email(update, state.user)
552603 |> smtp.send_message(state.smtp_environment)
553604}
554554-// subscriptions changed --------------------------------------------------------
+2-2
src/eater/sql.gleam
···1313) {
1414 let sql =
1515 "
1616-INSERT INTO feeds (
1616+INSERT OR IGNORE INTO feeds (
1717 id,
1818 link,
1919 failed_n_times,
···9292) {
9393 let sql =
9494 "
9595-INSERT INTO users (
9595+INSERT OR IGNORE INTO users (
9696 id,
9797 email,
9898 password_hash,
+1-1
src/eater/sql/feeds.sql
···1414-- See the Licence for the specific language governing permissions and limitations. [cite: 6]
15151616-- name: AddFeed :exec
1717-INSERT INTO feeds (
1717+INSERT OR IGNORE INTO feeds (
1818 id,
1919 link,
2020 failed_n_times,
···1414-- See the Licence for the specific language governing permissions and limitations. [cite: 6]
15151616-- name: AddUser :exec
1717-INSERT INTO users (
1717+INSERT OR IGNORE INTO users (
1818 id,
1919 email,
2020 password_hash,