forked from
jollywhoppers.com/witchsky.app
Bluesky app fork with some witchin' additions 馃挮
1import {type BskyAgent, type ChatBskyConvoGetLog} from '@atproto/api'
2import EventEmitter from 'eventemitter3'
3import {nanoid} from 'nanoid/non-secure'
4
5import {networkRetry} from '#/lib/async/retry'
6import {DM_SERVICE_HEADERS} from '#/lib/constants'
7import {
8 isErrorMaybeAppPasswordPermissions,
9 isNetworkError,
10} from '#/lib/strings/errors'
11import {Logger} from '#/logger'
12import {
13 BACKGROUND_POLL_INTERVAL,
14 DEFAULT_POLL_INTERVAL,
15} from '#/state/messages/events/const'
16import {
17 type MessagesEventBusDispatch,
18 MessagesEventBusDispatchEvent,
19 MessagesEventBusErrorCode,
20 type MessagesEventBusEvent,
21 type MessagesEventBusParams,
22 MessagesEventBusStatus,
23} from '#/state/messages/events/types'
24
25const logger = Logger.create(Logger.Context.DMsAgent)
26
27export class MessagesEventBus {
28 private id: string
29
30 private agent: BskyAgent
31 private emitter = new EventEmitter<{event: [MessagesEventBusEvent]}>()
32
33 private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
34 private latestRev: string | undefined = undefined
35 private pollInterval = DEFAULT_POLL_INTERVAL
36 private requestedPollIntervals: Map<string, number> = new Map()
37
38 constructor(params: MessagesEventBusParams) {
39 this.id = nanoid(3)
40 this.agent = params.agent
41
42 this.init()
43 }
44
45 requestPollInterval(interval: number) {
46 const id = nanoid()
47 this.requestedPollIntervals.set(id, interval)
48 this.dispatch({
49 event: MessagesEventBusDispatchEvent.UpdatePoll,
50 })
51 return () => {
52 this.requestedPollIntervals.delete(id)
53 this.dispatch({
54 event: MessagesEventBusDispatchEvent.UpdatePoll,
55 })
56 }
57 }
58
59 getLatestRev() {
60 return this.latestRev
61 }
62
63 on(
64 handler: (event: MessagesEventBusEvent) => void,
65 options: {
66 convoId?: string
67 },
68 ) {
69 const handle = (event: MessagesEventBusEvent) => {
70 if (event.type === 'logs' && options.convoId) {
71 const filteredLogs = event.logs.filter(log => {
72 if ('convoId' in log && log.convoId === options.convoId) {
73 return log.convoId === options.convoId
74 }
75 return false
76 })
77
78 if (filteredLogs.length > 0) {
79 handler({
80 ...event,
81 logs: filteredLogs,
82 })
83 }
84 } else {
85 handler(event)
86 }
87 }
88
89 this.emitter.on('event', handle)
90
91 return () => {
92 this.emitter.off('event', handle)
93 }
94 }
95
96 background() {
97 logger.debug(`background`, {})
98 this.dispatch({event: MessagesEventBusDispatchEvent.Background})
99 }
100
101 suspend() {
102 logger.debug(`suspend`, {})
103 this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
104 }
105
106 resume() {
107 logger.debug(`resume`, {})
108 this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
109 }
110
111 private dispatch(action: MessagesEventBusDispatch) {
112 const prevStatus = this.status
113
114 switch (this.status) {
115 case MessagesEventBusStatus.Initializing: {
116 switch (action.event) {
117 case MessagesEventBusDispatchEvent.Ready: {
118 this.status = MessagesEventBusStatus.Ready
119 this.resetPoll()
120 this.emitter.emit('event', {type: 'connect'})
121 break
122 }
123 case MessagesEventBusDispatchEvent.Background: {
124 this.status = MessagesEventBusStatus.Backgrounded
125 this.resetPoll()
126 this.emitter.emit('event', {type: 'connect'})
127 break
128 }
129 case MessagesEventBusDispatchEvent.Suspend: {
130 this.status = MessagesEventBusStatus.Suspended
131 break
132 }
133 case MessagesEventBusDispatchEvent.Error: {
134 this.status = MessagesEventBusStatus.Error
135 this.emitter.emit('event', {type: 'error', error: action.payload})
136 break
137 }
138 }
139 break
140 }
141 case MessagesEventBusStatus.Ready: {
142 switch (action.event) {
143 case MessagesEventBusDispatchEvent.Background: {
144 this.status = MessagesEventBusStatus.Backgrounded
145 this.resetPoll()
146 break
147 }
148 case MessagesEventBusDispatchEvent.Suspend: {
149 this.status = MessagesEventBusStatus.Suspended
150 this.stopPoll()
151 break
152 }
153 case MessagesEventBusDispatchEvent.Error: {
154 this.status = MessagesEventBusStatus.Error
155 this.stopPoll()
156 this.emitter.emit('event', {type: 'error', error: action.payload})
157 break
158 }
159 case MessagesEventBusDispatchEvent.UpdatePoll: {
160 this.resetPoll()
161 break
162 }
163 }
164 break
165 }
166 case MessagesEventBusStatus.Backgrounded: {
167 switch (action.event) {
168 case MessagesEventBusDispatchEvent.Resume: {
169 this.status = MessagesEventBusStatus.Ready
170 this.resetPoll()
171 break
172 }
173 case MessagesEventBusDispatchEvent.Suspend: {
174 this.status = MessagesEventBusStatus.Suspended
175 this.stopPoll()
176 break
177 }
178 case MessagesEventBusDispatchEvent.Error: {
179 this.status = MessagesEventBusStatus.Error
180 this.stopPoll()
181 this.emitter.emit('event', {type: 'error', error: action.payload})
182 break
183 }
184 case MessagesEventBusDispatchEvent.UpdatePoll: {
185 this.resetPoll()
186 break
187 }
188 }
189 break
190 }
191 case MessagesEventBusStatus.Suspended: {
192 switch (action.event) {
193 case MessagesEventBusDispatchEvent.Resume: {
194 this.status = MessagesEventBusStatus.Ready
195 this.resetPoll()
196 break
197 }
198 case MessagesEventBusDispatchEvent.Background: {
199 this.status = MessagesEventBusStatus.Backgrounded
200 this.resetPoll()
201 break
202 }
203 case MessagesEventBusDispatchEvent.Error: {
204 this.status = MessagesEventBusStatus.Error
205 this.stopPoll()
206 this.emitter.emit('event', {type: 'error', error: action.payload})
207 break
208 }
209 }
210 break
211 }
212 case MessagesEventBusStatus.Error: {
213 switch (action.event) {
214 case MessagesEventBusDispatchEvent.UpdatePoll: {
215 // basically reset
216 this.status = MessagesEventBusStatus.Initializing
217 this.latestRev = undefined
218 this.init()
219 break
220 }
221 case MessagesEventBusDispatchEvent.Resume: {
222 this.status = MessagesEventBusStatus.Ready
223 this.resetPoll()
224 this.emitter.emit('event', {type: 'connect'})
225 break
226 }
227 }
228 break
229 }
230 default:
231 break
232 }
233
234 logger.debug(`dispatch '${action.event}'`, {
235 id: this.id,
236 prev: prevStatus,
237 next: this.status,
238 })
239 }
240
241 private async init() {
242 logger.debug(`init`, {})
243
244 try {
245 const response = await networkRetry(2, () => {
246 return this.agent.chat.bsky.convo.getLog(
247 {},
248 {headers: DM_SERVICE_HEADERS},
249 )
250 })
251 // throw new Error('UNCOMMENT TO TEST INIT FAILURE')
252
253 const {cursor} = response.data
254
255 // should always be defined
256 if (cursor) {
257 if (!this.latestRev) {
258 this.latestRev = cursor
259 } else if (cursor > this.latestRev) {
260 this.latestRev = cursor
261 }
262 }
263
264 this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
265 } catch (e: any) {
266 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
267 logger.error(`init failed`, {
268 safeMessage: e.message,
269 })
270 }
271
272 this.dispatch({
273 event: MessagesEventBusDispatchEvent.Error,
274 payload: {
275 exception: e,
276 code: MessagesEventBusErrorCode.InitFailed,
277 retry: () => {
278 this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
279 },
280 },
281 })
282 }
283 }
284
285 /*
286 * Polling
287 */
288
289 private isPolling = false
290 private pollIntervalRef: NodeJS.Timeout | undefined
291
292 private getPollInterval() {
293 switch (this.status) {
294 case MessagesEventBusStatus.Ready: {
295 const requested = Array.from(this.requestedPollIntervals.values())
296 const lowest = Math.min(DEFAULT_POLL_INTERVAL, ...requested)
297 return lowest
298 }
299 case MessagesEventBusStatus.Backgrounded: {
300 return BACKGROUND_POLL_INTERVAL
301 }
302 default:
303 return DEFAULT_POLL_INTERVAL
304 }
305 }
306
307 private resetPoll() {
308 this.pollInterval = this.getPollInterval()
309 this.stopPoll()
310 this.startPoll()
311 }
312
313 private startPoll() {
314 if (!this.isPolling) this.poll()
315
316 this.pollIntervalRef = setInterval(() => {
317 if (this.isPolling) return
318 this.poll()
319 }, this.pollInterval)
320 }
321
322 private stopPoll() {
323 if (this.pollIntervalRef) clearInterval(this.pollIntervalRef)
324 }
325
326 private async poll() {
327 if (this.isPolling) return
328
329 this.isPolling = true
330
331 // logger.debug(
332 // `poll`,
333 // {
334 // requestedPollIntervals: Array.from(
335 // this.requestedPollIntervals.values(),
336 // ),
337 // },
338 // )
339
340 try {
341 const response = await networkRetry(2, () => {
342 return this.agent.chat.bsky.convo.getLog(
343 {
344 cursor: this.latestRev,
345 },
346 {headers: DM_SERVICE_HEADERS},
347 )
348 })
349
350 // throw new Error('UNCOMMENT TO TEST POLL FAILURE')
351
352 const {logs: events} = response.data
353
354 let needsEmit = false
355 let batch: ChatBskyConvoGetLog.OutputSchema['logs'] = []
356
357 for (const ev of events) {
358 /*
359 * If there's a rev, we should handle it. If there's not a rev, we don't
360 * know what it is.
361 */
362 if ('rev' in ev && typeof ev.rev === 'string') {
363 /*
364 * We only care about new events
365 */
366 if (ev.rev > (this.latestRev = this.latestRev || ev.rev)) {
367 /*
368 * Update rev regardless of if it's a ev type we care about or not
369 */
370 this.latestRev = ev.rev
371 needsEmit = true
372 batch.push(ev)
373 }
374 }
375 }
376
377 if (needsEmit) {
378 this.emitter.emit('event', {type: 'logs', logs: batch})
379 }
380 } catch (e: any) {
381 if (!isNetworkError(e) && !isErrorMaybeAppPasswordPermissions(e)) {
382 logger.error(`poll events failed`, {
383 safeMessage: e.message,
384 })
385 }
386
387 this.dispatch({
388 event: MessagesEventBusDispatchEvent.Error,
389 payload: {
390 exception: e,
391 code: MessagesEventBusErrorCode.PollFailed,
392 retry: () => {
393 this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
394 },
395 },
396 })
397 } finally {
398 this.isPolling = false
399 }
400 }
401}