Monorepo for Aesthetic.Computer aesthetic.computer
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1093 lines 35 kB view raw
1// Chat, 24.03.30.14.53 2// This file is a unikernel compatible chat server designed to eventually replace 3// `session-server/session.mjs` by adding both websocket and udp support. 4// But its first job is to be the chat server for AC. 5 6/* #region 🏁 TODO 7 - [🔵] Add full support for and launch a `chat-clock` backend instance. 8 - [] Proofread the code. 9 - [] Update any dependencies 10 - [] Make a dev deployment script and emacs tab. 11*/ 12 13// Management: 14// https://console.cloud.google.com/compute/instances?project=aesthetic-computer 15 16import { WebSocketServer, WebSocket } from "ws"; 17import { promises as fs, readFileSync } from "fs"; 18 19import fetch from "node-fetch"; 20import http from "http"; 21import https from "https"; 22import { URL } from "url"; 23import { StringDecoder } from "string_decoder"; 24 25import { filter } from "./filter.mjs"; // Profanity filtering. 26import { redact, unredact } from "./redact.mjs"; 27 28// import { createClient } from "redis"; // Redis 29import { MongoClient } from "mongodb"; // MongoDB 30// FCM (Firebase Cloud Messaging) 31 32import dotenv from "dotenv"; 33dotenv.config({ path: "chat.env" }); 34 35import { initializeApp, cert } from "firebase-admin/app"; // Firebase notifications. 
import { getMessaging } from "firebase-admin/messaging";

const MAX_MESSAGES = 500; // Maximum messages to keep in memory

// Every chat instance this server can run as, keyed by instance name.
//  - `allowedHost`: the only `Host` header accepted for sockets in production.
//  - `userInfoEndpoint`: the auth0 endpoint used to validate user tokens.
//  - `devPort`: the port to listen on in development mode.
const instances = {
  "chat-system": {
    name: "chat-system",
    allowedHost: "chat-system.aesthetic.computer",
    userInfoEndpoint: "https://aesthetic.us.auth0.com/userinfo",
    devPort: 8083,
  },
  "chat-sotce": {
    name: "chat-sotce",
    allowedHost: "chat.sotce.net",
    userInfoEndpoint: "https://sotce.us.auth0.com/userinfo",
    devPort: 8084,
  },
  "chat-clock": {
    name: "chat-clock",
    allowedHost: "chat-clock.aesthetic.computer",
    userInfoEndpoint: "https://aesthetic.us.auth0.com/userinfo",
    devPort: 8085,
  },
};

// Only log the *names* of the environment variables. The values include
// secrets (database connection string, logger key) that must stay out of logs.
console.log("🔵 Env:", Object.keys(process.env).sort().join(", "));

// Keep the process alive on stray errors, but make sure they are visible.
process.on("uncaughtException", (error) => {
  console.error("🔴 Uncaught exception:", error);
});

process.on("unhandledRejection", (reason, promise) => {
  console.error("🔴 Unhandled rejection at:", promise, "reason:", reason);
});

// The instance is chosen by the first CLI argument, falling back to the
// `CHAT_INSTANCE` environment variable. An own-property check keeps inherited
// keys such as `constructor` from being treated as a valid instance.
const instanceName = process.argv[2] || process.env.CHAT_INSTANCE;
const instance = Object.prototype.hasOwnProperty.call(instances, instanceName)
  ? instances[instanceName]
  : undefined;

if (!instance) {
  console.log("🔴 No instance data found from argument:", instanceName);
  process.exit(1);
}

const dev = process.env.NODE_ENV === "development";
const filterDebug = process.env.FILTER_DEBUG === "true";

console.log(
  `\n🌟 Starting the Aesthetic Computer Chat Server for: ${instance.name} 🌟\n`,
);

if (dev) {
  console.log("🟡 Development mode.");
} else {
  console.log("🟢 Production mode.");
}

const allowedHost = instance.allowedHost;

const subsToHandles = {}; // Cached list of handles.
const subsToSubscribers = {}; // Cached list of active subscribers for this
// instance if supported.
const authorizedConnections = {}; // Connection id -> `{ token, user }` cache.

// 🔥 Firebase Admin (used for push notifications).
let serviceAccount;
try {
  const data = await fs.readFile("./gcp-firebase-service-key.json", "utf8");
  serviceAccount = JSON.parse(data);
} catch (error) {
  console.error("Error loading service account:", error);
  // NOTE(review): `cert` below is expected to throw when `serviceAccount`
  // could not be loaded, which stops startup — confirm that is intended.
}

