A work-in-progress chat bot for Streamplace with chat overlay functionality
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

No more hardcoded DIDs, bot instances for all following accounts

+118 -28
+6 -4
env.ts
··· 2 2 3 3 await load({ export: true }); 4 4 5 - export const PDS_HOST_URL = Deno.env.get("PDS_HOST_URL"); 6 - export const ATPROTO_USERNAME = Deno.env.get("ATPROTO_USERNAME"); 7 - export const ATPROTO_PASSWORD = Deno.env.get("ATPROTO_PASSWORD"); 8 - export const JETSTREAM_URL = Deno.env.get("JETSTREAM_URL"); 5 + export const PDS_HOST_URL = Deno.env.get("PDS_HOST_URL")!; 6 + export const ATPROTO_USERNAME = Deno.env.get("ATPROTO_USERNAME")! as 7 + | Did 8 + | Handle; 9 + export const ATPROTO_PASSWORD = Deno.env.get("ATPROTO_PASSWORD")!; 10 + export const JETSTREAM_URL = Deno.env.get("JETSTREAM_URL")!;
+27 -9
main.ts
··· 1 1 import { App, staticFiles } from "fresh"; 2 2 import { type State } from "./utils.ts"; 3 - import { streamplaceWS } from "./utils/websocket.ts"; 3 + import { isHandle } from "@atcute/lexicons/syntax"; 4 + import { resolveHandle } from "./utils/atcuteUtils.ts"; 5 + import { filterByStreamplace, getBacklinks } from "./utils/constellationUtils.ts"; 4 6 import StreamplaceBot from "./utils/streamplaceBot.ts"; 7 + import { streamplaceWS } from "./utils/websocket.ts"; 5 8 import { ATPROTO_PASSWORD, ATPROTO_USERNAME, PDS_HOST_URL } from "./env.ts"; 6 9 7 10 export const app = new App<State>(); 8 11 9 - export const streamplaceBot = new StreamplaceBot( 10 - "did:plc:o6xucog6fghiyrvp7pyqxcs3", //this is me, make more flexible later 11 - { 12 - username: ATPROTO_USERNAME!, 13 - password: ATPROTO_PASSWORD!, 14 - pdsHostUrl: PDS_HOST_URL!, 15 - }, 12 + // Get bot following 13 + const botDid = isHandle(ATPROTO_USERNAME) 14 + ? await resolveHandle(ATPROTO_USERNAME) 15 + : ATPROTO_USERNAME; 16 + export const botInstances: Map<Did, StreamplaceBot> = new Map(); 17 + const backlinks = await getBacklinks( 18 + botDid, 19 + "app.bsky.graph.follow", 20 + ".subject", 21 + 100, 16 22 ); 17 - streamplaceBot.init(); 23 + const filteredBacklinks = await filterByStreamplace(backlinks); 24 + for (const backlink of filteredBacklinks) { 25 + const streamplaceBot = new StreamplaceBot( 26 + backlink, 27 + { 28 + username: ATPROTO_USERNAME!, 29 + password: ATPROTO_PASSWORD!, 30 + pdsHostUrl: PDS_HOST_URL!, 31 + }, 32 + ); 33 + streamplaceBot.init(); 34 + botInstances.set(backlink, streamplaceBot); 35 + } 18 36 streamplaceWS.start(); 19 37 app.use(staticFiles()); 20 38
+46 -3
utils/atcuteUtils.ts
··· 142 142 return await handleResolver.resolve(handle); 143 143 } 144 144 145 + // com.atproto.repo.describeRepo wrapper 146 + export async function describeRepo(did: Did, pdsHostUrl: string): Promise< 147 + { 148 + collections: `${string}.${string}.${string}`[]; 149 + did: `did:${string}:${string}`; 150 + didDoc: Record<string, unknown>; 151 + handle: `${string}.${string}`; 152 + handleIsCorrect: boolean; 153 + } | null 154 + > { 155 + const rpc = new Client({ 156 + handler: simpleFetchHandler({ 157 + service: pdsHostUrl, 158 + }), 159 + }); 160 + 161 + const response = await rpc.get("com.atproto.repo.describeRepo", { 162 + params: { repo: did }, 163 + }); 164 + 165 + if (response.ok) { 166 + return response.data; 167 + } else { 168 + switch (response.data.error) { 169 + case "InvalidRequest": 170 + // handle or account doesn't exist 171 + console.log(`describeRepo: ${did} does not exist.`); 172 + break; 173 + case "RepoTakendown": 174 + // account was taken down 175 + console.log(`describeRepo: ${did} was taken down.`); 176 + break; 177 + case "AccountDeactivated": 178 + // account deactivated by user 179 + console.log(`describeRepo: ${did} is deactivated.`); 180 + break; 181 + } 182 + } 183 + return null; 184 + } 185 + 145 186 // Get DID document 146 187 export async function getDidDocument(did: Did): Promise<DidDocument> { 147 188 if (!isAtprotoDid(did)) { ··· 206 247 params: params, 207 248 }); 208 249 if (!record.ok) { 209 - console.log( 210 - `No record of collection ${params.collection} and rkey ${params.rkey} found in ${params.repo}.`, 211 - ); 250 + // as this project scales, there will be a lot of this 251 + // maybe I should let users fine tune their logging 252 + // console.log( 253 + // `No record of collection ${params.collection} and rkey ${params.rkey} found in ${params.repo}.`, 254 + // ); 212 255 return undefined; 213 256 } 214 257
+28
utils/constellationUtils.ts
··· 1 + import { describeRepo, listRecords } from "./atcuteUtils.ts"; 2 + import { didResolver } from "./didResolver.ts"; 3 + 1 4 // Microcosm backlinks 2 5 interface ConstellationResponse { 3 6 total: number; ··· 37 40 throw error; 38 41 } 39 42 } 43 + 44 + export async function filterByStreamplace( 45 + backlinks: ConstellationResponse, 46 + ): Promise<Did[]> { 47 + const activeDids: Did[] = []; 48 + 49 + // Doing in batches probably necessary for larger following 50 + for (const follower of backlinks.linking_records) { 51 + const profile = await didResolver.resolve(follower.did); 52 + const repoDescription = await describeRepo( 53 + follower.did, 54 + profile.pdsEndpoint, 55 + ); 56 + if (!repoDescription) continue; 57 + const livestreams = await listRecords(profile.pdsEndpoint, { 58 + repo: follower.did, 59 + // Should this be "place.stream.livestream" instead? 60 + collection: "place.stream.chat.profile", 61 + }); 62 + if (livestreams.records?.length > 0) { 63 + activeDids.push(follower.did); 64 + } 65 + } 66 + return activeDids; 67 + }
+2 -1
utils/streamplaceBot.ts
··· 106 106 async processMessage(message: JetstreamMessage): Promise<void> { 107 107 if (!this.enabled) return; 108 108 109 + const streamer = await this.getUserProfile(this.streamerDid); 109 110 const record = message.commit.record!; 110 111 const text = record.text.trim(); 111 112 ··· 136 137 const handler = this.commands.get(commandName); 137 138 if (handler) { 138 139 console.log( 139 - `Executing command: ${commandName} from user: ${chatter.handle}`, 140 + `Executing command: ${commandName} in: ${streamer.handle} from user: ${chatter.handle}`, 140 141 ); 141 142 try { 142 143 await handler(message, args, this);
+9 -11
utils/websocket.ts
··· 1 1 import { JETSTREAM_URL } from "../env.ts"; 2 2 import { didResolver } from "./didResolver.ts"; 3 - import { streamplaceBot } from "../main.ts"; 3 + import { botInstances } from "../main.ts"; 4 4 5 5 // Client subscription message 6 6 interface SubscriptionMessage { ··· 54 54 jetstreamMessage.kind === "commit" && 55 55 jetstreamMessage.commit?.operation === "create" && 56 56 jetstreamMessage.commit.record && 57 - // TODO: make flexible later 58 - jetstreamMessage.commit.record.streamer === 59 - "did:plc:o6xucog6fghiyrvp7pyqxcs3" 57 + botInstances.has(jetstreamMessage.commit.record.streamer) 60 58 ) { 61 59 this.processJetstreamMessage(jetstreamMessage); 62 60 } ··· 139 137 } 140 138 141 139 private async processJetstreamMessage(jetstreamMessage: JetstreamMessage) { 142 - // Let bot handle it 143 - streamplaceBot.processMessage(jetstreamMessage); 144 - 145 140 // Extract the record data 146 141 const record = jetstreamMessage.commit.record!; 142 + const streamer = record.streamer; 147 143 const enrichedMessage = await this.enrichMessage( 148 144 jetstreamMessage, 149 145 record, 150 146 ); 151 147 152 - // Get clients subscribed to this streamer 153 - const streamer = record.streamer; 154 - const subscribedClients = this.streamerClients.get(streamer); 148 + // Let bot handle it 149 + const streamplaceBot = botInstances.get(streamer); 150 + streamplaceBot!.processMessage(jetstreamMessage); 155 151 152 + // Pass to subscribed websocket clients 153 + const subscribedClients = this.streamerClients.get(streamer); 156 154 if (subscribedClients && subscribedClients.size > 0) { 157 155 const messageJson = JSON.stringify({ 158 156 type: "chat_message", ··· 180 178 text: record.text, 181 179 author: profile, 182 180 timestamp: new Date(record.createdAt), 183 - facets: undefined,//record.facets || undefined, 181 + facets: undefined, //record.facets || undefined, 184 182 isReply: !!record.reply, 185 183 streamer: record.streamer, 186 184 };