Bluesky app fork with some witchin' additions 馃挮 witchsky.app
bluesky fork client
119
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 352 lines 9.5 kB view raw
1import { 2 AtpAgent, 3 CredentialSession, 4 type ToolsOzoneSafelinkDefs, 5 type ToolsOzoneSafelinkQueryEvents, 6} from '@atproto/api' 7import {ExpiredTokenError} from '@atproto/api/dist/client/types/com/atproto/server/confirmEmail.js' 8import {MINUTE} from '@atproto/common' 9import {LRUCache} from 'lru-cache' 10 11import {type ServiceConfig} from '../config.js' 12import type Database from '../db/index.js' 13import {type SafelinkRule} from '../db/schema.js' 14import {redirectLogger} from '../logger.js' 15 16const SAFELINK_MIN_FETCH_INTERVAL = 1_000 17const SAFELINK_MAX_FETCH_INTERVAL = 10_000 18const SCHEME_REGEX = /^[a-zA-Z][a-zA-Z0-9+.-]*:\/\// 19 20export class SafelinkClient { 21 private domainCache: LRUCache<string, SafelinkRule | 'ok'> 22 private urlCache: LRUCache<string, SafelinkRule | 'ok'> 23 24 private db: Database 25 26 private ozoneAgent: OzoneAgent 27 28 private cursor?: string 29 30 constructor({cfg, db}: {cfg: ServiceConfig; db: Database}) { 31 this.domainCache = new LRUCache<string, SafelinkRule | 'ok'>({ 32 max: 10000, 33 }) 34 35 this.urlCache = new LRUCache<string, SafelinkRule | 'ok'>({ 36 max: 25000, 37 }) 38 39 this.db = db 40 41 this.ozoneAgent = new OzoneAgent( 42 cfg.safelinkPdsUrl!, 43 cfg.safelinkAgentIdentifier!, 44 cfg.safelinkAgentPass!, 45 ) 46 } 47 48 public async tryFindRule(link: string): Promise<SafelinkRule | 'ok'> { 49 let url: string 50 let domain: string 51 try { 52 url = SafelinkClient.normalizeUrl(link) 53 domain = SafelinkClient.normalizeDomain(link) 54 } catch (e) { 55 redirectLogger.error( 56 {error: e, inputUrl: link}, 57 'failed to normalize looked up link', 58 ) 59 // fail open 60 return 'ok' 61 } 62 63 // First, check if there is an existing URL rule. Note that even if the rule is 'ok', we still 64 // want to check for a blocking domain rule, so we will only return here if the url rule exists 65 // _and_ it is not 'ok'. 66 const urlRule = this.urlCache.get(url) 67 if (urlRule && urlRule !== 'ok') { 68 return urlRule 69 } 70 71 // If we find a domain rule of _any_ kind, including 'ok', we can now return that rule. 72 const domainRule = this.domainCache.get(domain) 73 if (domainRule) { 74 return domainRule 75 } 76 77 try { 78 const maybeUrlRule = await this.getRule(this.db, url, 'url') 79 this.urlCache.set(url, maybeUrlRule) 80 return maybeUrlRule 81 } catch (e) { 82 this.urlCache.set(url, 'ok') 83 } 84 85 try { 86 const maybeDomainRule = await this.getRule(this.db, domain, 'domain') 87 this.domainCache.set(domain, maybeDomainRule) 88 return maybeDomainRule 89 } catch (e) { 90 this.domainCache.set(domain, 'ok') 91 } 92 93 return 'ok' 94 } 95 96 private async getRule( 97 db: Database, 98 url: string, 99 pattern: ToolsOzoneSafelinkDefs.PatternType, 100 ): Promise<SafelinkRule> { 101 return db.db 102 .selectFrom('safelink_rule') 103 .selectAll() 104 .where('url', '=', url) 105 .where('pattern', '=', pattern) 106 .orderBy('createdAt', 'desc') 107 .executeTakeFirstOrThrow() 108 } 109 110 private async addRule(db: Database, rule: SafelinkRule) { 111 try { 112 if (rule.pattern === 'url') { 113 rule.url = SafelinkClient.normalizeUrl(rule.url) 114 } else if (rule.pattern === 'domain') { 115 rule.url = SafelinkClient.normalizeDomain(rule.url) 116 } 117 } catch (e) { 118 redirectLogger.error( 119 {error: e, inputUrl: rule.url}, 120 'failed to normalize rule input URL', 121 ) 122 return 123 } 124 125 db.db 126 .insertInto('safelink_rule') 127 .values({ 128 id: rule.id, 129 eventType: rule.eventType, 130 url: rule.url, 131 pattern: rule.pattern, 132 action: rule.action, 133 createdAt: rule.createdAt, 134 }) 135 .execute() 136 .catch(err => { 137 redirectLogger.error( 138 {error: err, rule}, 139 'failed to add rule to database', 140 ) 141 }) 142 143 if (rule.pattern === 'domain') { 144 this.domainCache.delete(rule.url) 145 } else { 146 this.urlCache.delete(rule.url) 147 } 148 } 149 150 private async removeRule(db: Database, rule: SafelinkRule) { 151 try { 152 if (rule.pattern === 'url') { 153 rule.url = SafelinkClient.normalizeUrl(rule.url) 154 } else if (rule.pattern === 'domain') { 155 rule.url = SafelinkClient.normalizeDomain(rule.url) 156 } 157 } catch (e) { 158 redirectLogger.error( 159 {error: e, inputUrl: rule.url}, 160 'failed to normalize rule input URL', 161 ) 162 return 163 } 164 165 await db.db 166 .deleteFrom('safelink_rule') 167 .where('pattern', '=', 'domain') 168 .where('url', '=', rule.url) 169 .execute() 170 .catch(err => { 171 redirectLogger.error( 172 {error: err, rule}, 173 'failed to remove rule from database', 174 ) 175 }) 176 177 if (rule.pattern === 'domain') { 178 this.domainCache.delete(rule.url) 179 } else { 180 this.urlCache.delete(rule.url) 181 } 182 } 183 184 public async runFetchEvents() { 185 let agent: AtpAgent 186 try { 187 agent = await this.ozoneAgent.getAgent() 188 } catch (err) { 189 redirectLogger.error({error: err}, 'error getting Ozone agent') 190 setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL) 191 return 192 } 193 194 let res: ToolsOzoneSafelinkQueryEvents.Response 195 try { 196 const cursor = await this.getCursor() 197 res = await agent.tools.ozone.safelink.queryEvents({ 198 cursor, 199 limit: 100, 200 sortDirection: 'asc', 201 }) 202 } catch (err) { 203 if (err instanceof ExpiredTokenError) { 204 redirectLogger.info('ozone agent had expired session, refreshing...') 205 await this.ozoneAgent.refreshSession() 206 setTimeout(() => this.runFetchEvents(), SAFELINK_MIN_FETCH_INTERVAL) 207 return 208 } 209 210 redirectLogger.error( 211 {error: err}, 212 'error fetching safelink events from Ozone', 213 ) 214 setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL) 215 return 216 } 217 218 if (res.data.events.length === 0) { 219 redirectLogger.info('received no new safelink events from ozone') 220 setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL) 221 } else { 222 await this.db.transaction(async db => { 223 for (const rule of res.data.events) { 224 switch (rule.eventType) { 225 case 'removeRule': 226 await this.removeRule(db, rule) 227 break 228 case 'addRule': 229 case 'updateRule': 230 await this.addRule(db, rule) 231 break 232 default: 233 redirectLogger.warn({rule}, 'received unknown rule event type') 234 } 235 } 236 }) 237 if (res.data.cursor) { 238 redirectLogger.info( 239 {cursor: res.data.cursor}, 240 'received new safelink events from Ozone', 241 ) 242 await this.setCursor(res.data.cursor) 243 } 244 setTimeout(() => this.runFetchEvents(), SAFELINK_MIN_FETCH_INTERVAL) 245 } 246 } 247 248 private async getCursor() { 249 if (this.cursor === '') { 250 const res = await this.db.db 251 .selectFrom('safelink_cursor') 252 .selectAll() 253 .where('id', '=', 1) 254 .executeTakeFirst() 255 if (!res) { 256 return '' 257 } 258 this.cursor = res.cursor 259 } 260 return this.cursor 261 } 262 263 private async setCursor(cursor: string) { 264 const updatedAt = new Date() 265 try { 266 await this.db.db 267 .insertInto('safelink_cursor') 268 .values({ 269 id: 1, 270 cursor, 271 updatedAt, 272 }) 273 .onConflict(oc => oc.column('id').doUpdateSet({cursor, updatedAt})) 274 .execute() 275 this.cursor = cursor 276 } catch (err) { 277 redirectLogger.error({error: err}, 'failed to update safelink cursor') 278 } 279 } 280 281 private static normalizeUrl(input: string) { 282 if (!SCHEME_REGEX.test(input)) { 283 input = `https://${input}` 284 } 285 const u = new URL(input) 286 u.hash = '' 287 let normalized = u.href.replace(SCHEME_REGEX, '').toLowerCase() 288 if (normalized.endsWith('/')) { 289 normalized = normalized.substring(0, normalized.length - 1) 290 } 291 return normalized 292 } 293 294 private static normalizeDomain(input: string) { 295 if (!SCHEME_REGEX.test(input)) { 296 input = `https://${input}` 297 } 298 const u = new URL(input) 299 return u.host.toLowerCase() 300 } 301} 302 303export class OzoneAgent { 304 private identifier: string 305 private password: string 306 307 private session: CredentialSession 308 private agent: AtpAgent 309 310 private refreshAt = 0 311 312 constructor(pdsHost: string, identifier: string, password: string) { 313 this.identifier = identifier 314 this.password = password 315 316 this.session = new CredentialSession(new URL(pdsHost)) 317 this.agent = new AtpAgent(this.session) 318 } 319 320 public async getAgent() { 321 if (!this.identifier && !this.password) { 322 throw new Error( 323 'OZONE_AGENT_HANDLE and OZONE_AGENT_PASS environment variables must be set', 324 ) 325 } 326 327 if (!this.session.hasSession) { 328 redirectLogger.info('creating Ozone session') 329 await this.session.login({ 330 identifier: this.identifier, 331 password: this.password, 332 }) 333 redirectLogger.info('ozone session created successfully') 334 this.refreshAt = Date.now() + 50 * MINUTE 335 } 336 337 if (Date.now() <= this.refreshAt) { 338 await this.refreshSession() 339 } 340 341 return this.agent 342 } 343 344 public async refreshSession() { 345 try { 346 await this.session.refreshSession() 347 this.refreshAt = Date.now() + 50 * MINUTE 348 } catch (e) { 349 redirectLogger.error({error: e}, 'error refreshing session') 350 } 351 } 352}