Bluesky app fork with some witchin' additions 💫
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 401 lines 11 kB view raw
import {type BskyAgent, type ChatBskyConvoGetLog} from '@atproto/api'
import EventEmitter from 'eventemitter3'
import {nanoid} from 'nanoid/non-secure'

import {networkRetry} from '#/lib/async/retry'
import {DM_SERVICE_HEADERS} from '#/lib/constants'
import {
  isErrorMaybeAppPasswordPermissions,
  isNetworkError,
} from '#/lib/strings/errors'
import {Logger} from '#/logger'
import {
  BACKGROUND_POLL_INTERVAL,
  DEFAULT_POLL_INTERVAL,
} from '#/state/messages/events/const'
import {
  type MessagesEventBusDispatch,
  MessagesEventBusDispatchEvent,
  MessagesEventBusErrorCode,
  type MessagesEventBusEvent,
  type MessagesEventBusParams,
  MessagesEventBusStatus,
} from '#/state/messages/events/types'

const logger = Logger.create(Logger.Context.DMsAgent)

/**
 * Polls `chat.bsky.convo.getLog` on an interval and re-emits newly seen log
 * entries to subscribers.
 *
 * The bus is a small state machine (see {@link MessagesEventBusStatus}); every
 * transition goes through the private `dispatch` method. Construction
 * immediately kicks off `init()`, which fetches the current log cursor and
 * then moves the bus to `Ready` (or `Error` on failure).
 */
export class MessagesEventBus {
  /** Short random id, only used to tell instances apart in debug logs. */
  private readonly id: string

  private readonly agent: BskyAgent
  private readonly emitter = new EventEmitter<{
    event: [MessagesEventBusEvent]
  }>()

  private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
  /** Highest `rev` (or initial cursor) seen so far; sent as the poll cursor. */
  private latestRev: string | undefined = undefined
  private pollInterval = DEFAULT_POLL_INTERVAL
  /** Outstanding interval requests, keyed by a per-request random id. */
  private readonly requestedPollIntervals: Map<string, number> = new Map()

  constructor(params: MessagesEventBusParams) {
    this.id = nanoid(3)
    this.agent = params.agent

    // Fire-and-forget: init() handles its own failures.
    void this.init()
  }

  /**
   * Registers a desired poll interval and asks the bus to recompute its
   * effective interval.
   *
   * @param interval Requested interval; the same unit as
   *   `DEFAULT_POLL_INTERVAL`, since both are passed to `Math.min` together.
   * @returns A function that withdraws this request and triggers another
   *   recompute.
   */
  requestPollInterval(interval: number) {
    const id = nanoid()
    this.requestedPollIntervals.set(id, interval)
    this.dispatch({
      event: MessagesEventBusDispatchEvent.UpdatePoll,
    })
    return () => {
      this.requestedPollIntervals.delete(id)
      this.dispatch({
        event: MessagesEventBusDispatchEvent.UpdatePoll,
      })
    }
  }

  /** Returns the most recent revision the bus has recorded, if any. */
  getLatestRev() {
    return this.latestRev
  }

  /**
   * Subscribes to bus events.
   *
   * When `options.convoId` is set, `logs` events are narrowed to entries whose
   * `convoId` matches, and the handler is skipped entirely if nothing matches.
   * All other event types are always forwarded unchanged.
   *
   * @param handler Callback invoked for each delivered event.
   * @param options Optional filter; omit (or pass `{}`) to receive everything.
   * @returns A function that removes the subscription.
   */
  on(
    handler: (event: MessagesEventBusEvent) => void,
    options: {
      convoId?: string
    } = {},
  ) {
    const handle = (event: MessagesEventBusEvent) => {
      if (event.type === 'logs' && options.convoId) {
        const filteredLogs = event.logs.filter(
          log => 'convoId' in log && log.convoId === options.convoId,
        )

        if (filteredLogs.length > 0) {
          handler({
            ...event,
            logs: filteredLogs,
          })
        }
      } else {
        handler(event)
      }
    }

    this.emitter.on('event', handle)

    return () => {
      this.emitter.off('event', handle)
    }
  }

  /** Requests the `Background` transition. */
  background() {
    logger.debug(`background`, {})
    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
  }

  /** Requests the `Suspend` transition. */
  suspend() {
    logger.debug(`suspend`, {})
    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
  }

  /** Requests the `Resume` transition. */
  resume() {
    logger.debug(`resume`, {})
    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
  }

  /**
   * Applies a single action to the state machine.
   *
   * The outer switch selects the current status, the inner switch the action.
   * Any (status, action) pair without an explicit case is ignored: the status
   * is left unchanged and only the debug log at the end runs.
   */
  private dispatch(action: MessagesEventBusDispatch) {
    const prevStatus = this.status

    switch (this.status) {
      case MessagesEventBusStatus.Initializing: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Ready: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Ready: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            this.stopPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            this.resetPoll()
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Backgrounded: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Resume: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Suspend: {
            this.status = MessagesEventBusStatus.Suspended
            this.stopPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            this.resetPoll()
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Suspended: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.Resume: {
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Background: {
            this.status = MessagesEventBusStatus.Backgrounded
            this.resetPoll()
            break
          }
          case MessagesEventBusDispatchEvent.Error: {
            this.status = MessagesEventBusStatus.Error
            this.stopPoll()
            this.emitter.emit('event', {type: 'error', error: action.payload})
            break
          }
        }
        break
      }
      case MessagesEventBusStatus.Error: {
        switch (action.event) {
          case MessagesEventBusDispatchEvent.UpdatePoll: {
            // basically reset: forget the cursor and run init() again
            this.status = MessagesEventBusStatus.Initializing
            this.latestRev = undefined
            void this.init()
            break
          }
          case MessagesEventBusDispatchEvent.Resume: {
            // Goes straight back to polling without re-running init().
            this.status = MessagesEventBusStatus.Ready
            this.resetPoll()
            this.emitter.emit('event', {type: 'connect'})
            break
          }
        }
        break
      }
      default:
        break
    }

    logger.debug(`dispatch '${action.event}'`, {
      id: this.id,
      prev: prevStatus,
      next: this.status,
    })
  }

