A work-in-progress chat bot for Streamplace with chat overlay functionality
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Better typing, Jetstream eventHandler class

+174 -162
+1
deno.json
··· 17 17 "@atcute/client": "npm:@atcute/client@^4.0.3", 18 18 "@atcute/identity": "npm:@atcute/identity@^1.1.3", 19 19 "@atcute/identity-resolver": "npm:@atcute/identity-resolver@^1.1.3", 20 + "@atcute/jetstream": "npm:@atcute/jetstream@^1.1.2", 20 21 "@atcute/lex-cli": "npm:@atcute/lex-cli@^2.5.3", 21 22 "@atcute/lexicons": "npm:@atcute/lexicons@^1.1.1", 22 23 "@std/dotenv": "jsr:@std/dotenv@^0.225.5",
+38
deno.lock
··· 27 27 "npm:@atcute/client@^4.0.3": "4.0.3", 28 28 "npm:@atcute/identity-resolver@^1.1.3": "1.2.2_@atcute+identity@1.1.3", 29 29 "npm:@atcute/identity@^1.1.3": "1.1.3", 30 + "npm:@atcute/jetstream@^1.1.2": "1.1.2", 30 31 "npm:@atcute/lex-cli@^2.5.3": "2.5.3_@atcute+identity@1.1.3_@atcute+identity-resolver@1.2.2__@atcute+identity@1.1.3", 31 32 "npm:@atcute/lexicons@^1.1.1": "1.2.9", 32 33 "npm:@opentelemetry/api@^1.9.0": "1.9.0", ··· 214 215 "dependencies": [ 215 216 "@atcute/lexicons", 216 217 "@badrap/valita" 218 + ] 219 + }, 220 + "@atcute/jetstream@1.1.2": { 221 + "integrity": "sha512-u6p/h2xppp7LE6W/9xErAJ6frfN60s8adZuCKtfAaaBBiiYbb1CfpzN8Uc+2qtJZNorqGvuuDb5572Jmh7yHBQ==", 222 + "dependencies": [ 223 + "@atcute/lexicons", 224 + "@badrap/valita", 225 + "@mary-ext/event-iterator", 226 + "@mary-ext/simple-event-emitter", 227 + "partysocket", 228 + "type-fest", 229 + "yocto-queue" 217 230 ] 218 231 }, 219 232 "@atcute/lex-cli@2.5.3_@atcute+identity@1.1.3_@atcute+identity-resolver@1.2.2__@atcute+identity@1.1.3": { ··· 441 454 "os": ["win32"], 442 455 "cpu": ["x64"] 443 456 }, 457 + "@mary-ext/event-iterator@1.0.0": { 458 + "integrity": "sha512-l6gCPsWJ8aRCe/s7/oCmero70kDHgIK5m4uJvYgwEYTqVxoBOIXbKr5tnkLqUHEg6mNduB4IWvms3h70Hp9ADQ==", 459 + "dependencies": [ 460 + "yocto-queue" 461 + ] 462 + }, 463 + "@mary-ext/simple-event-emitter@1.0.0": { 464 + "integrity": "sha512-meA/zJZKIN1RVBNEYIbjufkUrW7/tRjHH60FjolpG1ixJKo76TB208qefQLNdOVDA7uIG0CGEDuhmMirtHKLAg==" 465 + }, 444 466 "@noble/secp256k1@3.0.0": { 445 467 "integrity": "sha512-NJBaR352KyIvj3t6sgT/+7xrNyF9Xk9QlLSIqUGVUYlsnDTAUqY8LOmwpcgEx4AMJXRITQ5XEVHD+mMaPfr3mg==" 446 468 }, ··· 515 537 "esm-env@1.2.2": { 516 538 "integrity": "sha512-Epxrv+Nr/CaL4ZcFGPJIYLWFom+YeV1DqMLHJoEd9SYRxNbaFruBwfEX/kkHUJf55j2+TUbmDcmuilbP1TmXHA==" 517 539 }, 540 + "event-target-polyfill@0.0.4": { 541 + "integrity": "sha512-Gs6RLjzlLRdT8X9ZipJdIZI/Y6/HhRLyq9RdDlCsnpxr/+Nn6bU2EFGuC94GjxqhM+Nmij2Vcq98yoHrU8uNFQ==" 542 + }, 543 + "partysocket@1.1.6": { 544 + "integrity": "sha512-LkEk8N9hMDDsDT0iDK0zuwUDFVrVMUXFXCeN3850Ng8wtjPqPBeJlwdeY6ROlJSEh3tPoTTasXoSBYH76y118w==", 545 + "dependencies": [ 546 + "event-target-polyfill" 547 + ] 548 + }, 518 549 "picocolors@1.1.1": { 519 550 "integrity": "sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA==" 520 551 }, ··· 530 561 "prettier@3.8.1": { 531 562 "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", 532 563 "bin": true 564 + }, 565 + "type-fest@4.41.0": { 566 + "integrity": "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA==" 533 567 }, 534 568 "undici-types@6.21.0": { 535 569 "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==" 536 570 }, 537 571 "unicode-segmenter@0.14.5": { 538 572 "integrity": "sha512-jHGmj2LUuqDcX3hqY12Ql+uhUTn8huuxNZGq7GvtF6bSybzH3aFgedYu/KTzQStEgt1Ra2F3HxadNXsNjb3m3g==" 573 + }, 574 + "yocto-queue@1.2.2": { 575 + "integrity": "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==" 539 576 } 540 577 }, 541 578 "workspace": { ··· 548 585 "npm:@atcute/client@^4.0.3", 549 586 "npm:@atcute/identity-resolver@^1.1.3", 550 587 "npm:@atcute/identity@^1.1.3", 588 + "npm:@atcute/jetstream@^1.1.2", 551 589 "npm:@atcute/lex-cli@^2.5.3", 552 590 "npm:@atcute/lexicons@^1.1.1", 553 591 "npm:@preact/signals@^2.5.0",
+2 -3
env.ts
··· 44 44 export const BOT_SERVICE = await getPDSfromIdentifier( 45 45 BOT_CREDENTIALS.identifier, 46 46 ); 47 - export const JETSTREAM_URL = (Deno.env.get("JETSTREAM_URL") || 48 - "wss://jetstream1.us-east.bsky.network/subscribe") + 49 - "?wantedCollections=place.stream.chat.message"; 47 + export const JETSTREAM_URL = Deno.env.get("JETSTREAM_URL") || 48 + "wss://jetstream1.us-east.bsky.network/subscribe"; 50 49 export const FOLLOWER_MODE = Deno.env.get("FOLLOWER_MODE") === "true" 51 50 ? true 52 51 : false;
+93
utils/eventHandler.ts
··· 1 + import { CommitEvent, JetstreamEvent } from "@atcute/jetstream"; 2 + import { botInstances } from "../main.ts"; 3 + import { didResolver } from "./didResolver.ts"; 4 + import { PlaceStreamChatMessage } from "./lexicons/index.ts"; 5 + 6 + export class EventHandler { 7 + handleEvent(event: JetstreamEvent) { 8 + if (event.kind === "commit") { 9 + const commit = event.commit; 10 + switch (commit.collection) { 11 + case "app.bsky.actor.profile": 12 + this.handleActorProfile(event); 13 + break; 14 + case "app.bsky.graph.follow": 15 + this.handleBskyFollow(event); 16 + break; 17 + case "place.stream.chat.message": 18 + this.handleChatMessage(event); 19 + break; 20 + case "place.stream.chat.profile": 21 + this.handleChatProfile(event); 22 + break; 23 + case "place.stream.live.teleport": 24 + this.handleTeleport(event); 25 + break; 26 + default: 27 + } 28 + } 29 + } 30 + 31 + private async handleBskyFollow(event: CommitEvent): Promise<void> { 32 + if (event.commit.operation === "create") { 33 + const record = event.commit.record; 34 + } 35 + } 36 + 37 + private async handleActorProfile(event: CommitEvent): Promise<void> { 38 + } 39 + 40 + private async handleChatMessage(event: CommitEvent): Promise<void> { 41 + if (event.commit.operation === "create") { 42 + const record = event.commit.record as PlaceStreamChatMessage.Main; 43 + if (botInstances.has(record.streamer)) { 44 + // Send to bot 45 + const streamplaceBot = botInstances.get(record.streamer); 46 + streamplaceBot!.processMessage(event); 47 + // Enrich and 48 + //const enrichedChatMessage = await this.enrichMessage(event); 49 + } 50 + } 51 + } 52 + 53 + private async handleChatProfile(event: CommitEvent): Promise<void> { 54 + } 55 + 56 + private async handleTeleport(event: CommitEvent): Promise<void> { 57 + } 58 + 59 + private async enrichMessage( 60 + commitEvent: CommitEvent, 61 + ): Promise<EnrichedChatMessage> { 62 + const profile = await didResolver.resolve(commitEvent.did); 63 + 64 + // TODO: Not handling deletes for now 65 + if (commitEvent.commit.operation === "delete") throw new Error(); 66 + 67 + return { 68 + service: "streamplace", 69 + author: { 70 + handle: profile.handle, 71 + did: commitEvent.did, 72 + pdsEndpoint: profile.pdsEndpoint, 73 + description: profile.description, 74 + displayName: profile.displayName, 75 + pronouns: profile.pronouns, 76 + website: profile.website, 77 + avatarUrl: profile.avatarUrl, 78 + bannerUrl: profile.bannerUrl, 79 + }, 80 + badges: [], //TODO 81 + chatProfile: { 82 + $type: "place.stream.chat.profile", 83 + color: profile.color, 84 + selfLabels: [], //TODO 85 + }, 86 + cid: commitEvent.commit.cid, //TODO 87 + deleted: false, 88 + indexedAt: "", //TODO. 89 + record: commitEvent.commit.record as Record<string, unknown>, //TODO 90 + uri: `at://${commitEvent.did}/place.stream.chat.message/${commitEvent.commit.rkey}`, 91 + }; 92 + } 93 + }
+15 -86
utils/globals.d.ts
··· 1 1 declare type Did = import("@atcute/lexicons").Did; 2 2 declare type Handle = import("@atcute/lexicons").Handle; 3 3 declare type Facet = import("@atcute/bluesky").AppBskyRichtextFacet.Main; 4 - 5 - // Raw chat message as received from stream.place WebSocket 6 - interface RawChatMessage { 7 - $type: "place.stream.chat.defs#messageView"; 8 - author: { 9 - did: Did; 10 - handle: Handle; 11 - }; 12 - chatProfile: { 13 - $type: "place.stream.chat.profile"; 14 - color: { 15 - red: number; 16 - green: number; 17 - blue: number; 18 - }; 19 - }; 20 - cid: string; 21 - indexedAt: string; 22 - record: { 23 - $type: "place.stream.chat.message"; 24 - createdAt: string; 25 - facets?: Array<{ 26 - features: unknown[]; 27 - index: { 28 - byteStart: number; 29 - byteEnd: number; 30 - }; 31 - }>; 32 - streamer: string; 33 - text: string; 34 - reply?: { 35 - root: unknown; // AT Proto strongRef 36 - parent: unknown; // AT Proto strongRef 37 - }; 38 - }; 39 - uri: string; 40 - } 41 - 42 - // Raw chat message as received from Jetstream WebSocket 43 - interface JetstreamMessage { 44 - did: Did; 45 - time_us: number; 46 - kind: "commit"; 47 - commit: { 48 - rev: string; 49 - operation: "create" | "delete" | "update"; 50 - collection: string; 51 - rkey: string; 52 - record?: { 53 - $type: "place.stream.chat.message"; 54 - createdAt: string; 55 - streamer: Did; 56 - text: string; 57 - facets?: Array<{ 58 - features: unknown[]; 59 - index: { 60 - byteStart: number; 61 - byteEnd: number; 62 - }; 63 - }>; 64 - reply?: { 65 - root: unknown; 66 - parent: unknown; 67 - }; 68 - }; 69 - cid: string; 70 - }; 71 - } 4 + declare type MessageView = 5 + import("./lexicons/index.ts").PlaceStreamChatDefs.MessageView; 72 6 73 7 // Handle + app.bsky.actor.profile + place.stream.chat.profile 74 8 // + pronouns from labeler for maximum info about chatter ··· 88 22 displayName?: string; 89 23 } 90 24 91 - // User chat role 92 - interface Role { 93 - text: string; 94 - icon?: string; // SVG string or icon identifier 95 - color?: string; // Hex color for label styling 96 - type?: "broadcaster" | "moderator" | "VIP" | "admin"; 97 - } 98 - 99 - // Enriched message for overlay display 100 - interface EnrichedChatMessage { 101 - id: string; 102 - text: string; 103 - author: UserProfile; 104 - streamer: Did; 105 - timestamp: Date; // Parsed createdAt 106 - roles?: Role[]; 107 - facets?: Facet[]; 108 - isReply?: boolean; // Simplified reply indicator 25 + interface EnrichedChatMessage extends MessageView { 26 + service: "streamplace" | "twitch"; 27 + author: { 28 + did: Did; 29 + handle: Handle; 30 + pdsEndpoint: string; 31 + description?: string; 32 + displayName?: string; 33 + pronouns?: string; 34 + website?: string; 35 + avatarUrl?: string; 36 + bannerUrl?: string; 37 + }; 109 38 }
+16 -9
utils/streamplaceBot.ts
··· 1 1 import { AuthLoginOptions } from "@atcute/client"; 2 + import { CommitEvent } from "@atcute/jetstream"; 2 3 import { AtprotoClient, listRecords } from "./atcuteUtils.ts"; 3 4 import { didResolver } from "./didResolver.ts"; 4 5 import { defaultCommands } from "./commands/defaultCommands.ts"; 6 + import { PlaceStreamChatMessage } from "./lexicons/index.ts"; 5 7 6 8 export interface CommandHandler { 7 9 ( 8 - message: JetstreamMessage, 10 + event: CommitEvent, 9 11 args: string[], 10 12 bot: StreamplaceBot, 11 13 ): Promise<void> | void; ··· 102 104 } 103 105 104 106 // Process an incoming chat message and respond if it's a command 105 - async processMessage(message: JetstreamMessage): Promise<void> { 107 + async processMessage(event: CommitEvent): Promise<void> { 106 108 if (!this.enabled) return; 109 + if (event.commit.operation === "delete") { 110 + throw new Error( 111 + "CommandHandler did not receive CreateCommit, nothing to handle.", 112 + ); 113 + } 107 114 108 115 const streamer = await this.getUserProfile(this.streamerDid); 109 - const record = message.commit.record!; 116 + const record = event.commit.record as PlaceStreamChatMessage.Main; 110 117 const text = record.text.trim(); 111 118 112 119 // Get or cache chatter information 113 - const chatter = await this.getUserProfile(message.did); 120 + const chatter = await this.getUserProfile(event.did); 114 121 115 122 // Auto-greet first-time chatters with shoutout if they have one 116 - if (!this.hasBeenGreeted.get(message.did)) { 117 - this.hasBeenGreeted.set(message.did, true); 123 + if (!this.hasBeenGreeted.get(event.did)) { 124 + this.hasBeenGreeted.set(event.did, true); 118 125 119 - if (this.shoutouts.get(message.did)) { 120 - const shoutout = this.shoutouts.get(message.did); 126 + if (this.shoutouts.get(event.did)) { 127 + const shoutout = this.shoutouts.get(event.did); 121 128 if (shoutout) { 122 129 await this.sendMessage(shoutout.text, shoutout.facets); 123 130 } ··· 139 146 `Executing command: ${commandName} in: ${streamer.handle} from user: ${chatter.handle}`, 140 147 ); 141 148 try { 142 - await handler(message, args, this); 149 + await handler(event, args, this); 143 150 } catch (error) { 144 151 console.error(`Error executing command ${commandName}:`, error); 145 152 }
+9 -64
utils/websocket.ts
··· 1 + import { JetstreamEvent } from "@atcute/jetstream"; 1 2 import { JETSTREAM_URL } from "../env.ts"; 2 - import { didResolver } from "./didResolver.ts"; 3 - import { botInstances } from "../main.ts"; 3 + import { EventHandler } from "./eventHandler.ts"; 4 4 5 5 // Client subscription message 6 6 interface SubscriptionMessage { ··· 13 13 private jetstreamWs: WebSocket | null = null; 14 14 private clients = new Map<WebSocket, Set<string>>(); // client -> subscribed streamers 15 15 private streamerClients = new Map<string, Set<WebSocket>>(); // streamer -> clients 16 + private eventHandler = new EventHandler; 16 17 17 18 constructor() {} 18 19 ··· 34 35 } 35 36 36 37 private connectToJetstream() { 37 - const jetstreamUrl = JETSTREAM_URL ?? 38 - "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=place.stream.chat.message"; 39 - 38 + const jetstreamUrl = JETSTREAM_URL; 39 + 40 40 this.jetstreamWs = new WebSocket(jetstreamUrl); 41 41 42 42 this.jetstreamWs.onopen = () => { ··· 45 45 46 46 this.jetstreamWs.onmessage = (event) => { 47 47 try { 48 - const jetstreamMessage: JetstreamMessage = JSON.parse( 48 + const jetstreamEvent: JetstreamEvent = JSON.parse( 49 49 event.data, 50 50 ); 51 - 52 - // Only process create operations 53 - if ( 54 - jetstreamMessage.kind === "commit" && 55 - jetstreamMessage.commit?.operation === "create" && 56 - jetstreamMessage.commit.record && 57 - botInstances.has(jetstreamMessage.commit.record.streamer) 58 - ) { 59 - this.processJetstreamMessage(jetstreamMessage); 60 - } 51 + 52 + this.eventHandler.handleEvent(jetstreamEvent); 53 + 61 54 } catch (error) { 62 55 console.error("Error processing jetstream message:", error); 63 56 } ··· 134 127 } 135 128 } 136 129 this.clients.delete(client); 137 - } 138 - 139 - private async processJetstreamMessage(jetstreamMessage: JetstreamMessage) { 140 - // Extract the record data 141 - const record = jetstreamMessage.commit.record!; 142 - const streamer = record.streamer; 143 - const enrichedMessage = await this.enrichMessage( 144 - jetstreamMessage, 145 - record, 146 - ); 147 - 148 - // Let bot handle it 149 - const streamplaceBot = botInstances.get(streamer); 150 - streamplaceBot!.processMessage(jetstreamMessage); 151 - 152 - // Pass to subscribed websocket clients 153 - const subscribedClients = this.streamerClients.get(streamer); 154 - if (subscribedClients && subscribedClients.size > 0) { 155 - const messageJson = JSON.stringify({ 156 - type: "chat_message", 157 - data: enrichedMessage, 158 - }); 159 - 160 - // Send to all subscribed clients 161 - for (const client of subscribedClients) { 162 - if (client.readyState === WebSocket.OPEN) { 163 - client.send(messageJson); 164 - } 165 - } 166 - } 167 - } 168 - 169 - private async enrichMessage( 170 - jetstreamMessage: JetstreamMessage, 171 - record: NonNullable<JetstreamMessage["commit"]["record"]>, 172 - ): Promise<EnrichedChatMessage> { 173 - // Placeholder enrichment - just convert the format for now 174 - const profile = await didResolver.resolve(jetstreamMessage.did); 175 - 176 - return { 177 - id: jetstreamMessage.commit.cid, 178 - text: record.text, 179 - author: profile, 180 - timestamp: new Date(record.createdAt), 181 - facets: undefined, //record.facets || undefined, 182 - isReply: !!record.reply, 183 - streamer: record.streamer, 184 - }; 185 130 } 186 131 } 187 132