initializeApp({ credential: cert(serviceAccount) });

let server,
  agent,
  connections = {}, // All active socket connections, keyed by connection id.
  connectionId = 0;

const MONGODB_CONNECTION_STRING = process.env.MONGODB_CONNECTION_STRING;
const MONGODB_NAME = process.env.MONGODB_NAME;

let client, db;

const HTML_ESCAPES = {
  "&": "&amp;",
  "<": "&lt;",
  ">": "&gt;",
  '"': "&quot;",
  "'": "&#39;",
};

// Escape a value so it can be interpolated into an HTML response safely.
function escapeHTML(value) {
  return String(value).replace(/[&<>"']/g, (char) => HTML_ESCAPES[char]);
}

// Write a JSON response with the given HTTP status code.
function sendJSON(res, status, payload) {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(payload));
}

// 🪵 Process the fully received body of a `POST /log` request:
// authenticate it, buffer the log as a message, apply any `action` it carries
// (mute / unmute / handle changes) to the buffered messages, and broadcast it.
function handleLog(req, res, body) {
  const authHeader = req.headers["authorization"];
  const token = authHeader && authHeader.split(" ")[1];
  const loggerKey = process.env.LOGGER_KEY;

  // An unset `LOGGER_KEY` must never "match" a request without a token.
  if (!loggerKey || token !== loggerKey) {
    sendJSON(res, 403, { status: "error", message: "😇 Forbidden!" });
    return;
  }

  try {
    const parsed = JSON.parse(body);
    if (parsed === null || typeof parsed !== "object") {
      throw new TypeError("Log payload must be a JSON object.");
    }
    console.log("🪵 Received log data:", parsed);

    // Parse the "action" (`object:behavior`, or just `behavior`).
    let object = null;
    let behavior = null;
    if (parsed.action) {
      if (typeof parsed.action !== "string") {
        throw new TypeError("Log `action` must be a string.");
      }
      console.log("🪵 Log action:", parsed.action, "value:", parsed.value);
      [object, behavior] = parsed.action.split(":");
      if (!behavior) {
        behavior = object;
        object = null; // If no separator then assume behavior tag only.
      }
      console.log("⛈️ Action ~ 🤖 Object:", object, "🏃 Behavior:", behavior);
    }

    const isMuteAction =
      object === "chat-system" &&
      (behavior === "mute" || behavior === "unmute");
    const isHandleAction = object === "handle";
    const user = Array.isArray(parsed.users) ? parsed.users[0] : undefined;

    // Validate before touching any state: without a target user the updates
    // below would match every buffered message that has no `sub`.
    if ((isMuteAction || isHandleAction) && !user) {
      throw new TypeError("Log action requires a target in `users`.");
    }

    messages.push(parsed);
    if (messages.length > MAX_MESSAGES) messages.shift();

    if (isMuteAction) {
      const apply = behavior === "mute" ? redact : unredact;
      messages.forEach((message) => {
        if (message.sub === user) apply(message);
      });
      everyone(pack(parsed.action, { user }));
    }

    if (isHandleAction) {
      // Update handles to subs cache.
      subsToHandles[user] = parsed.value;

      if (behavior === "update" || behavior === "strip") {
        const from = behavior === "update" ? "@" + parsed.value : "nohandle";

        // Update buffered messages with the new handle.
        messages.forEach((message) => {
          if (message.sub === user) message.from = from;
        });

        everyone(pack(parsed.action, { user, handle: from }));
      }
    }

    everyone(pack(`message`, parsed)); // Send to everyone.

    if (instance.name === "chat-system") notify("log 🪵", parsed.text); // Push notification.

    sendJSON(res, 200, { status: "success", message: "🪵 Log received" });
  } catch (error) {
    console.error("🔴 Could not process log:", error);
    sendJSON(res, 400, { status: "error", message: "🪵 Malformed log JSON" });
  }
}

// 🛑 The main HTTP route.
//  - `POST /log`: authenticated log ingestion (see `handleLog`).
//  - `GET /`: a small HTML status page.
//  - Anything else: a JSON 404.
const request = async (req, res) => {
  console.log("🕸️ Web request:", req.url);

  let url;
  try {
    url = new URL(req.url, `http://${req.headers.host}`);
  } catch (err) {
    console.error("🔴 Request URL Error:", err);
    sendJSON(res, 404, { status: "error", message: "Not Found" });
    return;
  }

  const pathname = url.pathname;
  const method = req.method.toUpperCase();

  if (method === "POST" && pathname === "/log") {
    // Collect the body; the decoder keeps multi-byte characters that are
    // split across chunks intact.
    let body = "";
    const decoder = new StringDecoder("utf-8");

    req.on("data", (chunk) => {
      body += decoder.write(chunk);
    });

    req.on("end", () => {
      body += decoder.end();
      handleLog(req, res, body);
    });
  } else if (method === "GET" && pathname === "/") {
    // The `Host` header is client controlled, so escape it before echoing.
    const domain = escapeHTML(req.headers.host);
    res.writeHead(200, { "Content-Type": "text/html; charset=utf-8" });
    if (instance.name === "chat-sotce") {
      res.end(`🪷 Sotce Net\nHost: <mark>${domain}</mark>`);
    } else {
      res.end(`😱 Aesthetic Computer\nHost: <mark>${domain}</mark>`);
    }
  } else {
    // Catch-all response for other requests
    sendJSON(res, 404, { status: "error", message: "Not Found" });
  }
};

if (dev) {
  // Development serves over https with the local certificate; the `agent` is
  // reused for requests to the (self-signed) local backend.
  const key = readFileSync("ssl/localhost-key.pem");
  const certificate = readFileSync("ssl/localhost.pem");
  server = https.createServer({ key, cert: certificate }, request);
  agent = new https.Agent({
    key,
    cert: certificate,
    rejectUnauthorized: false,
  });
} else {
  server = http.createServer(request);
}

if (dev) {
  console.log("🟡 Waiting for local AC backend...");
  // Poll the local backend once per second until it answers with a 200.
  const waitForBackend = async () => {
    while (true) {
      try {
        const response = await fetch("https://localhost:8888", { agent });
        if (response.status === 200) {
          console.log("✅ Backend is ready!");
          break;
        }
      } catch (error) {
        console.log("🟠 Backend not yet available..." /*, error*/);
      }
      await new Promise((resolve) => setTimeout(resolve, 1000)); // retry every 1 second
    }
  };
  await waitForBackend();
}

await makeMongoConnection();
const messages = [];
await getLast100MessagesfromMongo();
// Retrieve the most recent stored messages and then buffer in the new ones.

const port = dev ? instance.devPort : 80;

server.listen(port, "0.0.0.0", () => {
  startChatServer();
});

let everyone; // Broadcast function, assigned by `startChatServer`.

// Resolve whether `sub` holds an active `sotce-net` subscription, using the
// user's bearer `token`. Positive results are cached in `subsToSubscribers`;
// any failure counts as "not subscribed".
async function checkSubscription(sub, token) {
  if (subsToSubscribers[sub]) return subsToSubscribers[sub];

  const host = dev ? "https://localhost:8888" : "https://sotce.net";
  const options = {
    method: "POST",
    body: JSON.stringify({ retrieve: "subscription" }),
    headers: {
      Authorization: "Bearer " + token,
      "Content-Type": "application/json",
    },
  };
  if (dev) options.agent = agent;

  let subscribed = false;
  try {
    const response = await fetch(`${host}/sotce-net/subscribed`, options);
    if (response.status === 200) {
      const responseBody = await response.json();
      if (responseBody.subscribed) {
        console.log("📰 Subscribed:", responseBody);
        subscribed = true;
      } else {
        console.error("🗞️ Unsubscribed:", responseBody);
      }
    } else {
      console.error("🗞️ Unsubscribed:", response.status);
    }
  } catch (error) {
    console.error("🗞️ Subscription check failed:", error);
  }

  subsToSubscribers[sub] = subscribed; // Cache the subscription call.
  return subscribed;
}

// Fetch the timestamp for a message from the `/api/clock` endpoint, falling
// back to local time when the request fails or returns an unparseable date.
async function getServerTime() {
  const clockHost = dev ? "https://localhost:8888" : "https://aesthetic.computer";
  const clockUrl = `${clockHost}/api/clock`;
  const options = {};
  if (dev) options.agent = agent; // The local backend uses a self-signed cert.

  try {
    const clockResponse = await fetch(clockUrl, options);
    if (clockResponse.ok) {
      const serverTime = new Date((await clockResponse.text()).trim());
      if (!Number.isNaN(serverTime.getTime())) {
        console.log("⏲️ Using server time...");
        return serverTime;
      }
      console.error(
        `🔴 Chat: Unparseable server time from ${clockUrl}. Using local time.`,
      );
    } else {
      console.error(
        `🔴 Chat: Failed to fetch server time from ${clockUrl}: ${clockResponse.status} ${await clockResponse.text()}. Using local time.`,
      );
    }
  } catch (error) {
    console.error(
      `🔴 Chat: Error fetching server time from ${clockUrl}: ${error}. Using local time.`,
    );
  }
  return new Date(); // Fallback
}

// Pick the clock-face emoji closest to (and not after) the given date:
// 🕐 (0x1F550) to 🕛 (0x1F55B) for minutes 0-29 and
// 🕜 (0x1F55C) to 🕧 (0x1F567) for minutes 30-59.
function getClockEmoji(date) {
  const hour12 = date.getHours() % 12 || 12; // Midnight or noon is 12.
  const base = date.getMinutes() < 30 ? 0x1f550 : 0x1f55c;
  return String.fromCodePoint(base + hour12 - 1);
}

// 🧦 Attach the WebSocket chat server to the HTTP server and wire up all
// per-connection handlers. Also assigns the module-level `everyone` broadcast.
// (A redis pub/sub fan-out used to be sketched here; it is not in use.)
async function startChatServer() {
  const wss = new WebSocketServer({ server });

  // 💓 A single shared heartbeat. Every 15 seconds each client that did not
  // answer the previous ping is terminated and the rest are pinged again.
  // One timer per connection would sweep all clients many times per period
  // and could terminate sockets whose pong is still in flight.
  const heartbeat = setInterval(() => {
    wss.clients.forEach((socket) => {
      if (socket.isAlive === false) {
        socket.terminate();
        return;
      }
      socket.isAlive = false;
      socket.ping();
    });
  }, 15000);

  wss.on("close", () => clearInterval(heartbeat));

  wss.on("connection", (ws, req) => {
    if (!dev && req.headers.host !== allowedHost) {
      ws.close(1008, "Policy violation"); // Close the WebSocket connection
      return;
    }

    // Send a message to all other clients except this one.
    function others(string) {
      wss.clients.forEach((c) => {
        if (c !== ws && c?.readyState === WebSocket.OPEN) c.send(string);
      });
    }

    // Allocate the id first so the index entry and the `close` cleanup agree.
    connectionId += 1;
    const id = connectionId;
    connections[id] = ws;

    const ip = req.socket.remoteAddress || "localhost";
    ws.isAlive = true; // For checking persistence between ping-pong messages.

    ws.on("pong", () => {
      ws.isAlive = true;
    }); // Receive a pong and stay alive!

    console.log(
      "🔌 New connection:",
      `${id}:${ip}`,
      "Online:",
      wss.clients.size,
      "🫂",
    );

    ws.on("message", async (data, more) => {
      let msg;
      try {
        msg = JSON.parse(data.toString());
      } catch (err) {
        console.log(
          "🔴 Failed to parse JSON message...",
          data,
          data.length,
          more,
        );
        return;
      }

      if (msg === null || typeof msg !== "object") return;

      msg.id = id;

      if (msg.type === "logout") {
        console.log("🏃‍♀️ User logged out...");
        delete authorizedConnections[id];
      } else if (msg.type === "chat:message") {
        // 💬 Received an incoming chat message.
        const content = msg.content;
        if (
          content === null ||
          typeof content !== "object" ||
          typeof content.text !== "string"
        ) {
          console.log("🔴 Ignoring malformed chat message from:", id);
          return;
        }

        console.log(
          "💬 Received:",
          msg.type,
          "from:",
          content.sub,
          "of:",
          content.text,
        );

        // 🔇 Early mute check against the *claimed* sub. The verified sub is
        // checked again after authorization.
        if (await isMuted(content.sub)) {
          ws.send(
            pack("muted", {
              message: `Your user has been muted and cannot send chat messages.`,
            }),
          );
          return; // Ignore messages from muted users.
        }

        // TODO: ❤️‍🔥 Add rate-limiting / maybe quit here if needed.
        // 🧶 Length limiting.
        const len = 128;
        if (content.text.length > len) {
          ws.send(
            pack("too-long", {
              message: `Your message was too long, please limit it to ${len} characters.`,
            }),
          );
          return; // Reject long messages.
        }

        // 🔐 1. Authorization
        // 💡️ Successful authorizations are cached per connection.
        let authorized;
        const cached = authorizedConnections[id];

        if (cached && cached.token === content.token) {
          authorized = cached.user;
          console.log("🟢 Preauthorization found.");
        } else if (content.token) {
          console.log("🟡 Authorizing...");
          authorized = await authorize(content.token);
          if (authorized) {
            authorizedConnections[id] = {
              token: content.token,
              user: authorized,
            };
          }
        }

        // The user must have a registered handle (and, on `chat-sotce`, an
        // active subscription), otherwise the message is unauthorized.
        // TODO: Make sure this works across sotce-net.
        let handle, subscribed;

        if (authorized && authorized.sub) {
          console.log("🟢 🔐 Handle authorized:", authorized);
          console.log("🚵 Finding handle for:", authorized.sub);
          handle = await getHandleFromSub(authorized.sub);

          console.log("🚦 Checking subscription status for:", instance.name);
          if (instance.name === "chat-sotce") {
            subscribed = await checkSubscription(authorized.sub, content.token);
          } else {
            subscribed = true;
          }
        }

        if (!authorized || !handle || !subscribed) {
          console.error(
            "🔴 Unauthorized:",
            content,
            "Authorized:",
            authorized,
            "Handle:",
            handle,
            "Subscribed:",
            subscribed,
          );

          ws.send(
            pack(
              "unauthorized",
              {
                message:
                  "Your message was unauthorized, please login again and/or subscribe.",
              },
              id,
            ),
          );
          return;
        }

        // 📚 2. Persistence
        // (An LLM-based language filter was prototyped here; the synchronous
        //  `filter` below is what is currently used.)
        try {
          const message = content;
          // Always attribute the message to the *verified* identity, never
          // to the `sub` the client claims, so users cannot post as others.
          const fromSub = authorized.sub;
          const userIsMuted = await isMuted(fromSub);
          let filteredText;

          if (userIsMuted) {
            redact(message);
            filteredText = message.text;
          } else {
            filteredText = filter(message.text, filterDebug);
          }

          const when = await getServerTime();

          // Don't store any actual messages to the MongoDB in development.
          // Also don't store messages from muted users.
          if (!dev && !userIsMuted) {
            console.log("🟡 Storing message...");
            const dbmsg = {
              user: fromSub,
              text: message.text, // Store unfiltered text in the database.
              when,
              font: message.font || "font_1", // 🔤 Store user's font preference
            };

            const collection = db.collection(instance.name); // Use the chat instance name for storing messages.
            await collection.createIndex({ when: 1 }); // Index for `when`.
            await collection.insertOne(dbmsg); // Store the chat message

            console.log("🟢 Message stored:", dbmsg);
          } else {
            console.log(
              "🟡 Message not stored:",
              userIsMuted ? "User muted" : "Development",
            );
          }

          const out = {
            from: handle,
            text: filteredText,
            redactedText: message.redactedText,
            when,
            sub: fromSub, // If the chat is of a specific tenant like
            //               `chat-sotce` then the subs will be local
            //               to that tenant and not prefixed. 24.10.31.21.35
            font: message.font || "font_1", // 🔤 Include font in broadcast
          };

          // Collapse an immediate repeat of the previous message from the
          // same user into a counter on that message.
          // NOTE(review): because of `!lastMsg.count` only the first repeat
          // is collapsed (the count never exceeds 2) — confirm this is intended.
          const lastMsg = messages[messages.length - 1];
          if (
            lastMsg &&
            lastMsg.sub === out.sub &&
            lastMsg.text === out.text &&
            !lastMsg.count
          ) {
            lastMsg.count = (lastMsg.count || 1) + 1;
            // Don't add a new message, just update the existing one
            everyone(
              pack(`message:update`, {
                index: messages.length - 1,
                count: lastMsg.count,
              }),
            );
          } else {
            messages.push(out);
            if (messages.length > MAX_MESSAGES) messages.shift();
            everyone(pack(`message`, out)); // Send to clients.
          }

          // Don't send push notifications for muted users
          if (!userIsMuted) {
            if (instance.name === "chat-system") {
              notify(handle + " 💬", filteredText); // Push notification.
            }

            if (instance.name === "chat-clock") {
              notify(handle + " " + getClockEmoji(when), filteredText); // Push notification.
            }
          }
        } catch (err) {
          console.error("🔴 Message could not be stored:", err);
          // TODO: Show cancellation of some kind to the user.
        }
      }
    });

    ws.on("close", () => {
      // Delete from the connection index.
      delete connections[id];
      delete authorizedConnections[id];

      console.log(
        "🚪 Closed connection:",
        id,
        "Online:",
        wss.clients.size,
        "🫂",
      );

      everyone(pack("left", { chatters: wss.clients.size }, id));
    });

    // Send a connect message (with the message backlog) to the new client.
    ws.send(
      pack(
        "connected",
        {
          message: `Joined \`${instance.name}\` • 🧑‍🤝‍🧑 ${wss.clients.size}`,
          chatters: wss.clients.size,
          messages,
          id,
        },
        id,
      ),
    );

    // Send a join message to everyone else.
    others(
      pack(
        "joined",
        {
          text: `${id} has joined. Connections open: ${wss.clients.size}`,
          chatters: wss.clients.size,
        },
        id,
      ),
    );
  });

  // Sends a message to all connected clients.
  everyone = (string) => {
    wss.clients.forEach((c) => {
      if (c?.readyState === WebSocket.OPEN) c.send(string);
    });
  };
}

// ⚙️ Utilities

// Pack messages into a simple object protocol of `{type, content}`.
// Object `content` is stringified on its own, so clients parse it separately.
function pack(type, content, id) {
  if (typeof content === "object") content = JSON.stringify(content);
  return JSON.stringify({ type, content, id });
}

// Authorize a user token against auth0.
// Resolves to the user info object returned by the instance's
// `userInfoEndpoint`, or to `undefined` when the token is rejected, the
// request fails, or the response body cannot be parsed.
async function authorize(authorization) {
  try {
    const response = await fetch(instance.userInfoEndpoint, {
      headers: {
        Authorization: "Bearer " + authorization,
        "Content-Type": "application/json",
      },
    });

    if (response.status !== 200) {
      throw new Error(
        `🔴 Unauthorized; ${response.status} ${await response.text()}`,
      );
    }

    // `await` here so a malformed body is caught below instead of rejecting
    // in the caller.
    return await response.json();
  } catch (error) {
    console.error("🔴 Authorization error:", error);
    return undefined;
  }
}

// #region 🗺️ MongoDB

// Connect to MongoDB and populate the module-level `client` and `db`.
async function makeMongoConnection() {
  console.log("🟡 Connecting to MongoDB...");
  client = new MongoClient(MONGODB_CONNECTION_STRING);
  await client.connect();
  db = client.db(MONGODB_NAME);
  console.log("🟢 Connected!");
}

// Refill the in-memory `messages` buffer with the most recent
// `MAX_MESSAGES` stored messages, oldest first. On `chat-system` the `logs`
// collection is merged in and messages of muted users are redacted.
async function getLast100MessagesfromMongo() {
  console.log(`🟡 Retrieving last ${MAX_MESSAGES} combined messages...`);
  const chatCollection = db.collection(instance.name);
  const isSystem = instance.name === "chat-system";
  let newestFirst;

  if (isSystem) {
    // 🟪 Assume an AC chat instance with logs rolled in.
    // todo; take into account chat-clock
    newestFirst = await chatCollection
      .aggregate([
        {
          $unionWith: {
            coll: "logs",
            pipeline: [{ $match: {} }],
          },
        },
        { $sort: { when: -1 } },
        { $limit: MAX_MESSAGES },
      ])
      .toArray();
  } else {
    // 🪷 🕰️ Every other instance: chat messages only, no logs.
    newestFirst = await chatCollection
      .find({})
      .sort({ when: -1 })
      .limit(MAX_MESSAGES)
      .toArray();
  }

  const combinedMessages = newestFirst.reverse(); // Chronological order.

  // Basic mutes check (`chat-system` only). Each distinct user is looked up
  // once, and all redactions finish *before* the buffer is built below.
  if (isSystem) {
    const subs = [
      ...new Set(combinedMessages.map((msg) => msg.user).filter(Boolean)),
    ];
    const muteFlags = await Promise.all(subs.map((sub) => isMuted(sub)));
    const mutedSubs = new Set(subs.filter((sub, index) => muteFlags[index]));
    for (const msg of combinedMessages) {
      if (mutedSubs.has(msg.user)) redact(msg);
    }
  }

  messages.length = 0; // Clear out all messages.

  for (const message of combinedMessages) {
    let from;

    if (message.user) {
      from = (await getHandleFromSub(message.user)) || "nohandle";
    } else {
      // 'logs' has a 'users' array but never a 'user' field.
      from = message.from || "deleted";
    }

    console.log(`🔵 ${from}: "${message.text}" at ${message.when}`);

    messages.push({
      from,
      text: filter(message.text, filterDebug) || "message forgotten",
      redactedText: message.redactedText,
      when: message.when,
      sub: message.user, // Lets later mute / handle updates match history.
      font: message.font || "font_1", // 🔤 Include font from DB (default for old messages)
    });
  }
}

// Look up the handle for a user sub, from the `subsToHandles` cache or from
// the `/handle` endpoint. Resolves to `"@" + handle`, or to `undefined` when
// the user has no handle or the lookup fails (failures are not cached).
async function getHandleFromSub(fromSub) {
  let handle = subsToHandles[fromSub];

  if (!handle) {
    try {
      const prefix = instance.name === "chat-sotce" ? "sotce-" : "";

      let host;
      if (dev) {
        host = "https://localhost:8888";
      } else if (instance.name === "chat-sotce") {
        host = "https://sotce.net";
      } else {
        host = "https://aesthetic.computer";
      }

      // Subs contain characters such as `|`; encode them for the query.
      const url = `${host}/handle?for=${encodeURIComponent(prefix + fromSub)}`;

      const options = {};
      if (dev) options.agent = agent;
      const response = await fetch(url, options);
      if (response.status === 200) {
        const data = await response.json();
        handle = data.handle;
      }
    } catch (error) {
      console.error("❌ 🫅 Handle retrieval error:", error);
    }

    subsToHandles[fromSub] = handle;
  }

  // Never build "@undefined": callers rely on a falsy result to detect a
  // missing handle.
  return handle ? "@" + handle : undefined;
}

// #endregion

// ☎️ Send a push notification (FCM) with the given title and body to the
// `mood` topic. Does nothing in development. Failures are only logged.
function notify(title, body) {
  if (dev) return;

  console.log("🟡 Sending FCM notification...", performance.now());

  getMessaging()
    .send({
      notification: { title, body },
      apns: {
        payload: {
          aps: {
            "mutable-content": 1,
            "interruption-level": "time-sensitive", // Marks as time-sensitive
            priority: 10, // Highest priority
            "content-available": 1, // Tells iOS to wake the app
          },
        },
        headers: {
          "apns-priority": "10", // Immediate delivery priority
          "apns-push-type": "alert", // Explicit push type
          "apns-expiration": "0", // Message won't be stored by APNs
        },
      },
      webpush: {
        headers: {
          Urgency: "high",
          TTL: "0",
          image: "https://aesthetic.computer/api/logo.png",
        },
        fcmOptions: {
          analyticsLabel: "immediate-delivery",
        },
      },
      topic: "mood", // <- TODO: Eventually replace this to wider range topic
      //                       that also must be set inside the iOS client.
      data: { piece: "chat" }, // This should send a tappable link to the chat piece.
    })
    .then((response) => {
      console.log(
        "☎️ Successfully sent notification:",
        response,
        performance.now(),
      );
    })
    .catch((error) => {
      console.log(
        "📵 Error sending notification:",
        error,
        performance.now(),
      );
    });
}

// Check the database to see if a given user sub has been muted for this
// instance. Best effort: a missing sub or any database error counts as
// "not muted".
async function isMuted(sub) {
  if (!sub) return false;
  try {
    const mutesCollection = db.collection(instance.name + "-mutes");
    const mute = await mutesCollection.findOne({ user: sub });
    return !!mute;
  } catch (error) {
    return false; // Collection doesn't exist or another error occurred
  }
}

// 🪦 Graveyard
// 🏬 Publish to redis.
// pub
//   .publish("chat-system", JSON.stringify(update))
//   .then((result) => {
//     console.log("💬 Message succesfully published:", result);
//   })
//   .catch((error) => {
//     console.log("🙅‍♀️ Error publishing message:", error);
//   });