  /**
   * Fetches the log once without a cursor to learn the current revision, then
   * dispatches `Ready`. Any failure is routed to `handleFailure` with
   * `InitFailed`; this method never rejects for request errors.
   */
  private async init() {
    logger.debug(`init`, {})

    try {
      const response = await networkRetry(2, () => {
        return this.agent.chat.bsky.convo.getLog(
          {},
          {headers: DM_SERVICE_HEADERS},
        )
      })
      // throw new Error('UNCOMMENT TO TEST INIT FAILURE')

      const {cursor} = response.data

      // should always be defined; only ever move latestRev forward
      if (cursor && (!this.latestRev || cursor > this.latestRev)) {
        this.latestRev = cursor
      }

      this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
    } catch (e: any) {
      this.handleFailure(e, MessagesEventBusErrorCode.InitFailed, `init failed`)
    }
  }

  /**
   * Shared failure path for `init()` and `poll()`.
   *
   * Logs the error unless it is a network error or looks like an app-password
   * permission error, then dispatches an `Error` action whose `retry` callback
   * dispatches `Resume`.
   *
   * @param e The caught value (typed `any` to mirror the catch clauses).
   * @param code Error code to attach to the emitted payload.
   * @param logMessage Message used for the `logger.error` entry.
   */
  private handleFailure(
    e: any,
    code: MessagesEventBusErrorCode,
    logMessage: string,
  ) {
    if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
      logger.error(logMessage, {
        safeMessage: e.message,
      })
    }

    this.dispatch({
      event: MessagesEventBusDispatchEvent.Error,
      payload: {
        exception: e,
        code,
        retry: () => {
          this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
        },
      },
    })
  }

  /*
   * Polling
   */

  /** True while a `poll()` request is in flight; prevents overlapping polls. */
  private isPolling = false
  /** Handle of the active interval timer, or `undefined` when stopped. */
  private pollIntervalRef: ReturnType<typeof setInterval> | undefined

  /**
   * Computes the interval for the current status: the smallest of the default
   * and all requested intervals while `Ready`, the background interval while
   * `Backgrounded`, and the default otherwise.
   */
  private getPollInterval() {
    switch (this.status) {
      case MessagesEventBusStatus.Ready: {
        const requested = Array.from(this.requestedPollIntervals.values())
        const lowest = Math.min(DEFAULT_POLL_INTERVAL, ...requested)
        return lowest
      }
      case MessagesEventBusStatus.Backgrounded: {
        return BACKGROUND_POLL_INTERVAL
      }
      default:
        return DEFAULT_POLL_INTERVAL
    }
  }

  /** Recomputes the interval and restarts the timer with it. */
  private resetPoll() {
    this.pollInterval = this.getPollInterval()
    this.stopPoll()
    this.startPoll()
  }

  /**
   * Polls once immediately (unless a poll is already in flight) and then on
   * every tick of a new interval timer.
   */
  private startPoll() {
    if (!this.isPolling) void this.poll()

    this.pollIntervalRef = setInterval(() => {
      if (this.isPolling) return
      void this.poll()
    }, this.pollInterval)
  }

  /**
   * Stops the interval timer, if any, and forgets its handle. A request that
   * is already in flight is not cancelled.
   */
  private stopPoll() {
    if (this.pollIntervalRef) {
      clearInterval(this.pollIntervalRef)
      this.pollIntervalRef = undefined
    }
  }

  /**
   * Fetches log entries after `latestRev` and emits the new ones as a single
   * `logs` event. Failures are routed to `handleFailure` with `PollFailed`.
   */
  private async poll() {
    if (this.isPolling) return

    this.isPolling = true

    // logger.debug(
    //   `poll`,
    //   {
    //     requestedPollIntervals: Array.from(
    //       this.requestedPollIntervals.values(),
    //     ),
    //   },
    // )

    try {
      const response = await networkRetry(2, () => {
        return this.agent.chat.bsky.convo.getLog(
          {
            cursor: this.latestRev,
          },
          {headers: DM_SERVICE_HEADERS},
        )
      })

      // throw new Error('UNCOMMENT TO TEST POLL FAILURE')

      const {logs: events} = response.data

      const batch: ChatBskyConvoGetLog.OutputSchema['logs'] = []

      for (const ev of events) {
        /*
         * If there's a rev, we should handle it. If there's not a rev, we don't
         * know what it is.
         */
        if ('rev' in ev && typeof ev.rev === 'string') {
          if (!this.latestRev) {
            /*
             * No baseline yet: adopt this rev as the baseline. The entry
             * itself is not emitted, since it is not newer than the baseline.
             */
            this.latestRev = ev.rev
          } else if (ev.rev > this.latestRev) {
            /*
             * We only care about new events. Update rev regardless of if it's
             * a ev type we care about or not.
             */
            this.latestRev = ev.rev
            batch.push(ev)
          }
        }
      }

      if (batch.length > 0) {
        this.emitter.emit('event', {type: 'logs', logs: batch})
      }
    } catch (e: any) {
      this.handleFailure(
        e,
        MessagesEventBusErrorCode.PollFailed,
        `poll events failed`,
      )
    } finally {
      this.isPolling = false
    }
  }
}