mail based rss feed aggregator
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

each fetcher now stores the updates from the last fetching when a new sender starts and subscribes to the feed; the fetcher will be notified and itll send all updates it has to that specific sender. that way, in a longer running system, users subscribing to an existing feed dont have to wait for (worst case) up to an hour to get their emails

ollie 2dce134b fea49068

+71 -21
+51 -19
src/eater/fetcher.gleam
··· 310 310 311 311 pub type Message { 312 312 CheckFeed 313 + InitSelector 314 + PubSubMessage(pubsub.Message) 313 315 } 314 316 315 317 type State { ··· 324 326 feed: rss.Location, 325 327 registry: pubsub.Registry, 326 328 database: sqlight.Connection, 329 + updates: List(rss.FeedUpdate), 327 330 ) 328 331 } 329 332 ··· 364 367 _ -> { 365 368 log(woof.Info, "Next check is being scheduled now", []) 366 369 367 - process.send_after(self, 150, CheckFeed) 370 + process.send_after(self, 150, InitSelector) 368 371 } 369 372 } 370 373 371 - actor.initialised(State( 372 - self: self, 373 - fetch_timer:, 374 - feed:, 375 - registry:, 376 - database:, 377 - )) 374 + actor.initialised( 375 + State( 376 + self: self, 377 + fetch_timer:, 378 + feed:, 379 + registry:, 380 + database:, 381 + updates: [], 382 + ), 383 + ) 378 384 |> actor.returning(self) 379 385 |> Ok 380 386 }) ··· 392 398 /// 393 399 fn on_message(state: State, message: Message) -> actor.Next(State, Message) { 394 400 case message { 401 + InitSelector -> { 402 + let selector = 403 + process.new_selector() 404 + |> process.select(state.self) 405 + |> pubsub.select_feed( 406 + feed: state.feed, 407 + handler: PubSubMessage, 408 + self: process.self(), 409 + in: state.registry, 410 + ) 411 + 412 + actor.continue(state) 413 + |> actor.with_selector(selector) 414 + } 415 + 416 + PubSubMessage(pubsub.NewMember(member)) -> { 417 + state.updates 418 + |> list.map(fn(update) { process.send(member, pubsub.FeedUpdate(update)) }) 419 + 420 + actor.continue(state) 421 + } 422 + PubSubMessage(pubsub.FeedUpdate(_)) -> actor.continue(state) 423 + PubSubMessage(pubsub.FailedToFetchTooManyTimes(_)) -> actor.continue(state) 424 + 395 425 // gets periodically raised to check our feed 396 426 CheckFeed -> { 397 427 log(woof.Info, "Checking feed", [ ··· 520 550 Ok(State(..state, feed:)) 521 551 } 522 552 Ok(feed) -> { 523 - feed.channel.items 524 - // reverse so we publish the oldest items first 525 - |> list.reverse 526 - |> list.map(fn(item) { 527 - pubsub.publish_feed_update( 528 - update: rss.FeedUpdate(item, feed.channel.details), 529 - for: state.feed, 530 - in: state.registry, 531 - ) 532 - }) 553 + let updates = 554 + feed.channel.items 555 + // reverse so we publish the oldest items first 556 + |> list.reverse 557 + |> list.map(rss.FeedUpdate(_, feed.channel.details)) 533 558 534 - Ok(state) 559 + updates 560 + |> list.map(pubsub.publish_feed_update( 561 + _, 562 + for: state.feed, 563 + in: state.registry, 564 + )) 565 + 566 + Ok(State(..state, updates:)) 535 567 } 536 568 } 537 569 }
+17 -2
src/eater/pubsub.gleam
··· 57 57 pub type Message { 58 58 FeedUpdate(update: rss.FeedUpdate) 59 59 FailedToFetchTooManyTimes(rss.Location) 60 + NewMember(process.Subject(Message)) 60 61 } 61 62 62 63 /// get the string 'channel' for a given `rss.Location` ··· 136 137 137 138 /// add the subject for a given feed to the supplied selector 138 139 /// 140 + /// also notifies all subscribers that a new member has joined 141 + /// 139 142 pub fn select_feed( 140 143 selector selector: process.Selector(b), 141 144 feed feed: rss.Location, ··· 143 146 self self: process.Pid, 144 147 handler handler: fn(Message) -> b, 145 148 ) -> process.Selector(b) { 149 + // get the registry 146 150 let registry = group_registry.get_registry(registry) 147 151 152 + // and the 'channel' for this feed 148 153 let channel = feed_channel(feed) 149 154 150 - let registry = group_registry.join(registry, channel, self) 155 + // make a new subject for the caller of this function 156 + let registry_subject = group_registry.join(registry, channel, self) 151 157 158 + // add it to their selector, using their handler 159 + let selector = 160 + selector 161 + |> process.select_map(registry_subject, handler) 162 + 163 + // notify other subscribers of new member 164 + let members = group_registry.members(registry, channel) 165 + list.map(members, process.send(_, NewMember(registry_subject))) 166 + 167 + // return their ajusted selector to the caller 152 168 selector 153 - |> process.select_map(registry, handler) 154 169 } 155 170 156 171 // unsubscribing ----------------------------------------------------------------
+3
src/eater/sender.gleam
··· 581 581 actor.continue(state) 582 582 } 583 583 } 584 + // senders dont care about other senders joining 585 + PubSubMessage(pubsub.NewMember(_)) -> actor.continue(state) 586 + Retry(message: pubsub.NewMember(_), attempt: _) -> actor.continue(state) 584 587 } 585 588 } 586 589