mail based rss feed aggregator
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

move starting sender/fetcher into a sub-supervisor also redo a lot of the pubsub stuff to actually properly handle selectors

ollie 89b8a7cb 55b32332

+369 -139
+15 -29
src/eater.gleam
··· 54 54 let fetcher_factory = process.new_name("fetcher_factory") 55 55 let sender_factory = process.new_name("sender_factory") 56 56 57 - let _supervisor = 57 + let assert Ok(_supervisor) = 58 58 supervisor.new(supervisor.RestForOne) 59 59 |> supervisor.add(group_registry.supervised(registry)) 60 - |> supervisor.add(fetcher.factory(fetcher_factory)) 61 - |> supervisor.add(sender.factory(sender_factory)) 60 + |> supervisor.add(fetcher.factory(fetcher_factory, registry:, database:)) 61 + |> supervisor.add(sender.factory( 62 + sender_factory, 63 + registry:, 64 + database:, 65 + smtp_environment:, 66 + )) 62 67 |> supervisor.add(webserver.supervised( 63 - database, 64 - registry, 65 - smtp_environment, 66 - configuration, 68 + database:, 69 + registry:, 70 + sender_factory:, 71 + fetcher_factory:, 72 + smtp_environment:, 73 + configuration:, 67 74 )) 68 75 |> supervisor.start() 69 76 70 77 woof.log(logger, woof.Info, "Finished starting supervisor", []) 71 78 72 - woof.log(logger, woof.Info, "Starting senders", []) 73 - 74 - let assert Ok(_) = 75 - sender.start_all_existing( 76 - sender_factory, 77 - registry, 78 - database, 79 - smtp_environment, 80 - ) 81 - as "Failed to start existing senders" 82 - 83 - woof.log(logger, woof.Info, "Finished starting senders", []) 84 - 85 - woof.log(logger, woof.Info, "Starting fetchers", []) 86 - 87 - let assert Ok(_) = 88 - fetcher.start_all_existing(fetcher_factory, registry, database) 89 - as "Failed to start existing fetchers" 90 - 91 - woof.log(logger, woof.Info, "Finished starting fetchers", []) 92 - 93 79 process.sleep_forever() 94 80 } 95 81 ··· 117 103 fn configure_logging() -> Nil { 118 104 woof.configure(woof.Config( 119 105 level: woof.Debug, 120 - format: woof.Json, 106 + format: woof.Text, 121 107 colors: woof.Auto, 122 108 )) 123 109
+72 -25
src/eater/fetcher.gleam
··· 27 27 import gleam/list 28 28 import gleam/otp/actor 29 29 import gleam/otp/factory_supervisor as factory 30 + import gleam/otp/static_supervisor 30 31 import gleam/otp/supervision 31 32 import gleam/result 32 33 import gleam/string 34 + import gleam/uri 33 35 import parsed_it/xml 34 36 import sqlight 35 37 import woof ··· 46 48 woof.new("FETCHER") 47 49 |> woof.log(level, message, fields) 48 50 } 51 + 52 + /// reexport of the underlying `process.Name` for convenience 53 + /// 54 + pub type FactoryName = 55 + process.Name(factory.Message(Start, Nil)) 49 56 50 57 /// the factory for the fetcher actors 51 58 /// 52 59 pub fn factory( 53 - name: process.Name(_), 54 - ) -> supervision.ChildSpecification(factory.Supervisor(Start, _)) { 55 - factory.worker_child(start) 56 - |> factory.named(name) 57 - |> factory.supervised() 60 + name name: FactoryName, 61 + registry registry: pubsub.Registry, 62 + database database: sqlight.Connection, 63 + ) -> supervision.ChildSpecification(_) { 64 + static_supervisor.new(static_supervisor.OneForAll) 65 + |> static_supervisor.add( 66 + factory.worker_child(start) 67 + |> factory.named(name) 68 + |> factory.supervised(), 69 + ) 70 + |> static_supervisor.add(starter(name, registry, database)) 71 + |> static_supervisor.supervised() 72 + } 73 + 74 + fn starter( 75 + name: FactoryName, 76 + registry: pubsub.Registry, 77 + database: sqlight.Connection, 78 + ) { 79 + supervision.worker(fn() { 80 + actor.new_with_initialiser(1000, fn(subject) { 81 + use _ <- result.try( 82 + start_all_existing(name, registry, database) 83 + |> result.map_error(fn(error) { 84 + "Failed to start a fetcher " <> string.inspect(error) 85 + }), 86 + ) 87 + 88 + process.send_after(subject, 100, Nil) 89 + 90 + actor.initialised(Nil) |> Ok 91 + }) 92 + |> actor.on_message(fn(_, _) { actor.stop() }) 93 + |> actor.start() 94 + }) 95 + |> supervision.restart(supervision.Transient) 58 96 } 59 97 60 98 /// starts a fetcher for each known feed 61 99 /// 62 100 pub fn start_all_existing( 63 - factory: process.Name(factory.Message(Start, _)), 101 + factory: FactoryName, 64 102 registry: pubsub.Registry, 65 103 database: sqlight.Connection, 66 - ) -> Result(List(actor.Started(a)), actor.StartError) { 104 + ) -> Result(List(actor.Started(Nil)), actor.StartError) { 67 105 use feeds <- result.try( 68 106 database.all_feeds(database) 69 107 |> result.map_error(fn(error) { ··· 82 120 83 121 /// start a new fetcher in the fetcher factory 84 122 /// 123 + /// make sure there is at least 1 sender listening for updates for this feed 124 + /// 85 125 pub fn start_new( 86 - factory_name: process.Name(_), 126 + factory_name: FactoryName, 87 127 with: Start, 88 128 ) -> Result(actor.Started(_), actor.StartError) { 89 129 let factory = factory.get_by_name(factory_name) ··· 124 164 fn start(args: Start) -> Result(actor.Started(_), actor.StartError) { 125 165 let Start(feed:, registry:, database:) = args 126 166 127 - log(woof.Info, "Starting", [woof.field("feed", feed.link)]) 167 + log(woof.Info, "Starting", [woof.field("feed", feed.link |> uri.to_string())]) 128 168 129 169 actor.new_with_initialiser(100, fn(subject) { 130 170 // TODO: persist the next timestamp to check at to the database ··· 138 178 registry:, 139 179 database:, 140 180 )) 141 - |> actor.returning(subject) 181 + |> actor.returning(Nil) 142 182 |> Ok 143 183 }) 144 184 |> actor.on_message(on_message) ··· 150 190 fn on_message(state: State, message: Message) -> actor.Next(State, Message) { 151 191 case message { 152 192 CheckFeeds -> { 153 - let handled = handle_feed(state.feed, state) 154 - 155 - case handled { 156 - Ok(_) -> Nil 157 - Error(error) -> 158 - log(woof.Error, "Checking feed failed", [ 159 - woof.field("feed", state.feed.link), 160 - woof.field("details", string.inspect(error)), 161 - ]) 193 + // only actually check the feed if there are people subscribed to it 194 + case pubsub.subscriber_count_feed(state.feed, state.registry) { 195 + [] -> Nil 196 + [_, ..] -> { 197 + let handled = handle_feed(state.feed, state) 198 + case handled { 199 + Ok(_) -> Nil 200 + Error(error) -> 201 + log(woof.Error, "Checking feed failed", [ 202 + woof.field("feed", state.feed.link |> uri.to_string()), 203 + woof.field("details", string.inspect(error)), 204 + ]) 205 + } 206 + } 162 207 } 163 208 164 209 // cancel old timer in case we received another CheckFeeds message that wasnt sent by the timer ··· 194 239 // reverse so we publish the oldest items first 195 240 |> list.reverse 196 241 |> list.map(fn(item) { 197 - pubsub.feed_update( 242 + pubsub.publish_feed_update( 198 243 update: rss.FeedUpdate(item, feed.channel.details), 199 244 for: state.feed, 200 245 in: state.registry, ··· 220 265 fn fetch_feed(location: rss.Location) -> Result(rss.Feed, FetchError) { 221 266 case location { 222 267 rss.Location(skip_n_times: 0, ..) -> { 223 - log(woof.Info, "Fetching", [woof.field("feed", location.link)]) 268 + log(woof.Info, "Fetching", [ 269 + woof.field("feed", location.link |> uri.to_string()), 270 + ]) 224 271 225 272 use req <- result.try( 226 - request.to(location.link) 273 + request.from_uri(location.link) 227 274 |> result.replace_error(FailedToParseUrl(location.link)), 228 275 ) 229 276 ··· 241 288 |> result.map_error(FailedToParseRssFeed) 242 289 response -> { 243 290 log(woof.Warning, "Fetching failed", [ 244 - woof.field("feed", location.link), 291 + woof.field("feed", location.link |> uri.to_string()), 245 292 woof.int_field("status", response.status), 246 293 ]) 247 294 Error(Non200Response(response)) ··· 250 297 } 251 298 _ -> { 252 299 log(woof.Info, "Skipping", [ 253 - woof.field("feed", location.link), 300 + woof.field("feed", location.link |> uri.to_string()), 254 301 woof.field("reason", "cooldown"), 255 302 ]) 256 303 Error(OnCooldown) ··· 259 306 } 260 307 261 308 type FetchError { 262 - FailedToParseUrl(String) 309 + FailedToParseUrl(uri.Uri) 263 310 RequestFailed(httpc.HttpError) 264 311 Non200Response(response.Response(String)) 265 312 FailedToParseRssFeed(xml.XmlDecodeError)
+139 -23
src/eater/pubsub.gleam
··· 20 20 import eater/user 21 21 import gleam/erlang/process 22 22 import gleam/list 23 + import gleam/otp/actor 23 24 import gleam/otp/supervision 25 + import gleam/uri 24 26 import group_registry 25 27 import woof 26 28 import youid/uuid ··· 45 47 group_registry.supervised(name) 46 48 } 47 49 50 + /// added for testing purposes, prefer `supervised` 51 + /// 52 + pub fn start( 53 + name: Registry, 54 + ) -> Result( 55 + actor.Started(group_registry.GroupRegistry(Message)), 56 + actor.StartError, 57 + ) { 58 + group_registry.start(name) 59 + } 60 + 48 61 pub type Message { 49 62 FeedUpdate(update: rss.FeedUpdate) 50 - SubscriptionChanged 63 + SubscriptionRemoved(rss.Location) 64 + SubscriptionAdded(rss.Location) 51 65 } 52 66 53 67 /// get the string 'channel' for a given `rss.Location` ··· 64 78 65 79 // publishing ------------------------------------------------------------------- 66 80 67 - /// publish a message about subscription changes for a user to the registry 81 + /// publish a user unsubscribing from a feed 68 82 /// 69 - pub fn subscription_changed(for user: user.User, in registry: Registry) -> Nil { 83 + pub fn publish_unsubscribed_from_feed( 84 + user user: user.User, 85 + from feed: rss.Location, 86 + in registry: Registry, 87 + ) -> Nil { 70 88 let registry = group_registry.get_registry(registry) 71 89 72 90 let members = group_registry.members(registry, user_channel(user)) 73 91 74 - log(woof.Info, "Published subscription change", [ 92 + list.map(members, process.send(_, SubscriptionRemoved(feed))) 93 + 94 + log(woof.Info, "Published unsubscribe", [ 75 95 woof.field("user", user.email), 96 + woof.field("feed", feed.link |> uri.to_string()), 76 97 ]) 98 + } 99 + 100 + /// publish a user subscribing to a feed 101 + /// 102 + pub fn publish_subscribed_to_feed( 103 + user user: user.User, 104 + from feed: rss.Location, 105 + in registry: Registry, 106 + ) -> Nil { 107 + let registry = group_registry.get_registry(registry) 108 + 109 + let members = group_registry.members(registry, user_channel(user)) 77 110 78 - list.map(members, process.send(_, SubscriptionChanged)) 79 - Nil 111 + list.map(members, process.send(_, SubscriptionAdded(feed))) 112 + log(woof.Info, "Published subscription", [ 113 + woof.field("user", user.email), 114 + woof.field("feed", feed.link |> uri.to_string()), 115 + ]) 80 116 } 81 117 82 118 /// publish an `rss.FeedUpdate` for a given `rss.Location` in a given registry 83 119 /// 84 - pub fn feed_update( 120 + pub fn publish_feed_update( 85 121 update update: rss.FeedUpdate, 86 122 for feed: rss.Location, 87 123 in registry: Registry, ··· 91 127 92 128 let members = group_registry.members(registry, channel) 93 129 130 + list.map(members, process.send(_, FeedUpdate(update:))) 131 + 94 132 log(woof.Info, "Feed published update", [ 95 - woof.field("feed", feed.link), 133 + woof.field("feed", feed.link |> uri.to_string()), 96 134 woof.field("update", update.new.link), 97 135 ]) 98 - 99 - list.map(members, process.send(_, FeedUpdate(update:))) 100 - 101 - Nil 102 136 } 103 137 104 138 // subscribing ------------------------------------------------------------------ 105 139 106 - /// get a subject to select on for a given user 140 + /// add the subject for a given user to the supplied selector 107 141 /// 108 - pub fn user_selector( 142 + pub fn select_user( 143 + selector selector: process.Selector(b), 109 144 user user: user.User, 110 145 in registry: Registry, 111 146 self self: process.Pid, 112 - subject subject: process.Subject(b), 113 147 handler handler: fn(Message) -> b, 114 148 ) -> process.Selector(b) { 115 149 let registry = group_registry.get_registry(registry) 116 150 117 - let registry = group_registry.join(registry, user_channel(user), self) 151 + let channel = user_channel(user) 118 152 119 - process.new_selector() 120 - |> process.select(subject) 153 + let registry = group_registry.join(registry, channel, self) 154 + 155 + selector 121 156 |> process.select_map(registry, handler) 122 157 } 123 158 124 - /// get a subject to select on for a given feed 159 + /// add the subject for a given feed to the supplied selector 125 160 /// 126 - pub fn feed_selector( 161 + pub fn select_feed( 162 + selector selector: process.Selector(b), 127 163 feed feed: rss.Location, 128 164 in registry: Registry, 129 165 self self: process.Pid, 130 - subject subject: process.Subject(b), 131 166 handler handler: fn(Message) -> b, 132 167 ) -> process.Selector(b) { 133 168 let registry = group_registry.get_registry(registry) ··· 136 171 137 172 let registry = group_registry.join(registry, channel, self) 138 173 139 - process.new_selector() 140 - |> process.select(subject) 174 + selector 141 175 |> process.select_map(registry, handler) 142 176 } 177 + 178 + // unsubscribing ---------------------------------------------------------------- 179 + 180 + /// unsubscribe from the registry for a given feed 181 + /// 182 + /// returns the new selector to use 183 + /// 184 + pub fn deselect_feed( 185 + selector: process.Selector(b), 186 + registry: Registry, 187 + feed: rss.Location, 188 + pid: process.Pid, 189 + ) -> process.Selector(b) { 190 + let registry = group_registry.get_registry(registry) 191 + 192 + let channel = feed_channel(feed) 193 + 194 + let members = group_registry.members(registry, channel) 195 + 196 + group_registry.leave(registry, channel, [pid]) 197 + 198 + list.fold(members, selector, fn(selector, member) { 199 + let subject_pid = member |> process.subject_owner() 200 + 201 + case subject_pid { 202 + Ok(subject_pid) if subject_pid == pid -> 203 + process.deselect(selector, member) 204 + _ -> selector 205 + } 206 + }) 207 + } 208 + 209 + /// unsubscribe from the registry for a given user 210 + /// 211 + /// returns the new selector to use 212 + /// 213 + pub fn deselect_user( 214 + selector: process.Selector(b), 215 + registry: Registry, 216 + user: user.User, 217 + pid: process.Pid, 218 + ) -> process.Selector(b) { 219 + let registry = group_registry.get_registry(registry) 220 + 221 + let channel = user_channel(user) 222 + 223 + let members = group_registry.members(registry, channel) 224 + 225 + group_registry.leave(registry, channel, [pid]) 226 + 227 + list.fold(members, selector, fn(selector, member) { 228 + let subject_pid = member |> process.subject_owner() 229 + 230 + case subject_pid { 231 + Ok(subject_pid) if subject_pid == pid -> 232 + process.deselect(selector, member) 233 + _ -> selector 234 + } 235 + }) 236 + } 237 + 238 + // registry info ---------------------------------------------------------------- 239 + 240 + /// how many other processes are subscribed to updates for this `user.User` 241 + /// 242 + pub fn subscriber_count_user(for user: user.User, in registry: Registry) { 243 + let registry = group_registry.get_registry(registry) 244 + 245 + let channel = user_channel(user) 246 + 247 + group_registry.members(registry, channel) 248 + } 249 + 250 + /// how many other processes are subscribed to updates for this `rss.Location` 251 + /// 252 + pub fn subscriber_count_feed(for feed: rss.Location, in registry: Registry) { 253 + let registry = group_registry.get_registry(registry) 254 + 255 + let channel = feed_channel(feed) 256 + 257 + group_registry.members(registry, channel) 258 + }
+143 -62
src/eater/sender.gleam
··· 27 27 import gleam/list 28 28 import gleam/otp/actor 29 29 import gleam/otp/factory_supervisor as factory 30 + import gleam/otp/static_supervisor 30 31 import gleam/otp/supervision 31 32 import gleam/result 32 33 import gleam/string 33 - import group_registry 34 + import gleam/uri 34 35 import sqlight 35 36 import woof 36 37 ··· 47 48 48 49 // factory ---------------------------------------------------------------------- 49 50 51 + /// reexport of the underlying `process.Name` for convenience 52 + /// 53 + pub type FactoryName = 54 + process.Name(factory.Message(Start, Nil)) 55 + 50 56 pub fn factory( 51 - name: process.Name(_), 52 - ) -> supervision.ChildSpecification(factory.Supervisor(Start, Nil)) { 53 - factory.worker_child(sender) 54 - |> factory.named(name) 55 - |> factory.supervised() 57 + name name: FactoryName, 58 + registry registry: pubsub.Registry, 59 + database database: sqlight.Connection, 60 + smtp_environment smtp_environment: smtp.SmtpEnvironment, 61 + ) -> supervision.ChildSpecification(_) { 62 + static_supervisor.new(static_supervisor.OneForAll) 63 + |> static_supervisor.add( 64 + factory.worker_child(sender) 65 + |> factory.named(name) 66 + |> factory.supervised(), 67 + ) 68 + |> static_supervisor.add(starter(name, registry, database, smtp_environment)) 69 + |> static_supervisor.supervised() 70 + } 71 + 72 + fn starter( 73 + name: FactoryName, 74 + registry: pubsub.Registry, 75 + database: sqlight.Connection, 76 + smtp_environment: smtp.SmtpEnvironment, 77 + ) { 78 + supervision.worker(fn() { 79 + actor.new_with_initialiser(1000, fn(subject) { 80 + use _ <- result.try( 81 + start_all_existing(name, registry, database, smtp_environment) 82 + |> result.map_error(fn(error) { 83 + "Failed to start a sender " <> string.inspect(error) 84 + }), 85 + ) 86 + 87 + process.send_after(subject, 100, Nil) 88 + 89 + actor.initialised(Nil) |> Ok 90 + }) 91 + |> actor.on_message(fn(_, _) { actor.stop() }) 92 + |> actor.start() 93 + }) 94 + |> supervision.restart(supervision.Transient) 56 95 } 57 96 58 97 // starting --------------------------------------------------------------------- ··· 60 99 /// starts a senders for each known user 61 100 /// 62 101 pub fn start_all_existing( 63 - factory: process.Name(factory.Message(Start, Nil)), 102 + factory: FactoryName, 64 103 registry: pubsub.Registry, 65 104 database: sqlight.Connection, 66 105 smtp_environment: smtp.SmtpEnvironment, ··· 92 131 93 132 /// start a new sender in the sender factory 94 133 /// 134 + /// make sure the user you are starting this sender for, is actually subscribed to some feeds 135 + /// 95 136 pub fn start_new( 96 137 factory_name: process.Name(_), 97 138 with: Start, ··· 108 149 database: sqlight.Connection, 109 150 registry: pubsub.Registry, 110 151 user: user.User, 152 + feeds: List(rss.Location), 111 153 smtp_environment: smtp.SmtpEnvironment, 112 154 self: process.Subject(Message), 113 155 sending_failed_n_times: Int, 156 + selector: process.Selector(Message), 114 157 ) 115 158 } 116 159 117 160 // actor ------------------------------------------------------------------------ 118 161 119 162 fn sender(args: Start) -> Result(actor.Started(Nil), actor.StartError) { 163 + let initial_selector = fn(state: State) -> State { 164 + log(woof.Info, "Initializing selector", [ 165 + woof.field("user", state.user.email), 166 + ]) 167 + 168 + let self = process.self() 169 + 170 + let selector = 171 + state.selector 172 + |> pubsub.select_user( 173 + user: state.user, 174 + in: state.registry, 175 + self:, 176 + handler: PubSubMessage, 177 + ) 178 + 179 + let selector = 180 + list.fold(state.feeds, selector, fn(selector, feed) { 181 + log(woof.Info, "User is subscribed to feed", [ 182 + woof.field("user", state.user.email), 183 + woof.field("feed", feed.link |> uri.to_string()), 184 + ]) 185 + 186 + pubsub.select_feed( 187 + selector:, 188 + feed:, 189 + in: state.registry, 190 + self:, 191 + handler: PubSubMessage, 192 + ) 193 + }) 194 + 195 + State(..state, selector:) 196 + } 197 + 120 198 let Start(database:, registry:, user:, smtp_environment:) = args 121 199 122 200 actor.new_with_initialiser(100, fn(subject) { 201 + use feeds <- result.try( 202 + database.feeds_for_user(for: user, in: database) 203 + |> result.map_error(fn(error) { 204 + "Failed to get feeds from database with: " <> string.inspect(error) 205 + }), 206 + ) 207 + 123 208 let state = 124 209 State( 125 210 database:, 126 211 registry:, 127 212 user:, 213 + feeds:, 128 214 smtp_environment:, 129 215 self: subject, 130 216 sending_failed_n_times: 0, 217 + selector: process.new_selector() |> process.select(subject), 131 218 ) 132 - 133 - use selector <- result.try(update_feeds(state:)) 219 + |> initial_selector 134 220 135 221 actor.initialised(state) 136 - |> actor.selecting(selector) 222 + |> actor.selecting(state.selector) 137 223 |> Ok 138 224 }) 139 225 |> actor.on_message(on_message) ··· 198 284 } 199 285 } 200 286 } 201 - PubSubMessage(pubsub.SubscriptionChanged) -> { 202 - case update_feeds(state) { 203 - Ok(selector) -> actor.continue(state) |> actor.with_selector(selector) 204 - Error(error) -> actor.stop_abnormal(error) 205 - } 287 + PubSubMessage(pubsub.SubscriptionRemoved(feed)) -> { 288 + let state = drop_feed(state, feed) 289 + actor.continue(state) 290 + } 291 + PubSubMessage(pubsub.SubscriptionAdded(feed)) -> { 292 + let state = add_feed(state, feed) 293 + 294 + actor.continue(state) 295 + |> actor.with_selector(state.selector) 206 296 } 207 297 Retry(message: pubsub.FeedUpdate(update:), attempt:) if attempt < 5 -> { 208 298 let handled = handle_feed_update(state, update) ··· 241 331 ]) 242 332 actor.continue(state) 243 333 } 244 - Retry(message: pubsub.SubscriptionChanged, ..) -> 245 - actor.stop_abnormal("pubsub.SubscriptionChanged cannot be retried") 334 + Retry(message:, ..) -> 335 + actor.stop_abnormal(string.inspect(message) <> "cannot be retried") 246 336 } 247 337 } 248 338 339 + /// add a given feed 340 + /// 341 + fn add_feed(state: State, feed: rss.Location) -> State { 342 + // update the selector 343 + let selector = 344 + state.selector 345 + |> pubsub.select_feed( 346 + feed:, 347 + in: state.registry, 348 + self: process.self(), 349 + handler: PubSubMessage, 350 + ) 351 + 352 + // update the feed list 353 + let feeds = 354 + state.feeds 355 + |> list.prepend(feed) 356 + |> list.unique() 357 + 358 + State(..state, selector:, feeds:) 359 + } 360 + 361 + /// drop a given feed 362 + /// 363 + fn drop_feed(state: State, feed: rss.Location) -> State { 364 + let selector = 365 + state.selector 366 + |> pubsub.deselect_feed(state.registry, feed, process.self()) 367 + 368 + let feeds = 369 + list.filter(state.feeds, fn(a) { a.id != feed.id }) 370 + |> list.unique() 371 + 372 + State(..state, feeds:, selector:) 373 + } 374 + 249 375 // feed updates ----------------------------------------------------------------- 250 376 251 377 type FeedUpdateError { ··· 299 425 smtp.feed_update_to_email(update, state.user) 300 426 |> smtp.send_message(state.smtp_environment) 301 427 } 302 - 303 428 // subscriptions changed -------------------------------------------------------- 304 - 305 - fn update_feeds(state state: State) -> Result(process.Selector(Message), String) { 306 - log(woof.Info, "Updating feeds for user", [ 307 - woof.field("user", state.user.email), 308 - ]) 309 - 310 - let self = process.self() 311 - 312 - use feeds <- result.try( 313 - database.feeds_for_user(for: state.user, in: state.database) 314 - |> result.map_error(fn(error) { 315 - "Failed to update feeds for " 316 - <> state.user.email 317 - <> " with error: " 318 - <> string.inspect(error) 319 - }), 320 - ) 321 - 322 - let selector = 323 - pubsub.user_selector( 324 - user: state.user, 325 - in: state.registry, 326 - self:, 327 - subject: state.self, 328 - handler: PubSubMessage, 329 - ) 330 - 331 - list.fold(feeds, selector, fn(selector, feed) { 332 - log(woof.Info, "User subscribed to feed", [ 333 - woof.field("user", state.user.email), 334 - woof.field("feed", feed.link), 335 - ]) 336 - 337 - pubsub.feed_selector( 338 - feed:, 339 - in: state.registry, 340 - self:, 341 - subject: state.self, 342 - handler: PubSubMessage, 343 - ) 344 - |> process.merge_selector(selector) 345 - }) 346 - |> Ok 347 - }