Syncs atproto following list from a source of truth of one's choosing
1import { Client, CredentialManager } from "@atcute/client";
2import { Handle, Did, ActorIdentifier } from "@atcute/lexicons";
3import * as dotenv from "dotenv";
4
5dotenv.config();
6
7const APPS: Record<string, string> = {
8 bsky: "app.bsky.graph.follow",
9 tangled: "sh.tangled.graph.follow",
10 // TODO: add more apps here. eg:
11 // whitewind: "com.whtwnd.graph.follow"
12};
13
14const BSKY_HANDLE = process.env.BSKY_HANDLE;
15const BSKY_PASSWORD = process.env.BSKY_PASSWORD;
16const SHOULD_DELETE = !process.argv.includes("--no-delete");
17
18const sourceArg = process.argv.find((arg) => arg.startsWith("--source="));
19const SOURCE_KEY = sourceArg ? sourceArg.split("=")[1] : "bsky";
20
21if (!APPS[SOURCE_KEY]) {
22 console.error(`error: source '${SOURCE_KEY}' not found in APPS config`);
23 console.error(`available apps: ${Object.keys(APPS).join(", ")}`);
24 process.exit(1);
25}
26
27if (!BSKY_HANDLE || !BSKY_PASSWORD) {
28 process.exit(1);
29}
30
31let rpc: Client;
32let manager: CredentialManager;
33let agentDID: Did;
34
35const resolveHandle = async (handle: string): Promise<Did> => {
36 const publicRpc = new Client({
37 handler: new CredentialManager({ service: "https://public.api.bsky.app" }),
38 });
39
40 const res = await publicRpc.get("com.atproto.identity.resolveHandle", {
41 params: { handle: handle as Handle },
42 });
43
44 if (!res.ok) throw new Error(res.data.error);
45 return res.data.did;
46};
47
48const getPDS = async (did: string) => {
49 const res = await fetch(
50 did.startsWith("did:web")
51 ? `https://${did.split(":")[2]}/.well-known/did.json`
52 : "https://plc.directory/" + did,
53 );
54
55 return res.json().then((doc: any) => {
56 for (const service of doc.service) {
57 if (service.id === "#atproto_pds") return service.serviceEndpoint;
58 }
59 throw new Error("no PDS endpoint found");
60 });
61};
62
63const fetchAllRecords = async (collection: string): Promise<Map<string, string>> => {
64 const records = new Map<string, string>();
65 let cursor: string | undefined;
66
67 process.stdout.write(`fetching records from ${collection}...`);
68
69 do {
70 const res = await rpc.get("com.atproto.repo.listRecords", {
71 params: {
72 repo: agentDID as ActorIdentifier,
73 collection: collection,
74 limit: 100,
75 cursor: cursor,
76 },
77 });
78
79 if (!res.ok) throw new Error(res.data.error);
80
81 res.data.records.forEach((record: any) => {
82 const rkey = record.uri.split("/").pop();
83 if (record.value.subject && rkey) {
84 records.set(record.value.subject, rkey);
85 }
86 });
87
88 cursor = res.data.cursor;
89 process.stdout.write(".");
90 } while (cursor);
91
92 console.log(` done (${records.size})`);
93 return records;
94};
95
96const createFollowRecord = async (collection: string, targetDid: Did) => {
97 const record = {
98 $type: collection,
99 subject: targetDid,
100 createdAt: new Date().toISOString(),
101 };
102
103 await rpc.post("com.atproto.repo.createRecord", {
104 input: {
105 repo: agentDID,
106 collection: collection,
107 record: record,
108 },
109 });
110};
111
112const deleteFollowRecord = async (collection: string, rkey: string) => {
113 await rpc.post("com.atproto.repo.deleteRecord", {
114 input: {
115 repo: agentDID,
116 collection: collection,
117 rkey: rkey,
118 },
119 });
120};
121
122const syncCollection = async (
123 targetAppName: string,
124 targetCollection: string,
125 sourceDids: Set<string>
126) => {
127 console.log(`\ndownstream target is ${targetAppName} (${targetCollection})`);
128
129 const currentTargetRecords = await fetchAllRecords(targetCollection);
130 let addedCount = 0;
131 let deletedCount = 0;
132
133 for (const subjectDid of sourceDids) {
134 if (!currentTargetRecords.has(subjectDid)) {
135 process.stdout.write(`[+] following ${subjectDid}... `);
136 await createFollowRecord(targetCollection, subjectDid as Did);
137 console.log("done");
138 addedCount++;
139 await new Promise((resolve) => setTimeout(resolve, 1000));
140 } else {
141 currentTargetRecords.delete(subjectDid);
142 }
143 }
144
145 if (SHOULD_DELETE && currentTargetRecords.size > 0) {
146 console.log(`found ${currentTargetRecords.size} orphans in ${targetAppName}, pruning...`);
147
148 let progress = 0;
149 for (const [did, rkey] of currentTargetRecords) {
150 progress++;
151 process.stdout.write(`[-] [${progress}/${currentTargetRecords.size}] unfollowing ${did}... `);
152 await deleteFollowRecord(targetCollection, rkey);
153 console.log("done");
154 deletedCount++;
155 await new Promise((resolve) => setTimeout(resolve, 1000));
156 }
157 } else if (!SHOULD_DELETE && currentTargetRecords.size > 0) {
158 console.log(`skipping deletion of ${currentTargetRecords.size} orphans (--no-delete)`);
159 }
160
161 console.log(`sync complete for ${targetAppName}: +${addedCount} added, -${deletedCount} removed`);
162};
163
164const main = async () => {
165 try {
166 if (!SHOULD_DELETE) console.log("running in add-only mode (--no-delete detected)\ncoward!! :3");
167
168 agentDID = BSKY_HANDLE.startsWith("did:")
169 ? (BSKY_HANDLE as Did)
170 : await resolveHandle(BSKY_HANDLE);
171
172 const pdsUrl = await getPDS(agentDID);
173 manager = new CredentialManager({ service: pdsUrl });
174 rpc = new Client({ handler: manager });
175
176 await manager.login({
177 identifier: agentDID,
178 password: BSKY_PASSWORD,
179 });
180
181 console.log(`\nSOURCE OF TRUTH: ${SOURCE_KEY} (${APPS[SOURCE_KEY]})`);
182
183 const sourceMap = await fetchAllRecords(APPS[SOURCE_KEY]);
184 const sourceDids = new Set(sourceMap.keys());
185
186 const targetApps = Object.entries(APPS).filter(([key]) => key !== SOURCE_KEY);
187
188 if (targetApps.length === 0) {
189 console.log("no target apps found to sync to");
190 return;
191 }
192
193 for (const [appName, collectionUri] of targetApps) {
194 await syncCollection(appName, collectionUri, sourceDids);
195 }
196
197 console.log("\nall syncs finished, nini");
198
199 } catch (error) {
200 console.error(error);
201 }
202};
203
204main();