mail based rss feed aggregator
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

simplify a little bit, fix some left over naming issues and actually make senders subscribe to feeds again

ollie 7a24320a 4b9919c3

+75 -77
+39 -36
src/eater/fetcher.gleam
··· 66 66 ) -> supervision.ChildSpecification(_) { 67 67 static_supervisor.new(static_supervisor.OneForAll) 68 68 |> static_supervisor.add( 69 - factory.worker_child(start) 69 + factory.worker_child(fetcher) 70 70 |> factory.named(factory) 71 71 |> factory.supervised(), 72 72 ) 73 - |> static_supervisor.add(starter(starter_name, factory, registry, database)) 73 + |> static_supervisor.add(manager(starter_name, factory, registry, database)) 74 74 |> static_supervisor.supervised() 75 75 } 76 76 ··· 85 85 } 86 86 87 87 pub opaque type ManagerMessage { 88 - // initial startup ------------------------------------------------------------ 89 - /// start the process 88 + // internal messages ---------------------------------------------------------- 89 + /// start all fetchers 90 90 StartAll 91 - /// fetch feeds from db 92 - DatabaseReturnedFeeds(List(rss.Location)) 93 91 94 - /// start fetcher 95 - Started( 92 + /// a fetcher has started and is registering itself 93 + FetcherStarted( 96 94 Result(actor.Started(process.Subject(Message)), actor.StartError), 97 95 rss.Location, 98 96 ) 99 97 100 - // runtime additions ---------------------------------------------------------- 98 + // external commands ---------------------------------------------------------- 101 99 /// start a sender for a given user at runtime 102 100 StartFor(rss.Location) 103 101 /// stop the fetcher for a given feed at runtime ··· 131 129 |> process.send(RecheckFor(feed)) 132 130 } 133 131 134 - fn starter( 132 + fn manager( 135 133 name: ManagerName, 136 134 factory: FactoryName, 137 135 registry: pubsub.Registry, 138 136 database: sqlight.Connection, 139 137 ) { 140 138 let log = fn(level, message, fields) { 141 - woof.new("FETCHER-STARTER") 139 + woof.new("FETCHER-MANAGER") 142 140 |> woof.log(level, message, fields) 143 141 } 144 142 145 143 supervision.worker(fn() { 146 - actor.new_with_initialiser(1000, fn(self) { 147 - process.send_after(self, 100, StartAll) 144 + log(woof.Info, "Starting", []) 145 + 146 + actor.new_with_initialiser(100, fn(self) { 147 + process.send_after(self, 150, StartAll) 148 148 149 149 actor.initialised(ManagerState( 150 150 self:, ··· 160 160 StartAll -> { 161 161 case database.all_feeds(database) { 162 162 Ok(feeds) -> { 163 - DatabaseReturnedFeeds(feeds) |> actor.send(state.self, _) 163 + // start all the fetchers 164 + list.map(feeds, fn(feed) { 165 + process.send(state.self, StartFor(feed)) 166 + }) 167 + 164 168 actor.continue(state) 165 169 } 166 170 // have the supervision handle retrying ··· 169 173 170 174 actor.continue(state) 171 175 } 172 - DatabaseReturnedFeeds(feeds) -> { 173 - feeds 174 - |> list.map(fn(feed) { 175 - start_new( 176 - factory, 177 - Start(database:, registry:, feed:, manager: name), 178 - ) 179 - |> Started(feed) 180 - |> actor.send(state.self, _) 181 - }) 182 - 183 - actor.continue(state) 184 - } 185 176 186 177 // all starts go through this 187 - Started(Ok(started), feed) -> { 178 + FetcherStarted(Ok(started), feed) -> { 188 179 // if there is an existing sender for this 189 180 // tell it to shut down 190 181 case dict.get(state.fetchers, feed.id) { ··· 205 196 )), 206 197 ) 207 198 199 + log(woof.Info, "Fetcher has registered itself", [ 200 + woof.field("feed", feed.link |> uri.to_string()), 201 + ]) 202 + 208 203 actor.continue(state) 209 204 } 210 205 211 - Started(Error(start_error), feed) -> { 206 + FetcherStarted(Error(start_error), feed) -> { 212 207 log(woof.Error, "Failed to start fetcher", [ 213 208 woof.field("feed", feed.link |> uri.to_string()), 214 209 woof.field("details", string.inspect(start_error)), ··· 326 321 327 322 /// starts a new fetcher child 328 323 /// 329 - fn start(args: Start) -> Result(actor.Started(_), actor.StartError) { 324 + fn fetcher(args: Start) -> Result(actor.Started(_), actor.StartError) { 330 325 let Start(feed:, registry:, database:, manager:) = args 331 326 332 327 log(woof.Info, "Starting", [woof.field("feed", feed.link |> uri.to_string())]) 333 328 334 329 let started = 335 - actor.new_with_initialiser(100, fn(subject) { 330 + actor.new_with_initialiser(100, fn(self) { 336 331 // TODO: persist the next timestamp to check at to the database 337 332 // so the actor restarting doesnt preeptively recheck the feed 338 - let fetch_timer = process.send_after(subject, 150, CheckFeeds) 333 + let fetch_timer = process.send_after(self, 150, CheckFeeds) 339 334 340 335 actor.initialised(State( 341 - self: subject, 336 + self: self, 342 337 fetch_timer:, 343 338 feed:, 344 339 registry:, 345 340 database:, 346 341 )) 347 - |> actor.returning(subject) 342 + |> actor.returning(self) 348 343 |> Ok 349 344 }) 350 345 |> actor.on_message(on_message) ··· 352 347 353 348 // let the manager know we started 354 349 let manager = process.named_subject(manager) 355 - process.send(manager, Started(started, feed)) 350 + process.send(manager, FetcherStarted(started, feed)) 356 351 357 352 started 358 353 } ··· 362 357 fn on_message(state: State, message: Message) -> actor.Next(State, Message) { 363 358 case message { 364 359 CheckFeeds -> { 360 + log(woof.Info, "Checking feed", [ 361 + woof.field("feed", state.feed.link |> uri.to_string()), 362 + ]) 363 + 365 364 // only actually check the feed if there are people subscribed to it 366 365 case pubsub.subscriber_count_feed(state.feed, state.registry) { 367 - [] -> Nil 366 + [] -> 367 + log(woof.Warning, "Skipping", [ 368 + woof.field("feed", state.feed.link |> uri.to_string()), 369 + woof.field("reason", "No sender is subscribed"), 370 + ]) 368 371 [_, ..] -> { 369 372 let handled = handle_feed(state.feed, state) 370 373 case handled {
+36 -41
src/eater/sender.gleam
··· 95 95 } 96 96 97 97 pub opaque type ManagerMessage { 98 - // initial startup ------------------------------------------------------------ 99 - /// start the process 98 + // internal messages ---------------------------------------------------------- 99 + /// start all senders 100 100 StartAll 101 - /// fetch users from db 102 - DatabaseReturnedUsers(List(user.User)) 103 - /// start sender 104 - Started(Result(actor.Started(Nil), actor.StartError), user.User) 101 + /// a sender has started and is registering itself 102 + SenderStarted(Result(actor.Started(Nil), actor.StartError), user.User) 105 103 106 - // runtime additions ------------------------------------------------------------ 107 - /// start a sender for a given user at runtime 104 + // external commands ---------------------------------------------------------- 105 + /// start a sender for a given user 108 106 StartFor(user.User) 109 - /// stop the sender for a given user at runtime 107 + /// stop the sender for a given user 110 108 StopFor(user.User) 111 - /// restart the sender for a given user at runtime 109 + /// restart the sender for a given user 112 110 RestartFor(user.User) 113 111 } 114 112 113 + /// start a sender for a given user 114 + /// 115 115 pub fn start_for(manager: ManagerName, user: user.User) { 116 116 process.named_subject(manager) 117 117 |> process.send(StartFor(user)) 118 118 } 119 119 120 + /// stop the sender for a given user 121 + /// 120 122 pub fn stop_for(manager: ManagerName, user: user.User) { 121 123 process.named_subject(manager) 122 124 |> process.send(StopFor(user)) 123 125 } 124 126 127 + /// restart the sender for a given user 128 + /// 125 129 pub fn restart_for(manager: ManagerName, user: user.User) { 126 130 process.named_subject(manager) 127 131 |> process.send(RestartFor(user)) 128 132 } 129 133 130 - /// handles starting all senders on startup and then shuts down 134 + /// starts senders for all existing users on startup 135 + /// then also tracks every started sender and handles starting / stopping and restarting 131 136 /// 132 137 fn manager( 133 138 name: ManagerName, ··· 137 142 smtp_environment: smtp.SmtpEnvironment, 138 143 ) { 139 144 let log = fn(level, message, fields) { 140 - woof.new("SENDER-STARTER") 145 + woof.new("SENDER-MANAGER") 141 146 |> woof.log(level, message, fields) 142 147 } 143 148 144 149 supervision.worker(fn() { 145 - actor.new_with_initialiser(1000, fn(self) { 146 - process.send_after(self, 100, StartAll) 150 + actor.new_with_initialiser(100, fn(self) { 151 + process.send_after(self, 150, StartAll) 147 152 148 153 actor.initialised(ManagerState( 149 154 self:, ··· 161 166 StartAll -> { 162 167 case database.all_users(database) { 163 168 Ok(users) -> { 164 - DatabaseReturnedUsers(users) 165 - |> actor.send(state.self, _) 169 + // start all senders 170 + list.map(users, fn(user) { 171 + process.send(state.self, StartFor(user)) 172 + }) 166 173 167 174 actor.continue(state) 168 175 } ··· 171 178 172 179 actor.continue(state) 173 180 } 174 - DatabaseReturnedUsers(users) -> { 175 - users 176 - |> list.map(fn(user) { 177 - start_new( 178 - factory, 179 - Start( 180 - database:, 181 - registry:, 182 - user:, 183 - smtp_environment:, 184 - manager: name, 185 - ), 186 - ) 187 - |> Started(user) 188 - |> actor.send(state.self, _) 189 - }) 190 - 191 - actor.continue(state) 192 - } 193 181 // all starts go through this 194 - Started(Ok(started), user) -> { 182 + SenderStarted(Ok(started), user) -> { 195 183 // if there is an existing sender for this 196 184 // tell it to shut down 197 185 case dict.get(state.senders, user.id) { ··· 209 197 senders: dict.insert(state.senders, user.id, started.pid), 210 198 ) 211 199 200 + log(woof.Info, "Sender has registered itself", [ 201 + woof.field("user", user.email), 202 + ]) 203 + 212 204 actor.continue(state) 213 205 } 214 206 215 - Started(Error(start_error), user) -> { 207 + SenderStarted(Error(start_error), user) -> { 216 208 log(woof.Error, "Failed to start sender", [ 217 209 woof.field("user", user.email), 218 210 woof.field("details", string.inspect(start_error)), ··· 321 313 let Start(database:, registry:, user:, smtp_environment:, manager:) = args 322 314 323 315 let actor_started = 324 - actor.new_with_initialiser(300, fn(subject) { 316 + actor.new_with_initialiser(100, fn(self) { 317 + // tell self to get this users feeds from the database 318 + process.send_after(self, 150, GetFeeds) 319 + 325 320 let state = 326 321 State( 327 322 database:, ··· 329 324 user:, 330 325 feeds: [], 331 326 smtp_environment:, 332 - self: subject, 327 + self:, 333 328 sending_failed_n_times: 0, 334 - selector: process.new_selector() |> process.select(subject), 329 + selector: process.new_selector() |> process.select(self), 335 330 ) 336 331 337 332 actor.initialised(state) ··· 343 338 344 339 // let the manager know we started 345 340 let manager = process.named_subject(manager) 346 - process.send(manager, Started(actor_started, user)) 341 + process.send(manager, SenderStarted(actor_started, user)) 347 342 348 343 actor_started 349 344 }