this repo has no description
/** Collection name (also used as the record `$type`) for signalling records. */
const COLLECTION = "com.jakelazaroff.webrtc.session";

/** Base WebSocket URL of the Jetstream subscription endpoint. */
const JETSTREAM = "wss://jetstream1.us-east.bsky.network/subscribe";

/** Age, in milliseconds, beyond which a signalling record counts as stale. */
const MAX_AGE_MS = 60 * 1000;
4
/**
 * Publish a signalling record (an SDP offer or answer) addressed to
 * `targetDid` by POSTing `com.atproto.repo.createRecord` to `session.pds`.
 * The record is stamped with the current time as `createdAt`.
 *
 * @param {import("./atsw.js").OAuthSession} session
 * @param {string} targetDid - recipient's DID
 * @param {"offer"|"answer"} type
 * @param {string} sdp
 * @returns {Promise<string>} the AT URI of the created record
 * @throws {Error} if the server answers with a non-2xx status
 */
export async function publishSignal(session, targetDid, type, sdp) {
  const res = await fetch(`${session.pds}/xrpc/com.atproto.repo.createRecord`, {
    method: "POST",
    headers: { "content-type": "application/json", "x-atsw-did": session.did },
    body: JSON.stringify({
      repo: session.did,
      collection: COLLECTION,
      record: {
        $type: COLLECTION,
        target: targetDid,
        type,
        sdp,
        createdAt: new Date().toISOString(),
      },
    }),
  });

  // Without this check a rejected request would resolve to `undefined`
  // (an error body has no `uri`), hiding the failure from the caller.
  if (!res.ok) {
    throw new Error(`createRecord failed with HTTP ${res.status}`);
  }

  const data = await res.json();
  return data.uri;
}
31
/**
 * Delete one record of the signalling collection from the session's repo by
 * POSTing `com.atproto.repo.deleteRecord` to `session.pds`.
 *
 * @param {import("./atsw.js").OAuthSession} session
 * @param {string} rkey - record key of the record to delete
 * @returns {Promise<void>}
 * @throws {Error} if the server answers with a non-2xx status
 */
export async function deleteRecord(session, rkey) {
  const res = await fetch(`${session.pds}/xrpc/com.atproto.repo.deleteRecord`, {
    method: "POST",
    headers: { "content-type": "application/json", "x-atsw-did": session.did },
    body: JSON.stringify({ repo: session.did, collection: COLLECTION, rkey }),
  });

  // Surface rejected deletes instead of reporting success unconditionally.
  if (!res.ok) {
    throw new Error(`deleteRecord failed with HTTP ${res.status}`);
  }
}
43
/**
 * Delete all expired session records for the given repo.
 *
 * Lists the signalling collection page by page (100 records per request,
 * following the returned `cursor`) and deletes every record whose
 * `createdAt` is more than `MAX_AGE_MS` in the past. Records whose
 * `createdAt` cannot be parsed are left untouched.
 *
 * @param {import("./atsw.js").OAuthSession} session
 * @returns {Promise<void>}
 * @throws {Error} if a listing request answers with a non-2xx status, or if
 *   any individual delete rejects
 */
export async function deleteExpiredRecords(session) {
  const now = Date.now();
  const expiredBatches = [];
  let cursor;

  do {
    // URLSearchParams percent-encodes the values, so a DID or cursor
    // containing reserved characters cannot corrupt the query string.
    const params = new URLSearchParams({
      repo: session.did,
      collection: COLLECTION,
      limit: "100",
    });
    if (cursor) params.set("cursor", cursor);

    const res = await fetch(
      `${session.pds}/xrpc/com.atproto.repo.listRecords?${params}`,
      { headers: { "x-atsw-did": session.did } },
    );
    if (!res.ok) {
      throw new Error(`listRecords failed with HTTP ${res.status}`);
    }

    const data = await res.json();
    const records = Array.isArray(data.records) ? data.records : [];

    expiredBatches.push(
      records
        .filter((r) => now - new Date(r.value.createdAt).getTime() > MAX_AGE_MS)
        .map((r) => r.uri.split("/").pop()),
    );

    // Keep paging only while the server hands back a new cursor together
    // with a non-empty page; this also guarantees the loop terminates if a
    // server keeps returning a cursor on its final page.
    const next = records.length > 0 ? data.cursor : undefined;
    cursor = next && next !== cursor ? next : undefined;
  } while (cursor);

  // Delete only after listing has finished so paging never races with the
  // deletions; one page's worth of deletes runs concurrently at a time.
  for (const rkeys of expiredBatches) {
    await Promise.all(rkeys.map((rkey) => deleteRecord(session, rkey)));
  }
}
64
/**
 * Subscribe to Jetstream for newly created signalling records addressed to
 * `myDid` and dispatch them to the matching callback. Whenever the socket
 * closes, a new connection is attempted after 2 seconds until the returned
 * handle is closed.
 *
 * Frames that are not JSON, are not `create` commits, carry no record object,
 * target another DID, or are older than `MAX_AGE_MS` are ignored.
 *
 * @param {string} myDid
 * @param {(callerDid: string, record: object) => void} onOffer
 * @param {(answererDid: string, record: object) => void} onAnswer
 * @returns {{ close: () => void }} handle whose `close()` closes the current
 *   socket and cancels any pending reconnect
 */
export function connectJetstream(myDid, onOffer, onAnswer) {
  const url = `${JETSTREAM}?wantedCollections=${COLLECTION}`;
  let ws;
  let reconnectTimer;
  let destroyed = false;

  function handleMessage(e) {
    let event;
    try {
      event = JSON.parse(e.data);
    } catch {
      // The stream is external input; drop frames that are not valid JSON
      // rather than throwing out of the event handler.
      return;
    }

    if (event?.kind !== "commit") return;
    if (event.commit?.operation !== "create") return;

    const record = event.commit.record;
    if (record === null || typeof record !== "object") return;
    if (record.target !== myDid) return;

    // ignore stale signals
    const age = Date.now() - new Date(record.createdAt).getTime();
    if (age > MAX_AGE_MS) return;

    if (record.type === "offer") onOffer(event.did, record);
    else if (record.type === "answer") onAnswer(event.did, record);
  }

  function connect() {
    reconnectTimer = undefined;
    // Never open a socket once the handle has been closed, even if a
    // reconnect was already queued.
    if (destroyed) return;

    const socket = new WebSocket(url);
    ws = socket;

    socket.onmessage = handleMessage;

    socket.onclose = () => {
      if (!destroyed) reconnectTimer = setTimeout(connect, 2000);
    };

    // Closing on error funnels failures into the onclose reconnect path.
    socket.onerror = () => socket.close();
  }

  connect();

  return {
    close() {
      destroyed = true;
      // A reconnect may be waiting on its delay; cancel it so no new socket
      // appears after shutdown.
      clearTimeout(reconnectTimer);
      reconnectTimer = undefined;
      ws?.close();
    },
  };
}