mail based rss feed aggregator
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

starters are no longer transient they now also have `StartFor` message and function for runtime adding of new workers

TODO: add `StopFor` message and function

ollie 4bd93753 1e5a9825

+106 -67
+10 -2
src/eater.gleam
··· 52 52 53 53 let registry = process.new_name("registry") 54 54 let fetcher_factory = process.new_name("fetcher_factory") 55 + let fetcher_starter = process.new_name("fetcher_starter") 55 56 let sender_factory = process.new_name("sender_factory") 57 + let sender_starter = process.new_name("sender_starter") 56 58 57 59 let assert Ok(_supervisor) = 58 60 supervisor.new(supervisor.RestForOne) 59 61 |> supervisor.add(group_registry.supervised(registry)) 60 - |> supervisor.add(fetcher.factory(fetcher_factory, registry:, database:)) 62 + |> supervisor.add(fetcher.factory( 63 + name: fetcher_factory, 64 + starter: fetcher_starter, 65 + registry:, 66 + database:, 67 + )) 61 68 |> supervisor.add(sender.factory( 62 - sender_factory, 69 + name: sender_factory, 70 + starter: sender_starter, 63 71 registry:, 64 72 database:, 65 73 smtp_environment:,
+41 -34
src/eater/fetcher.gleam
··· 57 57 /// the factory for the fetcher actors 58 58 /// 59 59 pub fn factory( 60 - name name: FactoryName, 60 + name factory: FactoryName, 61 + starter starter_name: StarterName, 61 62 registry registry: pubsub.Registry, 62 63 database database: sqlight.Connection, 63 64 ) -> supervision.ChildSpecification(_) { 64 65 static_supervisor.new(static_supervisor.OneForAll) 65 66 |> static_supervisor.add( 66 67 factory.worker_child(start) 67 - |> factory.named(name) 68 + |> factory.named(factory) 68 69 |> factory.supervised(), 69 70 ) 70 - |> static_supervisor.add(starter(name, registry, database)) 71 + |> static_supervisor.add(starter(starter_name, factory, registry, database)) 71 72 |> static_supervisor.supervised() 72 73 } 73 74 ··· 77 78 factory: FactoryName, 78 79 registry: pubsub.Registry, 79 80 database: sqlight.Connection, 80 - left_to_start: Int, 81 81 ) 82 82 } 83 83 84 - type StarterMessage { 84 + pub opaque type StarterMessage { 85 + // initial startup ------------------------------------------------------------ 85 86 /// start the process 86 87 StartAll 87 88 /// fetch feeds from db (attempt 1) ··· 93 94 Started(Result(actor.Started(Nil), actor.StartError), rss.Location) 94 95 /// start fetcher (attempt 2) 95 96 StartAgain(rss.Location) 97 + 98 + // runtime additions ---------------------------------------------------------- 99 + /// start a sender for a given user at runtime 100 + StartFor(rss.Location) 101 + } 102 + 103 + pub type StarterName = 104 + process.Name(StarterMessage) 105 + 106 + pub fn start_for(starter: StarterName, feed: rss.Location) { 107 + process.named_subject(starter) 108 + |> process.send(StartFor(feed)) 96 109 } 97 110 98 111 fn starter( 112 + name: StarterName, 99 113 factory: FactoryName, 100 114 registry: pubsub.Registry, 101 115 database: sqlight.Connection, ··· 109 123 actor.new_with_initialiser(1000, fn(self) { 110 124 process.send_after(self, 100, StartAll) 111 125 112 - actor.initialised(StarterState( 113 - self:, 114 - factory:, 115 - registry:, 116 - database:, 117 - left_to_start: -1, 118 - )) 126 + actor.initialised(StarterState(self:, factory:, registry:, database:)) 119 127 |> Ok 120 128 }) 121 129 |> actor.on_message(fn(state, message) { ··· 128 136 actor.continue(state) 129 137 } 130 138 DatabaseReturnedFeeds(Ok(feeds)) -> { 131 - let left_to_start = 132 - feeds 133 - |> list.map(fn(feed) { 134 - start_new(factory, Start(database:, registry:, feed:)) 135 - |> Started(feed) 136 - |> actor.send(state.self, _) 137 - }) 138 - |> list.length() 139 + feeds 140 + |> list.map(fn(feed) { 141 + start_new(factory, Start(database:, registry:, feed:)) 142 + |> Started(feed) 143 + |> actor.send(state.self, _) 144 + }) 139 145 140 - actor.continue(StarterState(..state, left_to_start:)) 146 + actor.continue(state) 141 147 } 142 148 DatabaseReturnedFeeds(Error(db_error)) -> { 143 149 log(woof.Warning, "Failed to get feeds from database once", [ ··· 164 170 } 165 171 } 166 172 } 167 - Started(Ok(_), _user) -> { 168 - case state.left_to_start { 169 - // only one is 'left to start' (this one) 170 - // and since that just started, lets stop this actor 171 - 1 -> actor.stop() 172 - // theres more to go 173 - _ -> 174 - actor.continue( 175 - StarterState(..state, left_to_start: state.left_to_start - 1), 176 - ) 177 - } 178 - } 173 + 174 + // all starts go through this 175 + // TODO: track started subjects and add StopFor 176 + Started(Ok(_), _user) -> actor.continue(state) 177 + 179 178 Started(Error(start_error), feed) -> { 180 179 log(woof.Warning, "Failed to start fetcher once", [ 181 180 woof.field("feed", feed.link |> uri.to_string()), ··· 206 205 } 207 206 } 208 207 } 208 + // runtime additions ---------------------------------------------------- 209 + StartFor(feed) -> { 210 + start_new(factory, Start(database:, registry:, feed:)) 211 + |> Started(feed) 212 + |> actor.send(state.self, _) 213 + 214 + actor.continue(state) 215 + } 209 216 } 210 217 }) 218 + |> actor.named(name) 211 219 |> actor.start() 212 220 }) 213 - |> supervision.restart(supervision.Transient) 214 221 } 215 222 216 223 /// start a new fetcher in the fetcher factory
+53 -31
src/eater/sender.gleam
··· 54 54 process.Name(factory.Message(Start, Nil)) 55 55 56 56 pub fn factory( 57 - name name: FactoryName, 57 + name factory: FactoryName, 58 + starter starter_name: StarterName, 58 59 registry registry: pubsub.Registry, 59 60 database database: sqlight.Connection, 60 61 smtp_environment smtp_environment: smtp.SmtpEnvironment, ··· 62 63 static_supervisor.new(static_supervisor.OneForAll) 63 64 |> static_supervisor.add( 64 65 factory.worker_child(sender) 65 - |> factory.named(name) 66 + |> factory.named(factory) 66 67 |> factory.supervised(), 67 68 ) 68 - |> static_supervisor.add(starter(name, registry, database, smtp_environment)) 69 + |> static_supervisor.add(starter( 70 + starter_name, 71 + factory, 72 + registry, 73 + database, 74 + smtp_environment, 75 + )) 69 76 |> static_supervisor.supervised() 70 77 } 71 78 72 79 // starting --------------------------------------------------------------------- 80 + 81 + pub type StarterName = 82 + process.Name(StarterMessage) 73 83 74 84 type StarterState { 75 85 StarterState( ··· 78 88 registry: pubsub.Registry, 79 89 database: sqlight.Connection, 80 90 smtp_environment: smtp.SmtpEnvironment, 81 - left_to_start: Int, 82 91 ) 83 92 } 84 93 85 - type StarterMessage { 94 + pub opaque type StarterMessage { 95 + // initial startup ------------------------------------------------------------ 86 96 /// start the process 87 97 StartAll 88 98 /// fetch users from db (attempt 1) ··· 94 104 Started(Result(actor.Started(Nil), actor.StartError), user.User) 95 105 /// start sender (attempt 2) 96 106 StartAgain(user.User) 107 + 108 + // runtime additions ------------------------------------------------------------ 109 + /// start a sender for a given user at runtime 110 + StartFor(user.User) 111 + } 112 + 113 + pub fn start_for(starter: StarterName, user: user.User) { 114 + process.named_subject(starter) 115 + |> process.send(StartFor(user)) 97 116 } 98 117 99 118 /// handles starting all senders on startup and then shuts down 100 119 /// 101 120 fn starter( 121 + name: StarterName, 102 122 factory: FactoryName, 103 123 registry: pubsub.Registry, 104 124 database: sqlight.Connection, ··· 119 139 registry:, 120 140 database:, 121 141 smtp_environment:, 122 - left_to_start: -1, 123 142 )) 124 143 |> Ok 125 144 }) 126 145 |> actor.on_message(fn(state, message) { 127 146 case message { 147 + // initial start -------------------------------------------------------- 128 148 StartAll -> { 129 149 database.all_users(database) 130 150 |> DatabaseReturnedUsers ··· 133 153 actor.continue(state) 134 154 } 135 155 DatabaseReturnedUsers(Ok(users)) -> { 136 - let left_to_start = 137 - users 138 - |> list.map(fn(user) { 139 - start_new( 140 - factory, 141 - Start(database:, registry:, user:, smtp_environment:), 142 - ) 143 - |> Started(user) 144 - |> actor.send(state.self, _) 145 - }) 146 - |> list.length() 156 + users 157 + |> list.map(fn(user) { 158 + start_new( 159 + factory, 160 + Start(database:, registry:, user:, smtp_environment:), 161 + ) 162 + |> Started(user) 163 + |> actor.send(state.self, _) 164 + }) 147 165 148 - actor.continue(StarterState(..state, left_to_start:)) 166 + actor.continue(state) 149 167 } 150 168 DatabaseReturnedUsers(Error(db_error)) -> { 151 169 log(woof.Warning, "Failed to get users from database once", [ ··· 172 190 } 173 191 } 174 192 } 175 - Started(Ok(_), _user) -> { 176 - case state.left_to_start { 177 - // only one is 'left to start' (this one) 178 - // and since that just started, lets stop this actor 179 - 1 -> actor.stop() 180 - // theres more to go 181 - _ -> 182 - actor.continue( 183 - StarterState(..state, left_to_start: state.left_to_start - 1), 184 - ) 185 - } 186 - } 193 + // all starts go through this 194 + // TODO: track started subjects and add StopFor 195 + Started(Ok(_), _user) -> actor.continue(state) 196 + 187 197 Started(Error(start_error), user) -> { 188 198 log(woof.Warning, "Failed to start sender once", [ 189 199 woof.field("user", user.email), ··· 217 227 } 218 228 } 219 229 } 230 + 231 + // runtime additions ---------------------------------------------------- 232 + StartFor(user) -> { 233 + start_new( 234 + factory, 235 + Start(database:, registry:, user:, smtp_environment:), 236 + ) 237 + |> Started(user) 238 + |> actor.send(state.self, _) 239 + 240 + actor.continue(state) 241 + } 220 242 } 221 243 }) 244 + |> actor.named(name) 222 245 |> actor.start() 223 246 }) 224 - |> supervision.restart(supervision.Transient) 225 247 } 226 248 227 249 pub type Start {
+2
test/eater_test.gleam
··· 109 109 110 110 let registry = process.new_name("registry") 111 111 let factory = process.new_name("factory") 112 + let starter = process.new_name("starter") 112 113 113 114 let assert Ok(_) = 114 115 static_supervisor.new(static_supervisor.RestForOne) 115 116 |> static_supervisor.add(pubsub.supervised(registry)) 116 117 |> static_supervisor.add(sender.factory( 117 118 factory, 119 + starter:, 118 120 registry:, 119 121 database:, 120 122 smtp_environment: test_smtp_env(),