···
  pdsConfigs: Record<string, PDSConfig>,
) => {
  try {
+    let targetDid = "";
+    if (label.uri.startsWith("did:")) {
+      // Identity label
+      targetDid = label.uri;
+    } else {
+      // Content label for a record
+      // TODO: need to pass the full at:// URI on later for logging to the db and notification
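+      // An AT-URI looks like at://did:plc:abc/app.bsky.feed.post/3kxyz, so split("/")[2] is the repo's DID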
+      let atUriSplit = label.uri.split("/");
+      targetDid = atUriSplit[2] ?? "";
+    }
+
    // TODO: MAKE SURE TO CHECK NEG
    let labledDate = new Date(label.cts);
    logger.debug(
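The NEG TODO above refers to the neg flag on com.atproto.label.defs labels: a label arriving with neg set to true retracts a label this labeler emitted earlier rather than applying a new one. A minimal sketch of that check, with the actual retraction handling left to a hypothetical helper:

    // Sketch only: short-circuit when the labeler negates a previously emitted label.
    if (label.neg) {
      // e.g. mark the matching labelsApplied row as retracted instead of acting on it
      // await markLabelRetracted(db, targetDid, label.val, config.host); // hypothetical helper
      return;
    }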
···
      .from(schema.watchedRepos)
      .where(
        and(
-         eq(schema.watchedRepos.did, label.uri),
+         eq(schema.watchedRepos.did, targetDid),
          eq(schema.watchedRepos.active, true),
        ),
      )
···
    if (isRepoWatched.length > 0) {
      const watchedRepo = isRepoWatched[0];
      if (watchedRepo == undefined) {
-       throw new Error(`Unexpected error on watched repo: ${label.uri}`);
+       throw new Error(`Unexpected error on watched repo: ${targetDid}`);
      }
      const pdsConfig = Object.values(pdsConfigs).find(
        (config) => config.host === watchedRepo.pdsHost,
···
        .from(schema.labelsApplied)
        .where(
          and(
-           eq(schema.labelsApplied.did, label.uri),
+           eq(schema.labelsApplied.did, targetDid),
            eq(schema.labelsApplied.label, label.val),
            eq(schema.labelsApplied.labeler, config.host),
          ),
···
          })
          .where(eq(schema.labelsApplied.id, existingRecord.id));
        logger.debug(
-         { did: label.uri, label: label.val },
+         { did: targetDid, label: label.val },
          "Updated existing label record",
        );
      } else {
        await db.insert(schema.labelsApplied).values({
-         did: label.uri,
+         did: targetDid,
          label: label.val,
          labeler: config.host,
          action: labelConfig.action,
···
      if (labelConfig.action === "notify") {
        // TODO: probably move this to a queue since backfill can hit rate limits
        await sendLabelNotification(pdsConfig.notifyEmails, {
-         did: label.uri,
+         did: targetDid,
          pds: pdsConfig.host,
          label: label.val,
          labeler: config.host,
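The rate-limit TODO in the notify branch could reuse the p-queue pattern the project already uses for its other work; a rough sketch, where notifyQueue is a hypothetical shared instance and the payload mirrors the fields visible in this hunk:

    import PQueue from "p-queue";

    // Hypothetical low-throughput queue so backfill doesn't burst notification emails.
    const notifyQueue = new PQueue({ concurrency: 1, interval: 1000, intervalCap: 5 });

    // In the "notify" branch, enqueue instead of awaiting the send inline:
    void notifyQueue.add(() =>
      sendLabelNotification(pdsConfig.notifyEmails, {
        did: targetDid,
        pds: pdsConfig.host,
        label: label.val,
        labeler: config.host,
      }),
    );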
src/handlers/lablerSubscriber.ts (+6 -6)
···
    // Saves the cursor for resume and reconnect
    if ("seq" in message) {
      cursor = message.seq;
+     // May change to only save cursor every so often to cut down on writes
+     // if (cursor % 10 === 0) {
      await db
        .insert(labelerCursor)
        .values({ labelerId: config.host, cursor: message.seq })
···
          target: [labelerCursor.labelerId],
          set: { cursor: message.seq },
        });
+     // }
    }
    switch (message.$type) {
      case "com.atproto.label.subscribeLabels#info": {
···
      }
      case "com.atproto.label.subscribeLabels#labels": {
        for (const label of message.labels) {
-         // We only care about labels for identities, not content for now
-         if (label.uri.startsWith("did:")) {
-           queue.add(async () => {
-             await handleNewLabel(config, label, db, pdsConfigs);
-           });
-         }
+         queue.add(async () => {
+           await handleNewLabel(config, label, db, pdsConfigs);
+         });
        }
        break;
      }
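The cursor persisted in the first hunk of this file is presumably read back when the subscriber (re)connects; a sketch of that read with drizzle, using only the labelerCursor columns visible in this diff (everything else is assumed):

    import { eq } from "drizzle-orm";

    // Sketch: restore the last saved cursor for this labeler before subscribing again.
    const saved = await db
      .select()
      .from(labelerCursor)
      .where(eq(labelerCursor.labelerId, config.host))
      .limit(1);

    let cursor = saved[0]?.cursor ?? 0;
    // ...then open com.atproto.label.subscribeLabels starting from that cursor.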
src/index.ts (-7)
···
const labelQueue = new PQueue({ concurrency: 2 });
const identityQueue = new PQueue({ concurrency: 2 });

-// TODO
-// 1. Figure out a schema for settings we want. PDSs to watch.Labelers and their Labels
-// and which actions to do for them (notification/email) or auto takedown. thinking toml file maybe?
-// 2. Add a CLI argument to backfill PDS repos on start up. If finds a new active repo adds it
-// 3. Add a firehose listener that subscribes to the PDSs for new identities? (I say maybe not cause of bandwidth)
-// 4. We can save the last sen sequence from the labler to the db and restore it on startup for backfill
-
// Run Drizzle migrations on startup
migrate(db, { migrationsFolder: process.env.MIGRATIONS_FOLDER ?? "drizzle" });