mail based rss feed aggregator
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

swap naive `starter` in `sender` and `fetcher` to a nicer implementation

ollie 1e5a9825 95fcfe53

+270 -71
+129 -34
src/eater/fetcher.gleam
··· 71 71 |> static_supervisor.supervised() 72 72 } 73 73 74 + type StarterState { 75 + StarterState( 76 + self: process.Subject(StarterMessage), 77 + factory: FactoryName, 78 + registry: pubsub.Registry, 79 + database: sqlight.Connection, 80 + left_to_start: Int, 81 + ) 82 + } 83 + 84 + type StarterMessage { 85 + /// start the process 86 + StartAll 87 + /// fetch feeds from db (attempt 1) 88 + DatabaseReturnedFeeds(Result(List(rss.Location), sqlight.Error)) 89 + /// fetch feeds from db (attempt 2) 90 + FetchAgain 91 + 92 + /// start fetcher (attempt 1) 93 + Started(Result(actor.Started(Nil), actor.StartError), rss.Location) 94 + /// start fetcher (attempt 2) 95 + StartAgain(rss.Location) 96 + } 97 + 74 98 fn starter( 75 - name: FactoryName, 99 + factory: FactoryName, 76 100 registry: pubsub.Registry, 77 101 database: sqlight.Connection, 78 102 ) { 103 + let log = fn(level, message, fields) { 104 + woof.new("FETCHER-STARTER") 105 + |> woof.log(level, message, fields) 106 + } 107 + 79 108 supervision.worker(fn() { 80 - actor.new_with_initialiser(1000, fn(subject) { 81 - use _ <- result.try( 82 - start_all_existing(name, registry, database) 83 - |> result.map_error(fn(error) { 84 - "Failed to start a fetcher " <> string.inspect(error) 85 - }), 86 - ) 109 + actor.new_with_initialiser(1000, fn(self) { 110 + process.send_after(self, 100, StartAll) 111 + 112 + actor.initialised(StarterState( 113 + self:, 114 + factory:, 115 + registry:, 116 + database:, 117 + left_to_start: -1, 118 + )) 119 + |> Ok 120 + }) 121 + |> actor.on_message(fn(state, message) { 122 + case message { 123 + StartAll -> { 124 + database.all_feeds(database) 125 + |> DatabaseReturnedFeeds 126 + |> actor.send(state.self, _) 127 + 128 + actor.continue(state) 129 + } 130 + DatabaseReturnedFeeds(Ok(feeds)) -> { 131 + let left_to_start = 132 + feeds 133 + |> list.map(fn(feed) { 134 + start_new(factory, Start(database:, registry:, feed:)) 135 + |> Started(feed) 136 + |> actor.send(state.self, _) 137 + }) 138 + |> list.length() 139 + 140 + actor.continue(StarterState(..state, left_to_start:)) 141 + } 142 + DatabaseReturnedFeeds(Error(db_error)) -> { 143 + log(woof.Warning, "Failed to get feeds from database once", [ 144 + woof.field("details", string.inspect(db_error)), 145 + ]) 146 + 147 + actor.send(state.self, FetchAgain) 148 + actor.continue(state) 149 + } 150 + FetchAgain -> { 151 + case database.all_feeds(database) { 152 + Ok(feeds) -> { 153 + DatabaseReturnedFeeds(Ok(feeds)) 154 + |> actor.send(state.self, _) 155 + 156 + actor.continue(state) 157 + } 158 + Error(error) -> { 159 + log(woof.Warning, "Failed to get feeds from database twice", [ 160 + woof.field("details", string.inspect(error)), 161 + ]) 162 + 163 + actor.stop_abnormal("Failed to get feeds from database twice") 164 + } 165 + } 166 + } 167 + Started(Ok(_), _user) -> { 168 + case state.left_to_start { 169 + // only one is 'left to start' (this one) 170 + // and since that just started, lets stop this actor 171 + 1 -> actor.stop() 172 + // theres more to go 173 + _ -> 174 + actor.continue( 175 + StarterState(..state, left_to_start: state.left_to_start - 1), 176 + ) 177 + } 178 + } 179 + Started(Error(start_error), feed) -> { 180 + log(woof.Warning, "Failed to start fetcher once", [ 181 + woof.field("feed", feed.link |> uri.to_string()), 182 + woof.field("details", string.inspect(start_error)), 183 + ]) 87 184 88 - process.send_after(subject, 100, Nil) 185 + StartAgain(feed) 186 + |> actor.send(state.self, _) 187 + actor.continue(state) 188 + } 189 + StartAgain(feed) -> { 190 + case start_new(factory, Start(database:, registry:, feed:)) { 191 + Ok(started) -> { 192 + actor.send(state.self, Started(Ok(started), feed)) 193 + actor.continue(state) 194 + } 195 + Error(error) -> { 196 + log(woof.Warning, "Failed to start fetcher twice", [ 197 + woof.field("feed", feed.link |> uri.to_string()), 198 + woof.field("details", string.inspect(error)), 199 + ]) 89 200 90 - actor.initialised(Nil) |> Ok 201 + actor.stop_abnormal( 202 + "Failed to start fetcher for feed " 203 + <> feed.link |> uri.to_string() 204 + <> " twice", 205 + ) 206 + } 207 + } 208 + } 209 + } 91 210 }) 92 - |> actor.on_message(fn(_, _) { actor.stop() }) 93 211 |> actor.start() 94 212 }) 95 213 |> supervision.restart(supervision.Transient) 96 - } 97 - 98 - /// starts a fetcher for each known feed 99 - /// 100 - pub fn start_all_existing( 101 - factory: FactoryName, 102 - registry: pubsub.Registry, 103 - database: sqlight.Connection, 104 - ) -> Result(List(actor.Started(Nil)), actor.StartError) { 105 - use feeds <- result.try( 106 - database.all_feeds(database) 107 - |> result.map_error(fn(error) { 108 - actor.InitFailed( 109 - "Failed to get feeds from database with error: " 110 - <> string.inspect(error), 111 - ) 112 - }), 113 - ) 114 - 115 - list.map(feeds, fn(feed) { 116 - start_new(factory, Start(feed:, registry:, database:)) 117 - }) 118 - |> result.all() 119 214 } 120 215 121 216 /// start a new fetcher in the fetcher factory
+141 -37
src/eater/sender.gleam
··· 69 69 |> static_supervisor.supervised() 70 70 } 71 71 72 + // starting --------------------------------------------------------------------- 73 + 74 + type StarterState { 75 + StarterState( 76 + self: process.Subject(StarterMessage), 77 + factory: FactoryName, 78 + registry: pubsub.Registry, 79 + database: sqlight.Connection, 80 + smtp_environment: smtp.SmtpEnvironment, 81 + left_to_start: Int, 82 + ) 83 + } 84 + 85 + type StarterMessage { 86 + /// start the process 87 + StartAll 88 + /// fetch users from db (attempt 1) 89 + DatabaseReturnedUsers(Result(List(user.User), sqlight.Error)) 90 + /// fetch users from db (attempt 2) 91 + FetchAgain 92 + 93 + /// start sender (attempt 1) 94 + Started(Result(actor.Started(Nil), actor.StartError), user.User) 95 + /// start sender (attempt 2) 96 + StartAgain(user.User) 97 + } 98 + 99 + /// handles starting all senders on startup and then shuts down 100 + /// 72 101 fn starter( 73 - name: FactoryName, 102 + factory: FactoryName, 74 103 registry: pubsub.Registry, 75 104 database: sqlight.Connection, 76 105 smtp_environment: smtp.SmtpEnvironment, 77 106 ) { 107 + let log = fn(level, message, fields) { 108 + woof.new("SENDER-STARTER") 109 + |> woof.log(level, message, fields) 110 + } 111 + 78 112 supervision.worker(fn() { 79 - actor.new_with_initialiser(1000, fn(subject) { 80 - use _ <- result.try( 81 - start_all_existing(name, registry, database, smtp_environment) 82 - |> result.map_error(fn(error) { 83 - "Failed to start a sender " <> string.inspect(error) 84 - }), 85 - ) 113 + actor.new_with_initialiser(1000, fn(self) { 114 + process.send_after(self, 100, StartAll) 86 115 87 - process.send_after(subject, 100, Nil) 88 - 89 - actor.initialised(Nil) |> Ok 116 + actor.initialised(StarterState( 117 + self:, 118 + factory:, 119 + registry:, 120 + database:, 121 + smtp_environment:, 122 + left_to_start: -1, 123 + )) 124 + |> Ok 90 125 }) 91 - |> actor.on_message(fn(_, _) { actor.stop() }) 92 - |> actor.start() 93 - }) 94 - |> supervision.restart(supervision.Transient) 95 - } 126 + |> actor.on_message(fn(state, message) { 127 + case message { 128 + StartAll -> { 129 + database.all_users(database) 130 + |> DatabaseReturnedUsers 131 + |> actor.send(state.self, _) 96 132 97 - // starting --------------------------------------------------------------------- 133 + actor.continue(state) 134 + } 135 + DatabaseReturnedUsers(Ok(users)) -> { 136 + let left_to_start = 137 + users 138 + |> list.map(fn(user) { 139 + start_new( 140 + factory, 141 + Start(database:, registry:, user:, smtp_environment:), 142 + ) 143 + |> Started(user) 144 + |> actor.send(state.self, _) 145 + }) 146 + |> list.length() 98 147 99 - /// starts a senders for each known user 100 - /// 101 - pub fn start_all_existing( 102 - factory: FactoryName, 103 - registry: pubsub.Registry, 104 - database: sqlight.Connection, 105 - smtp_environment: smtp.SmtpEnvironment, 106 - ) -> Result(List(actor.Started(Nil)), actor.StartError) { 107 - use users <- result.try( 108 - database.all_users(database) 109 - |> result.map_error(fn(error) { 110 - actor.InitFailed( 111 - "Failed to get users from database with error: " 112 - <> string.inspect(error), 113 - ) 114 - }), 115 - ) 148 + actor.continue(StarterState(..state, left_to_start:)) 149 + } 150 + DatabaseReturnedUsers(Error(db_error)) -> { 151 + log(woof.Warning, "Failed to get users from database once", [ 152 + woof.field("details", string.inspect(db_error)), 153 + ]) 154 + 155 + actor.send(state.self, FetchAgain) 156 + actor.continue(state) 157 + } 158 + FetchAgain -> { 159 + case database.all_users(database) { 160 + Ok(users) -> { 161 + DatabaseReturnedUsers(Ok(users)) 162 + |> actor.send(state.self, _) 163 + 164 + actor.continue(state) 165 + } 166 + Error(error) -> { 167 + log(woof.Warning, "Failed to get users from database twice", [ 168 + woof.field("details", string.inspect(error)), 169 + ]) 170 + 171 + actor.stop_abnormal("Failed to get users from database twice") 172 + } 173 + } 174 + } 175 + Started(Ok(_), _user) -> { 176 + case state.left_to_start { 177 + // only one is 'left to start' (this one) 178 + // and since that just started, lets stop this actor 179 + 1 -> actor.stop() 180 + // theres more to go 181 + _ -> 182 + actor.continue( 183 + StarterState(..state, left_to_start: state.left_to_start - 1), 184 + ) 185 + } 186 + } 187 + Started(Error(start_error), user) -> { 188 + log(woof.Warning, "Failed to start sender once", [ 189 + woof.field("user", user.email), 190 + woof.field("details", string.inspect(start_error)), 191 + ]) 192 + 193 + StartAgain(user) 194 + |> actor.send(state.self, _) 195 + actor.continue(state) 196 + } 197 + StartAgain(user) -> { 198 + case 199 + start_new( 200 + factory, 201 + Start(database:, registry:, user:, smtp_environment:), 202 + ) 203 + { 204 + Ok(started) -> { 205 + actor.send(state.self, Started(Ok(started), user)) 206 + actor.continue(state) 207 + } 208 + Error(error) -> { 209 + log(woof.Warning, "Failed to start sender twice", [ 210 + woof.field("user", user.email), 211 + woof.field("details", string.inspect(error)), 212 + ]) 116 213 117 - list.map(users, fn(user) { 118 - start_new(factory, Start(user:, registry:, database:, smtp_environment:)) 214 + actor.stop_abnormal( 215 + "Failed to start sender for user " <> user.email <> " twice", 216 + ) 217 + } 218 + } 219 + } 220 + } 221 + }) 222 + |> actor.start() 119 223 }) 120 - |> result.all() 224 + |> supervision.restart(supervision.Transient) 121 225 } 122 226 123 227 pub type Start {