this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

emitEvent implementation

futurGH 921c27e6 d65ce416

+201 -6
+1
package.json
··· 40 40 "dependencies": { 41 41 "@atcute/cbor": "^1.0.0", 42 42 "@atproto/crypto": "^0.4.0", 43 + "@atproto/identity": "^0.4.0", 43 44 "@atproto/xrpc-server": "^0.6.2", 44 45 "better-sqlite3": "^11.1.2", 45 46 "express": "^4.19.2",
+57
pnpm-lock.yaml
··· 11 11 '@atproto/crypto': 12 12 specifier: ^0.4.0 13 13 version: 0.4.0 14 + '@atproto/identity': 15 + specifier: ^0.4.0 16 + version: 0.4.0 14 17 '@atproto/xrpc-server': 15 18 specifier: ^0.6.2 16 19 version: 0.6.2 ··· 129 132 '@noble/curves': 1.5.0 130 133 '@noble/hashes': 1.4.0 131 134 uint8arrays: 3.0.0 135 + dev: false 136 + 137 + /@atproto/identity@0.4.0: 138 + resolution: {integrity: sha512-KKdVlqBgkFuTUx3KFiiQe0LuK9kopej1bhKm6SHRPEYbSEPFmRZQMY9TAjWJQrvQt8DpQzz6kVGjASFEjd3teQ==} 139 + dependencies: 140 + '@atproto/common-web': 0.3.0 141 + '@atproto/crypto': 0.4.0 142 + axios: 0.27.2 143 + transitivePeerDependencies: 144 + - debug 132 145 dev: false 133 146 134 147 /@atproto/lexicon@0.4.1: ··· 652 665 engines: {node: '>=8'} 653 666 dev: true 654 667 668 + /asynckit@0.4.0: 669 + resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} 670 + dev: false 671 + 655 672 /atomic-sleep@1.0.0: 656 673 resolution: {integrity: sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ==} 657 674 engines: {node: '>=8.0.0'} ··· 660 677 /await-lock@2.2.2: 661 678 resolution: {integrity: sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw==} 662 679 dev: true 680 + 681 + /axios@0.27.2: 682 + resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} 683 + dependencies: 684 + follow-redirects: 1.15.6 685 + form-data: 4.0.0 686 + transitivePeerDependencies: 687 + - debug 688 + dev: false 663 689 664 690 /balanced-match@1.0.2: 665 691 resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} ··· 810 836 resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} 811 837 dev: true 812 838 839 + /combined-stream@1.0.8: 840 + resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} 841 + engines: {node: '>= 0.8'} 842 + dependencies: 843 + delayed-stream: 1.0.0 844 + dev: false 845 + 813 846 /concat-map@0.0.1: 814 847 resolution: {integrity: sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=} 815 848 dev: true ··· 890 923 es-define-property: 1.0.0 891 924 es-errors: 1.3.0 892 925 gopd: 1.0.1 926 + dev: false 927 + 928 + /delayed-stream@1.0.0: 929 + resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} 930 + engines: {node: '>=0.4.0'} 893 931 dev: false 894 932 895 933 /depd@2.0.0: ··· 1221 1259 /flatted@3.2.9: 1222 1260 resolution: {integrity: sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==} 1223 1261 dev: true 1262 + 1263 + /follow-redirects@1.15.6: 1264 + resolution: {integrity: sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==} 1265 + engines: {node: '>=4.0'} 1266 + peerDependencies: 1267 + debug: '*' 1268 + peerDependenciesMeta: 1269 + debug: 1270 + optional: true 1271 + dev: false 1272 + 1273 + /form-data@4.0.0: 1274 + resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==} 1275 + engines: {node: '>= 6'} 1276 + dependencies: 1277 + asynckit: 0.4.0 1278 + combined-stream: 1.0.8 1279 + mime-types: 2.1.35 1280 + dev: false 1224 1281 1225 1282 /forwarded@0.2.0: 1226 1283 resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==}
+143 -6
src/LabelerServer.ts
··· 1 - import type { ComAtprotoLabelDefs } from "@atproto/api"; 1 + import { 2 + ComAtprotoLabelDefs, 3 + ToolsOzoneModerationDefs, 4 + ToolsOzoneModerationEmitEvent, 5 + } from "@atproto/api"; 2 6 import { Keypair, Secp256k1Keypair } from "@atproto/crypto"; 3 - import { ErrorFrame, InvalidRequestError, MessageFrame, XRPCError } from "@atproto/xrpc-server"; 7 + import { IdResolver } from "@atproto/identity"; 8 + import { 9 + AuthRequiredError, 10 + ErrorFrame, 11 + InvalidRequestError, 12 + MessageFrame, 13 + parseReqNsid, 14 + verifyJwt, 15 + XRPCError, 16 + } from "@atproto/xrpc-server"; 4 17 import Database, { type Database as SQLiteDatabase } from "better-sqlite3"; 5 - import express from "express"; 18 + import express, { type RequestHandler } from "express"; 6 19 import expressWs, { type Application, WebsocketRequestHandler } from "express-ws"; 7 20 import { Server } from "node:http"; 8 21 import { fromString as ui8FromString } from "uint8arrays"; ··· 13 26 export interface LabelerOptions { 14 27 did: string; 15 28 signingKey: string; 29 + auth?: (did: string) => boolean | Promise<boolean>; 16 30 dbFile?: string; 17 31 } 18 32 ··· 25 39 26 40 did: string; 27 41 42 + private auth: (did: string) => boolean | Promise<boolean>; 43 + 28 44 private signingKey: Keypair; 45 + 46 + private idResolver = new IdResolver(); 29 47 30 48 private subscriptions = new Set<WebSocket>(); 31 49 32 50 constructor(options: LabelerOptions) { 33 51 this.did = options.did; 34 52 this.signingKey = new Secp256k1Keypair(ui8FromString(options.signingKey, "hex"), false); 53 + this.auth = options.auth ?? ((did) => did === this.did); 35 54 36 55 this.db = new Database(options.dbFile ?? "labels.db"); 37 56 this.db.pragma("journal_mode = WAL"); ··· 52 71 this.app = expressWs(express().use(express.json())).app; 53 72 this.app.get("/xrpc/com.atproto.label.queryLabels", this.queryLabelsHandler); 54 73 this.app.ws("/xrpc/com.atproto.label.subscribeLabels", this.subscribeLabelsHandler); 74 + this.app.post("/xrpc/tools.ozone.moderation.emitEvent", this.emitEventHandler); 55 75 } 56 76 57 77 start(port = 443, callback?: () => void) { ··· 69 89 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 70 90 `); 71 91 const { src, uri, cid, val, neg, cts, exp, sig } = signed; 72 - stmt.run(src, uri, cid, val, neg, cts, exp, sig); 92 + const result = stmt.run(src, uri, cid, val, neg, cts, exp, sig); 93 + if (!result.changes) throw new Error("Failed to insert label"); 94 + await this.emitLabel(signed); 73 95 return signed; 74 96 } 75 97 98 + async createLabels( 99 + subject: { uri: string; cid?: string | undefined }, 100 + labels: { create?: Array<string>; negate?: Array<string> }, 101 + ) { 102 + const { uri, cid } = subject; 103 + const { create, negate } = labels; 104 + if (create) { 105 + await Promise.all( 106 + create.map((val) => 107 + this.createLabel({ 108 + src: this.did, 109 + uri, 110 + ...(cid ? { cid } : {}), 111 + val, 112 + cts: new Date().toISOString(), 113 + }) 114 + ), 115 + ); 116 + } 117 + if (negate) { 118 + await Promise.all( 119 + negate.map((val) => 120 + this.createLabel({ 121 + src: this.did, 122 + uri, 123 + ...(cid ? { cid } : {}), 124 + val, 125 + neg: true, 126 + cts: new Date().toISOString(), 127 + }) 128 + ), 129 + ); 130 + } 131 + } 132 + 76 133 private async ensureSignedLabel(label: ComAtprotoLabelDefs.Label): Promise<SignedLabel> { 77 134 if (!labelIsSigned(label)) { 78 135 const signed = await signLabel(label, this.signingKey); ··· 87 144 return formatLabel(label); 88 145 } 89 146 90 - queryLabelsHandler: express.RequestHandler = async (req, res) => { 147 + private async emitLabel(label: ComAtprotoLabelDefs.Label) { 148 + const signed = await this.ensureSignedLabel(label); 149 + const frame = new MessageFrame({ seq: label.id, labels: [signed] }, { type: "#labels" }); 150 + this.subscriptions.forEach((ws) => ws.send(frame.toBytes())); 151 + } 152 + 153 + private async parseAuthHeaderDid(req: express.Request): Promise<string> { 154 + const authHeader = req.get("Authorization"); 155 + if (!authHeader) throw new AuthRequiredError("Authorization header is required"); 156 + 157 + const [type, token] = authHeader.split(" "); 158 + if (type !== "Bearer" || !token) { 159 + throw new InvalidRequestError("Missing or invalid bearer token", "MissingJwt"); 160 + } 161 + 162 + const payload = await verifyJwt( 163 + token, 164 + this.did, 165 + parseReqNsid(req), 166 + async (did, forceRefresh) => 167 + (await this.idResolver.did.resolveAtprotoData(did, forceRefresh)).signingKey, 168 + ); 169 + 170 + return payload.iss; 171 + } 172 + 173 + queryLabelsHandler: RequestHandler = async (req, res) => { 91 174 try { 92 175 const { uriPatterns, sources, limit: limitStr, cursor: cursorStr } = req.query as { 93 176 uriPatterns?: Array<string>; ··· 139 222 console.error(e); 140 223 res.status(500).json({ 141 224 error: "InternalError", 142 - message: e instanceof Error ? e.message : "An unknown error occurred", 225 + message: "An unknown error occurred", 143 226 }); 144 227 } 145 228 return; ··· 181 264 ws.on("close", () => { 182 265 this.subscriptions.delete(ws); 183 266 }); 267 + }; 268 + 269 + emitEventHandler: RequestHandler = async (req, res) => { 270 + try { 271 + const actorDid = await this.parseAuthHeaderDid(req); 272 + const authed = await this.auth(actorDid); 273 + if (!authed) { 274 + throw new AuthRequiredError("Unauthorized"); 275 + } 276 + 277 + const { event, subject, createdBy } = req 278 + .body as ToolsOzoneModerationEmitEvent.InputSchema; 279 + if (!event || !subject || !createdBy) { 280 + throw new InvalidRequestError("Missing required field(s)"); 281 + } 282 + 283 + if (event.$type !== "tools.ozone.moderation.defs#modEventLabel") { 284 + throw new InvalidRequestError("Unsupported event type"); 285 + } 286 + const labelEvent = event as ToolsOzoneModerationDefs.ModEventLabel; 287 + const uri = 288 + subject.$type === "com.atproto.admin.defs#repoRef" 289 + && typeof subject.did === "string" 290 + ? subject.did 291 + : (subject.$type === "com.atproto.repo.strongRef#main" 292 + || subject.$type === "com.atproto.repo.strongRef") 293 + && typeof subject.uri === "string" 294 + ? subject.uri 295 + : null; 296 + const cid = 297 + subject.$type === "com.atproto.admin.defs#repoRef" 298 + && typeof subject.cid === "string" 299 + ? subject.cid 300 + : undefined; 301 + 302 + if (!uri) { 303 + throw new InvalidRequestError("Invalid subject"); 304 + } 305 + 306 + await this.createLabels({ uri, cid }, { 307 + create: labelEvent.createLabelVals, 308 + negate: labelEvent.negateLabelVals, 309 + }); 310 + } catch (e) { 311 + if (e instanceof XRPCError) { 312 + res.status(e.type).json(e.payload); 313 + } else { 314 + console.error(e); 315 + res.status(500).json({ 316 + error: "InternalError", 317 + message: "An unknown error occurred", 318 + }); 319 + } 320 + } 184 321 }; 185 322 }