appview-less bluesky client
1import { get, writable } from 'svelte/store';
2import {
3 AtpClient,
4 setRecordCache,
5 type NotificationsStream,
6 type NotificationsStreamEvent
7} from './at/client.svelte';
8import { SvelteMap, SvelteDate, SvelteSet } from 'svelte/reactivity';
9import type { Did, Handle, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons';
10import { fetchPosts, hydratePosts, type HydrateOptions, type PostWithUri } from './at/fetch';
11import { parseCanonicalResourceUri, type AtprotoDid } from '@atcute/lexicons/syntax';
12import {
13 AppBskyActorProfile,
14 AppBskyFeedPost,
15 AppBskyGraphBlock,
16 type AppBskyGraphFollow
17} from '@atcute/bluesky';
18import type { JetstreamSubscription, JetstreamEvent } from '@atcute/jetstream';
19import { expect, ok } from './result';
20import type { Backlink, BacklinksSource } from './at/constellation';
21import { now as tidNow } from '@atcute/tid';
22import type { Records } from '@atcute/lexicons/ambient';
23import {
24 blockSource,
25 extractDidFromUri,
26 likeSource,
27 replyRootSource,
28 replySource,
29 repostSource,
30 timestampFromCursor,
31 toCanonicalUri
32} from '$lib';
33import { Router } from './router.svelte';
34import { accounts, type Account } from './accounts';
35import {
36 getPreferences,
37 putPreferences,
38 type Preferences
39} from './at/pocket';
40
// currently attached notifications stream; starts out as null
export const notificationStream = writable<NotificationsStream | null>(null);
// currently attached jetstream subscription; starts out as null
export const jetstream = writable<JetstreamSubscription | null>(null);
43
// did -> profile record
export const profiles = new SvelteMap<Did, AppBskyActorProfile.Main>();
// did -> handle
export const handles = new SvelteMap<Did, Handle>();
46
// nesting, outermost first:
//   source -> subject -> did (who did the interaction) -> set of rkeys
export type BacklinksMap = SvelteMap<
	BacklinksSource,
	SvelteMap<ResourceUri, SvelteMap<Did, SvelteSet<RecordKey>>>
>;
// every backlink known to this module, shared by all accounts
export const allBacklinks: BacklinksMap = new SvelteMap();
53
// records `links` as backlinks of `subject` under `source`.
// the per-source and per-subject maps are created even when `links` is empty.
export const addBacklinks = (
	subject: ResourceUri,
	source: BacklinksSource,
	links: Iterable<Backlink>
) => {
	let bySubject = allBacklinks.get(source);
	if (bySubject === undefined) {
		bySubject = new SvelteMap();
		allBacklinks.set(source, bySubject);
	}

	let byDid = bySubject.get(subject);
	if (byDid === undefined) {
		byDid = new SvelteMap();
		bySubject.set(subject, byDid);
	}

	for (const { did, rkey } of links) {
		let owned = byDid.get(did);
		if (owned === undefined) {
			owned = new SvelteSet();
			byDid.set(did, owned);
		}
		owned.add(rkey);
	}
};
80
// forgets `links` as backlinks of `subject` under `source`.
// a did entry is dropped once its last rkey is removed; unknown links are ignored.
export const removeBacklinks = (
	subject: ResourceUri,
	source: BacklinksSource,
	links: Iterable<Backlink>
) => {
	const byDid = allBacklinks.get(source)?.get(subject);
	if (byDid === undefined) return;

	for (const { did, rkey } of links) {
		const owned = byDid.get(did);
		if (owned === undefined) continue;
		owned.delete(rkey);
		if (owned.size === 0) byDid.delete(did);
	}
};
96
// lists the backlinks `did` holds on `subject` under `source`.
// NOTE: the result is a lazy, single-use iterator over the live rkey set; copy it
// (e.g. Array.from) before iterating it more than once.
export const findBacklinksBy = (subject: ResourceUri, source: BacklinksSource, did: Did) => {
	// the collection is the part of the source in front of the first ':'
	const collection = source.split(':')[0] as Nsid;
	const stored = allBacklinks.get(source)?.get(subject)?.get(did);
	const rkeys = stored ?? [];
	return rkeys.values().map((rkey) => ({ did, collection, rkey }));
};
103
// true when `did` has at least one recorded backlink on `subject` under `source`
export const hasBacklink = (subject: ResourceUri, source: BacklinksSource, did: Did): boolean => {
	const byDid = allBacklinks.get(source)?.get(subject);
	return byDid?.has(did) ?? false;
};
107
// flattens every recorded backlink on `subject` under `source` into an array
// (empty when nothing is recorded)
export const getAllBacklinksFor = (subject: ResourceUri, source: BacklinksSource): Backlink[] => {
	const byDid = allBacklinks.get(source)?.get(subject);
	if (!byDid) return [];

	// the collection is the part of the source in front of the first ':'
	const collection = source.split(':')[0] as Nsid;
	return Array.from(byDid).flatMap(([did, rkeys]) =>
		Array.from(rkeys, (rkey): Backlink => ({ did, collection, rkey }))
	);
};
123
// true when a block record by `blocker` targeting `subject` has been recorded
export const isBlockedBy = (subject: Did, blocker: Did): boolean => {
	const subjectUri = `at://${subject}` as const;
	return hasBacklink(subjectUri, 'app.bsky.graph.block:subject', blocker);
};
127
// walks `path` into `obj` one key at a time; yields undefined as soon as a step
// hits null/undefined, and `obj` itself for an empty path
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const getNestedValue = (obj: any, path: string[]): any => {
	let cursor = obj;
	for (const segment of path) cursor = cursor?.[segment];
	return cursor;
};
132
// assigns `value` at `path` inside `obj`, creating an empty object for every
// intermediate key that is currently undefined
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const setNestedValue = (obj: any, path: string[], value: any): void => {
	let target = obj;
	for (let i = 0; i < path.length - 1; i++) {
		const key = path[i];
		if (target[key] === undefined) target[key] = {};
		target = target[key];
	}
	target[path[path.length - 1]] = value;
};
142
// did -> backlink source -> cursor last returned by listRecordsUntil for that pair
// (see fetchLinksUntil)
export const backlinksCursors = new SvelteMap<
	Did,
	SvelteMap<BacklinksSource, string | undefined>
>();
147
// lists `subject`'s records of the collection named by `backlinkSource`
// (format: `<collection>:<dotted.path.to.linked.subject>`) until `timestamp`,
// and indexes each record as a backlink on the value found at that path.
// the returned cursor is remembered per (subject, source) so later calls resume
// from it; a call is skipped when the stored cursor already reaches `timestamp`.
// failures are logged and leave the stored cursor untouched.
export const fetchLinksUntil = async (
	subject: Did,
	client: AtpClient,
	backlinkSource: BacklinksSource,
	timestamp: number = -1
) => {
	let cursorMap = backlinksCursors.get(subject);
	if (!cursorMap) {
		cursorMap = new SvelteMap<BacklinksSource, string | undefined>();
		backlinksCursors.set(subject, cursorMap);
	}

	const [_collection, source] = backlinkSource.split(':');
	const collection = _collection as keyof Records;
	const cursor = cursorMap.get(backlinkSource);

	// if already fetched we dont need to fetch again
	const cursorTimestamp = timestampFromCursor(cursor);
	if (cursorTimestamp && cursorTimestamp <= timestamp) return;

	console.log(`${subject}: fetchLinksUntil`, backlinkSource, cursor, timestamp);
	const result = await client.listRecordsUntil(subject, collection, cursor, timestamp);

	if (!result.ok) {
		console.error('failed to fetch links until', result.error);
		return;
	}
	cursorMap.set(backlinkSource, result.value.cursor);

	const path = source.split('.');
	for (const record of result.value.records) {
		const linkedSubject: unknown = getNestedValue(record.value, path);
		// a record that lacks the linked field would otherwise be indexed under `undefined`
		if (typeof linkedSubject !== 'string') continue;
		const parsedUri = parseCanonicalResourceUri(record.uri);
		if (!parsedUri.ok) continue;
		addBacklinks(linkedSubject as ResourceUri, `${collection}:${source}`, [
			{
				did: parsedUri.value.repo,
				collection: parsedUri.value.collection,
				rkey: parsedUri.value.rkey
			}
		]);
	}
};
191
// removes every backlink of kind `source` that the logged-in user holds on `post`:
// first from the local index, then by deleting the records from the user's repo.
// does nothing when the client has no logged-in user. individual delete failures
// are not surfaced (Promise.allSettled).
export const deletePostBacklink = async (
	client: AtpClient,
	post: PostWithUri,
	source: BacklinksSource
) => {
	const did = client.user?.did;
	if (!did) return;
	const collection = source.split(':')[0] as Nsid;
	// findBacklinksBy hands back a lazy single-use iterator; removeBacklinks would
	// drain it and leave nothing to delete remotely, so take a snapshot first
	const links = Array.from(findBacklinksBy(post.uri, source, did));
	removeBacklinks(post.uri, source, links);
	await Promise.allSettled(
		links.map((link) =>
			client.user?.atcute.post('com.atproto.repo.deleteRecord', {
				input: { repo: did, collection, rkey: link.rkey }
			})
		)
	);
};
210
// creates a record of kind `source` (format `<collection>:<dotted.subject.path>`)
// pointing at `post`: the backlink is indexed locally right away, then the record
// is written to the logged-in user's repo. no-op without a logged-in user.
export const createPostBacklink = async (
	client: AtpClient,
	post: PostWithUri,
	source: BacklinksSource
) => {
	const did = client.user?.did;
	if (!did) return;

	const [collectionPart, subjectPart] = source.split(':');
	const collection = collectionPart as Nsid;
	const rkey = tidNow();

	// index locally before the network round trip
	addBacklinks(post.uri, source, [{ did, collection, rkey }]);

	const record = {
		$type: collection,
		// eslint-disable-next-line svelte/prefer-svelte-reactivity
		createdAt: new Date().toISOString()
	};
	// the uri goes at the subject path, the cid next to it under the same parent
	const uriPath = subjectPart.split('.');
	const cidPath = [...uriPath.slice(0, -1), 'cid'];
	setNestedValue(record, uriPath, post.uri);
	setNestedValue(record, cidPath, post.cid);

	await client.user?.atcute.post('com.atproto.repo.createRecord', {
		input: { repo: did, collection, rkey, record }
	});
};
245
// a post id as a string, or null; starts out as null
export const pulsingPostId = writable<string | null>(null);

// client built without arguments; handleJetstreamEvent falls back to it when
// there is no per-account client for a did
export const viewClient = new AtpClient();
// did -> client for that account
export const clients = new SvelteMap<Did, AtpClient>();

// did -> preferences currently held in memory
export const accountPreferences = new SvelteMap<Did, Preferences>();

// localStorage key under which the preferences of all accounts are stored
const PREFS_STORAGE_KEY = 'accountPreferences';
254
// reads the stored preferences of all accounts from localStorage.
// yields an empty map when localStorage is unavailable, nothing is stored,
// or the stored value cannot be read/parsed.
const loadLocalPreferences = (): Map<Did, Preferences> => {
	if (typeof localStorage !== 'undefined') {
		try {
			const raw = localStorage.getItem(PREFS_STORAGE_KEY);
			if (raw) {
				const entries = Object.entries(JSON.parse(raw));
				return new Map(entries) as Map<Did, Preferences>;
			}
		} catch {
			// fall through to the empty map below
		}
	}
	return new Map();
};
265
// persists the in-memory preferences to localStorage.
// entries already stored for accounts that are not in memory are kept, so saving
// for one account can no longer wipe another account's stored preferences;
// in-memory entries win on conflict. storage errors are logged, not thrown.
const saveLocalPreferences = () => {
	if (typeof localStorage === 'undefined') return;
	const obj = {
		...Object.fromEntries(loadLocalPreferences()),
		...Object.fromEntries(accountPreferences.entries())
	};
	try {
		localStorage.setItem(PREFS_STORAGE_KEY, JSON.stringify(obj));
	} catch (error) {
		// setItem can throw, e.g. when the storage quota is exceeded
		console.error('failed to save preferences to local storage:', error);
	}
};
271
// reconciles the locally stored and the remote (pocket) preferences of `account`
// and puts the winner into `accountPreferences`:
//  - remote fetch failed: use local if present
//  - only one side exists: use it and copy it to the other side
//  - both exist: the one with the later `updatedAt` wins (remote on a tie)
// does nothing when the account has no client. a failed push to pocket is logged.
export const loadAccountPreferences = async (account: Account) => {
	const client = clients.get(account.did);
	if (!client) return;

	const localPrefs = loadLocalPreferences().get(account.did);
	const remoteResult = await getPreferences(client);

	if (!remoteResult.ok) {
		console.error('failed to load preferences from pocket:', remoteResult.error);
		if (localPrefs) accountPreferences.set(account.did, localPrefs);
		return;
	}

	const remotePrefs = remoteResult.value;

	// local copy wins: use it and push it to pocket
	const adoptLocal = async (prefs: Preferences) => {
		accountPreferences.set(account.did, prefs);
		const pushed = await putPreferences(client, prefs);
		if (!pushed.ok) console.error('failed to sync preferences to pocket:', pushed.error);
	};
	// remote copy wins: use it and mirror it into localStorage
	const adoptRemote = (prefs: Preferences) => {
		accountPreferences.set(account.did, prefs);
		saveLocalPreferences();
	};

	if (!remotePrefs) {
		// nothing remote; if nothing local either there is nothing to do
		if (localPrefs) await adoptLocal(localPrefs);
		return;
	}

	if (!localPrefs) {
		adoptRemote(remotePrefs);
		return;
	}

	// both exist - last modified wins
	const localTime = new Date(localPrefs.updatedAt).getTime();
	const remoteTime = new Date(remotePrefs.updatedAt).getTime();

	if (localTime > remoteTime) await adoptLocal(localPrefs);
	else adoptRemote(remotePrefs);
};
315
// merges `partial` into the in-memory preferences of `did`, stamps `updatedAt`
// with the current time, saves to localStorage and returns the merged value.
// does not talk to pocket (see syncAccountPreferences).
export const setAccountPreferences = (
	did: Did,
	partial: Partial<Omit<Preferences, 'updatedAt'>>
) => {
	const stamp = new Date().toISOString();
	const previous = accountPreferences.get(did) ?? { updatedAt: '' };
	const updated: Preferences = { ...previous, ...partial, updatedAt: stamp };

	accountPreferences.set(did, updated);
	saveLocalPreferences();
	return updated;
};
331
// pushes the in-memory preferences of `did` to pocket.
// no-op when there are no preferences or no client for `did`; a failed push is logged.
export const syncAccountPreferences = async (did: Did) => {
	const prefs = accountPreferences.get(did);
	if (!prefs) return;

	const client = clients.get(did);
	if (!client) return;

	const result = await putPreferences(client, prefs);
	if (result.ok) return;
	console.error('failed to sync preferences to pocket:', result.error);
};
342
// applies `partial` locally (memory + localStorage), then pushes the result to pocket
export const updateAccountPreferences = async (
	did: Did,
	partial: Partial<Omit<Preferences, 'updatedAt'>>
): Promise<void> => {
	setAccountPreferences(did, partial);
	await syncAccountPreferences(did);
};
350
// a follow record together with the uri it was read from
type FollowWithUri = {
	uri: ResourceUri;
	record: AppBskyGraphFollow.Main;
};
// follower did -> followed did (the record's subject) -> follow record
export const follows = new SvelteMap<Did, SvelteMap<Did, FollowWithUri>>();
356
// stores `followList` as follows made by `did`, keyed by the followed did;
// a later entry for the same followed did replaces the earlier one
export const addFollows = (
	did: Did,
	followList: Iterable<FollowWithUri>
) => {
	let bySubject = follows.get(did);
	if (bySubject === undefined) {
		bySubject = new SvelteMap();
		follows.set(did, bySubject);
	}
	for (const entry of followList) {
		bySubject.set(entry.record.subject, entry);
	}
};
368
// lists the follow records of `account`, stores them via addFollows and returns
// an iterator over the raw follow records. on failure the error is logged and an
// empty iterator is returned. expects a client to exist for the account.
export const fetchFollows = async (
	account: Account
): Promise<IteratorObject<AppBskyGraphFollow.Main>> => {
	const client = clients.get(account.did)!;
	const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.follow');
	if (!res.ok) {
		console.error("can't fetch follows:", res.error);
		return [].values();
	}

	const { records } = res.value;
	const withUris = records.map(({ uri, value }) => ({
		uri,
		record: value as AppBskyGraphFollow.Main
	}));
	addFollows(account.did, withUris);

	return records.values().map(({ value }) => value as AppBskyGraphFollow.Main);
};
387
// fetches up to three days of `subject`'s posts (and their reposts) for use in the
// following list. posts go into allPosts and into the logged-in user's following
// buffer (not the feed itself); the returned cursor, if any, is remembered per subject.
export const fetchForInteractions = async (client: AtpClient, subject: Did) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	// Date.now() is milliseconds; multiplied by 1000 to match the cursor timestamps
	const threeDaysAgo = (Date.now() - 3 * 24 * 60 * 60 * 1000) * 1000;

	const res = await client.listRecordsUntil(subject, 'app.bsky.feed.post', undefined, threeDaysAgo);
	if (!res.ok) return;

	const { records, cursor } = res.value;
	const postsWithUri = records.map(
		({ cid, uri, value }) => ({ cid, uri, record: value as AppBskyFeedPost.Main }) as PostWithUri
	);
	addPosts(postsWithUri);

	// add to following buffer (not feed directly)
	let pending = followingBuffer.get(userDid);
	if (pending === undefined) {
		pending = new SvelteSet();
		followingBuffer.set(userDid, pending);
	}
	for (const { uri } of postsWithUri) pending.add(uri);

	if (cursor) {
		let perSubject = followingCursors.get(userDid);
		if (perSubject === undefined) {
			perSubject = new SvelteMap();
			followingCursors.set(userDid, perSubject);
		}
		perSubject.set(subject, cursor);
	}

	// fetch links at least as far back as the posts reach; a missing cursor maps to -1
	const cursorTimestamp = timestampFromCursor(cursor) ?? -1;
	const timestamp = Math.min(cursorTimestamp, threeDaysAgo);
	console.log(`${subject}: fetchForInteractions`, cursor, timestamp);
	await Promise.all([repostSource].map((s) => fetchLinksUntil(subject, client, s, timestamp)));
};
425
// advances the merged "following" feed of `targetDid` (default: the logged-in user)
// by one step:
//  - if the following buffer holds posts, the `limit` newest (by createdAt) are moved
//    into the feed and the function returns without touching the network
//  - otherwise one page of posts (`limit` records) is listed from the followed
//    account(s) whose stored cursor is "newest", and those posts go into the feed
// per-subject cursors live in followingCursors: undefined = never fetched,
// null = exhausted, string = resume point.
export const fetchFollowingTimeline = async (client: AtpClient, targetDid?: Did, limit: number = 10) => {
	// resolve whose feed this is and make sure its buffer/feed/cursor containers exist
	const userDid = targetDid ?? client.user?.did;
	if (!userDid) return;

	let buffer = followingBuffer.get(userDid);
	if (!buffer) {
		buffer = new SvelteSet();
		followingBuffer.set(userDid, buffer);
	}

	let userFeed = followingFeed.get(userDid);
	if (!userFeed) {
		userFeed = new SvelteSet();
		followingFeed.set(userDid, userFeed);
	}

	let userCursors = followingCursors.get(userDid);
	if (!userCursors) {
		userCursors = new SvelteMap();
		followingCursors.set(userDid, userCursors);
	}

	// 0. Drain buffer first
	if (buffer.size > 0) {
		// newest first; uris without a known post sort as if created at epoch 0
		const sorted = Array.from(buffer).sort((a, b) => {
			const postA = getPostFromUri(a);
			const postB = getPostFromUri(b);
			return (
				new Date(postB?.record.createdAt ?? 0).getTime() -
				new Date(postA?.record.createdAt ?? 0).getTime()
			);
		});

		const toAdd = sorted.slice(0, limit);
		for (const uri of toAdd) {
			userFeed.add(uri);
			buffer.delete(uri);
		}

		return;
	}

	// 1. Identify subjects (follows + self)
	const subjects = new Set(follows.get(userDid)?.keys());
	subjects.add(userDid);

	// 2. Find the "newest" cursor(s)
	// invariant: maxCursor === undefined with a non-empty candidates list means
	// "some subject was never fetched"; those win over any stored cursor
	let maxCursor: string | undefined = undefined;
	let candidates: Did[] = [];

	for (const subject of subjects) {
		const cursor = userCursors.get(subject);

		// null means exhausted
		if (cursor === null) continue;

		// if we haven't fetched this user yet (undefined cursor), they are a candidate (newest)
		if (cursor === undefined) {
			if (maxCursor !== undefined) {
				// discard candidates collected from stored cursors so far
				maxCursor = undefined;
				candidates = [subject];
			} else {
				candidates.push(subject);
			}
			continue;
		}

		// If maxCursor is undefined (meaning we have some 'now' candidates), skip checked cursors
		if (maxCursor === undefined && candidates.length > 0) continue;

		// cursors are compared as strings
		if (maxCursor === undefined || cursor > maxCursor) {
			maxCursor = cursor;
			candidates = [subject];
		} else if (cursor === maxCursor) {
			candidates.push(subject);
		}
	}

	if (candidates.length === 0) return; // Everyone exhausted?

	// 3. Fetch from candidates
	console.log('fetching following timeline from', candidates, maxCursor);
	const results = await Promise.all(
		candidates.map(async (did) => {
			const cursor = userCursors!.get(did) ?? undefined;
			// fetch limit is 4th argument, cursor is 3rd
			const res = await client.listRecords(did, 'app.bsky.feed.post', cursor, limit);
			if (!res.ok) {
				// a failed subject keeps its old cursor and is simply skipped this round
				console.error(`fetchFollowingTimeline failed for ${did}:`, res.error);
				return null;
			}
			return { did, res: res.value };
		})
	);

	// 4. Update state - use records directly from listRecords instead of re-fetching
	const validPosts: PostWithUri[] = [];
	for (const result of results) {
		if (!result) continue;
		const { did, res } = result;

		// update cursor
		if (res.cursor) userCursors!.set(did, res.cursor);
		else userCursors!.set(did, null); // null = exhausted

		for (const record of res.records) {
			validPosts.push({
				uri: record.uri,
				cid: record.cid,
				record: record.value as AppBskyFeedPost.Main
			});
		}
	}

	if (validPosts.length === 0) return;

	addPosts(validPosts);

	for (const post of validPosts) userFeed.add(post.uri);

	// check if any of the post authors block the user
	await fetchBlocksForPosts(
		client,
		validPosts.map((p) => p.uri)
	);
};
552
// subject did -> dids whose blocks against that subject have already been looked up
// (fetchBlocked adds to it; fetchBlocksForPosts uses it to skip repeated lookups
// against the logged-in user)
export const blockFlags = new SvelteMap<Did, SvelteSet<Did>>();
555
// asks the backlinks service whether `blocker` has a block record on `subject`.
// a hit is stored in the backlink index, and the pair is marked as fetched in
// blockFlags either way. resolves to false when the lookup itself fails
// (in which case nothing is marked).
export const fetchBlocked = async (client: AtpClient, subject: Did, blocker: Did) => {
	const subjectUri = `at://${subject}` as ResourceUri;
	const res = await client.getBacklinks(subjectUri, blockSource, [blocker], 1);
	if (!res.ok) return false;

	const blocked = res.value.total > 0;
	if (blocked) addBacklinks(subjectUri, blockSource, res.value.records);

	// mark as fetched
	let fetchedBlockers = blockFlags.get(subject);
	if (fetchedBlockers === undefined) {
		fetchedBlockers = new SvelteSet();
		blockFlags.set(subject, fetchedBlockers);
	}
	fetchedBlockers.add(blocker);

	return blocked;
};
572
// for the authors of `postUris`, looks up whether they block the logged-in user.
// authors already looked up (per blockFlags) and the user themselves are skipped,
// as are uris no did can be extracted from. no-op without a logged-in user.
export const fetchBlocksForPosts = async (client: AtpClient, postUris: Iterable<ResourceUri>) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	// drop uris without an extractable did instead of asserting them non-null,
	// so a null never ends up in the set or in a block lookup
	const authors = Array.from(postUris).flatMap((uri) => {
		const author = extractDidFromUri(uri);
		return author ? [author] : [];
	});

	// check if any of the post authors block the user
	// eslint-disable-next-line svelte/prefer-svelte-reactivity
	let distinctDids = new Set(authors);
	distinctDids.delete(userDid); // dont need to check if user blocks themselves
	const alreadyFetched = blockFlags.get(userDid);
	if (alreadyFetched) distinctDids = distinctDids.difference(alreadyFetched);
	if (distinctDids.size > 0)
		await Promise.all(distinctDids.values().map((did) => fetchBlocked(client, userDid, did)));
};
586
// lists the block records of `account` and indexes each one as a block backlink
// on `at://<blocked did>`. returns silently when listing fails.
// expects a client to exist for the account.
export const fetchBlocks = async (account: Account) => {
	const client = clients.get(account.did)!;
	const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.block');
	if (!res.ok) return;

	for (const { uri, value } of res.value.records) {
		const { subject } = value as AppBskyGraphBlock.Main;
		const { repo, collection, rkey } = expect(parseCanonicalResourceUri(uri));
		addBacklinks(`at://${subject}`, blockSource, [{ did: repo, collection, rkey }]);
	}
};
603
// blocks `targetDid` on behalf of the logged-in user: the block is indexed
// locally first, then the block record is written to the user's repo.
// no-op without a logged-in user.
export const createBlock = async (client: AtpClient, targetDid: Did) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	const collection = 'app.bsky.graph.block';
	const rkey = tidNow();
	const targetUri = `at://${targetDid}` as ResourceUri;

	// index locally before the network round trip
	addBacklinks(targetUri, blockSource, [{ did: userDid, collection, rkey }]);

	const record: AppBskyGraphBlock.Main = {
		$type: 'app.bsky.graph.block',
		subject: targetDid,
		// eslint-disable-next-line svelte/prefer-svelte-reactivity
		createdAt: new Date().toISOString()
	};

	await client.user?.atcute.post('com.atproto.repo.createRecord', {
		input: { repo: userDid, collection, rkey, record }
	});
};
635
// unblocks `targetDid` on behalf of the logged-in user: the user's block backlinks
// on the target are removed from the local index, then the matching block records
// are deleted from the user's repo. no-op without a logged-in user; individual
// delete failures are not surfaced (Promise.allSettled).
export const deleteBlock = async (client: AtpClient, targetDid: Did) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	const targetUri = `at://${targetDid}` as ResourceUri;
	// findBacklinksBy hands back a lazy single-use iterator; removeBacklinks would
	// drain it and leave nothing to delete remotely, so take a snapshot first
	const links = Array.from(findBacklinksBy(targetUri, blockSource, userDid));

	removeBacklinks(targetUri, blockSource, links);

	await Promise.allSettled(
		links.map((link) =>
			client.user?.atcute.post('com.atproto.repo.deleteRecord', {
				input: {
					repo: userDid,
					collection: 'app.bsky.graph.block',
					rkey: link.rkey
				}
			})
		)
	);
};
657
// true when `userDid` has a recorded block on `targetDid`
export const isBlockedByUser = (targetDid: Did, userDid: Did): boolean =>
	isBlockedBy(targetDid, userDid);
661
// true when `targetDid` has a recorded block on `userDid`
export const isUserBlockedBy = (userDid: Did, targetDid: Did): boolean =>
	isBlockedBy(userDid, targetDid);
665
// true when a block is recorded in either direction between the two dids
export const hasBlockRelationship = (did1: Did, did2: Did): boolean => {
	if (isBlockedBy(did1, did2)) return true;
	return isBlockedBy(did2, did1);
};
669
// reports both block directions between the user and a target:
//   userBlocked     - the user has a recorded block on the target
//   blockedByTarget - the target has a recorded block on the user
export const getBlockRelationship = (
	userDid: Did,
	targetDid: Did
): { userBlocked: boolean; blockedByTarget: boolean } => {
	const userBlocked = isBlockedBy(targetDid, userDid);
	const blockedByTarget = isBlockedBy(userDid, targetDid);
	return { userBlocked, blockedByTarget };
};
679
// post uri -> post
export const allPosts = new SvelteMap<ResourceUri, PostWithUri>();
// author did -> uris of that author's known posts
export const postsByDid = new SvelteMap<Did, SvelteSet<ResourceUri>>();
// what is kept about a post after deletePost removed it: its reply reference
export type DeletedPostInfo = { reply?: PostWithUri['record']['reply'] };
// uri of a deleted post -> what was kept about it
export const deletedPosts = new SvelteMap<ResourceUri, DeletedPostInfo>();

// posts grouped by root uri for efficient thread building
export const postsByRootUri = new SvelteMap<ResourceUri, SvelteSet<ResourceUri>>();
// did -> post uris that are replies to that did
export const replyIndex = new SvelteMap<Did, SvelteSet<ResourceUri>>();
689
// looks up a known post by author did and record key (undefined when unknown)
export const getPost = (did: Did, rkey: RecordKey) => {
	const uri = toCanonicalUri({ did, collection: 'app.bsky.feed.post', rkey });
	return allPosts.get(uri);
};
692
// looks up a known post by its uri (undefined when unknown)
export const getPostFromUri = (uri: ResourceUri) => {
	return allPosts.get(uri);
};
694
// cache lookup handed to hydratePosts (its 4th argument): an ok result wrapping
// the already known post, or undefined when the post is not known
const hydrateCacheFn: Parameters<typeof hydratePosts>[3] = (did, rkey) => {
	const known = getPost(did, rkey);
	if (!known) return undefined;
	return ok(known);
};
699
// stores `newPosts` in allPosts and keeps the derived indexes in step:
// postsByDid, postsByRootUri, the reply/reply-root backlinks and replyIndex.
// throws (via expect) on a post whose uri is not a canonical resource uri.
export const addPosts = (newPosts: Iterable<PostWithUri>) => {
	for (const post of newPosts) {
		const { repo, collection, rkey } = expect(parseCanonicalResourceUri(post.uri));
		allPosts.set(post.uri, post);

		// update postsByDid index
		let authored = postsByDid.get(repo);
		if (authored === undefined) {
			authored = new SvelteSet();
			postsByDid.set(repo, authored);
		}
		authored.add(post.uri);

		// update postsByRootUri grouping; a post that is not a reply is its own root
		const { reply } = post.record;
		const rootUri = (reply?.root.uri as ResourceUri) || post.uri;
		let thread = postsByRootUri.get(rootUri);
		if (thread === undefined) {
			thread = new SvelteSet();
			postsByRootUri.set(rootUri, thread);
		}
		thread.add(post.uri);

		if (!reply) continue;

		const link = { did: repo, collection, rkey };
		addBacklinks(reply.parent.uri, replySource, [link]);
		addBacklinks(reply.root.uri, replyRootSource, [link]);

		// update reply index
		const parentDid = extractDidFromUri(reply.parent.uri);
		if (!parentDid) continue;
		let repliesToParent = replyIndex.get(parentDid);
		if (repliesToParent === undefined) {
			repliesToParent = new SvelteSet();
			replyIndex.set(parentDid, repliesToParent);
		}
		repliesToParent.add(post.uri);
	}
};
744
// removes a known post from allPosts, postsByDid and replyIndex, and remembers its
// reply reference in deletedPosts. unknown uris are ignored.
// postsByRootUri and the backlink index are left as they are.
export const deletePost = (uri: ResourceUri) => {
	const authorDid = extractDidFromUri(uri)!;
	const removed = allPosts.get(uri);
	if (!removed) return;

	const { reply } = removed.record;
	allPosts.delete(uri);
	postsByDid.get(authorDid)?.delete(uri);

	// remove reply from index
	const parentDid = extractDidFromUri(reply?.parent.uri ?? '');
	if (parentDid) replyIndex.get(parentDid)?.delete(uri);

	deletedPosts.set(uri, { reply });
};
756
// did -> post uris on that did's timeline
export const timelines = new SvelteMap<Did, SvelteSet<ResourceUri>>();
// did -> paging state of that did's timeline (`end` = nothing more to fetch)
export const postCursors = new SvelteMap<Did, { value?: string; end: boolean }>();

// feed state: Did -> feedUri -> post URIs
export const feedTimelines = new SvelteMap<Did, SvelteMap<string, ResourceUri[]>>();
// feed paging state: Did -> feedUri -> cursor
export const feedCursors = new SvelteMap<Did, SvelteMap<string, { value?: string; end: boolean }>>();

// merged timeline: UserDid -> Set<Uri>
export const followingFeed = new SvelteMap<Did, SvelteSet<ResourceUri>>();
// buffer: UserDid -> Set<Uri>
export const followingBuffer = new SvelteMap<Did, SvelteSet<ResourceUri>>();
// cursors: UserDid -> SubjectDid -> Cursor (null = exhausted)
export const followingCursors = new SvelteMap<Did, SvelteMap<Did, string | null>>();
767
// fetches the next page of the feed `feedUri` for the logged-in user:
// the skeleton's post uris are appended to the user's feed timeline, each post
// record is fetched and stored, and block lookups are made for the authors.
// returns the new cursor state, or undefined without a logged-in user or when the
// feed is already at its end. throws a string when no skeleton comes back.
export const fetchFeed = async (
	client: AtpClient,
	feedUri: string,
	feedServiceDid: string,
	limit: number = 10
) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	let cursorsByFeed = feedCursors.get(userDid);
	if (cursorsByFeed === undefined) {
		cursorsByFeed = new SvelteMap();
		feedCursors.set(userDid, cursorsByFeed);
	}

	const cursor = cursorsByFeed.get(feedUri);
	if (cursor?.end) return;

	const { fetchFeedSkeleton } = await import('./at/feeds');
	const skeleton = await fetchFeedSkeleton(client, feedUri, feedServiceDid, cursor?.value, limit);
	if (!skeleton) throw `failed to fetch feed skeleton for ${feedUri}`;

	// an empty page marks the end of the feed
	const newCursor = { value: skeleton.cursor, end: skeleton.feed.length === 0 };
	cursorsByFeed.set(feedUri, newCursor);

	const uris = skeleton.feed.slice(0, limit).map(({ post }) => post as ResourceUri);

	let timelinesByFeed = feedTimelines.get(userDid);
	if (timelinesByFeed === undefined) {
		timelinesByFeed = new SvelteMap();
		feedTimelines.set(userDid, timelinesByFeed);
	}

	// the uris join the timeline before their records are fetched
	const previous = timelinesByFeed.get(feedUri) ?? [];
	timelinesByFeed.set(feedUri, [...previous, ...uris]);

	// fetch each post record
	const fetched = await Promise.all(
		uris.map(async (uri) => {
			const result = await client.getRecordUri(AppBskyFeedPost.mainSchema, uri);
			if (!result.ok) return null;
			const { uri: postUri, cid, record } = result.value;
			return { uri: postUri, cid, record };
		})
	);

	const validPosts = fetched.filter((p): p is PostWithUri => p !== null);
	addPosts(validPosts);

	// check if any of the post authors block the user
	const validUris = validPosts.map((p) => p.uri);
	await fetchBlocksForPosts(client, validUris);

	return newCursor;
};
825
// true when the first post of a fresh one-item skeleton of `feedUri` differs from
// the first post of the logged-in user's stored timeline for that feed.
// false without a logged-in user, without a stored timeline, or without a skeleton.
export const checkForNewPosts = async (client: AtpClient, feedUri: string, feedServiceDid: string) => {
	const userDid = client.user?.did;
	if (!userDid) return false;

	const currentFeed = feedTimelines.get(userDid)?.get(feedUri);
	if (!currentFeed || currentFeed.length === 0) return false;

	const { fetchFeedSkeleton } = await import('./at/feeds');
	const skeleton = await fetchFeedSkeleton(client, feedUri, feedServiceDid, undefined, 1);
	if (!skeleton || skeleton.feed.length === 0) return false;

	const latestPost = skeleton.feed[0].post;
	return latestPost !== currentFeed[0];
};
844
// forgets the stored timeline and cursor of `feedUri` for `did`
export const resetFeed = (did: Did, feedUri: string) => {
	const timelinesByFeed = feedTimelines.get(did);
	timelinesByFeed?.delete(feedUri);
	const cursorsByFeed = feedCursors.get(did);
	cursorsByFeed?.delete(feedUri);
};
849
850
// collects the uri of `post` followed by the uris of its known ancestors, nearest
// parent first, stopping at the first parent that is not in allPosts.
// iterative with a visited set: reply references come from user-written records,
// so a reply chain that loops back on itself must not recurse forever.
const traversePostChain = (post: PostWithUri) => {
	const result = [post.uri];
	// eslint-disable-next-line svelte/prefer-svelte-reactivity
	const visited = new Set<ResourceUri>(result);
	let current = post;
	for (;;) {
		const parentUri = current.record.reply?.parent.uri as ResourceUri | undefined;
		if (!parentUri || visited.has(parentUri)) break;
		const parentPost = allPosts.get(parentUri);
		if (!parentPost) break;
		visited.add(parentUri);
		result.push(parentPost.uri);
		current = parentPost;
	}
	return result;
};
// adds `uris` to the timeline of `did`, creating the timeline on first use.
// for a known post its known ancestors are added as well, because the parent
// posts might not be in the timeline yet; unknown uris are added as they are.
export const addTimeline = (did: Did, uris: Iterable<ResourceUri>) => {
	let entries = timelines.get(did);
	if (entries === undefined) {
		entries = new SvelteSet();
		timelines.set(did, entries);
	}
	for (const uri of uris) {
		const known = allPosts.get(uri);
		const chain = known ? traversePostChain(known) : [uri];
		for (const chainUri of chain) entries.add(chainUri);
	}
};
874
// fetches the next page of `subject`'s posts, hydrates them, and stores the result
// in allPosts and in the subject's timeline; then looks up blocks for the authors.
// returns the new cursor state, or undefined when the timeline already ended.
// throws a string when fetching or hydrating fails.
export const fetchTimeline = async (
	client: AtpClient,
	subject: Did,
	limit: number = 6,
	withBacklinks: boolean = true,
	hydrateOptions?: Partial<HydrateOptions>
) => {
	const previous = postCursors.get(subject);
	if (previous?.end) return;

	const accPosts = await fetchPosts(subject, client, previous?.value, limit, withBacklinks);
	if (!accPosts.ok) throw `cant fetch posts ${subject}: ${accPosts.error}`;
	const page = accPosts.value;

	// if the cursor is undefined, we've reached the end of the timeline
	const newCursor = { value: page.cursor, end: !page.cursor };
	postCursors.set(subject, newCursor);

	const hydrated = await hydratePosts(client, subject, page.posts, hydrateCacheFn, hydrateOptions);
	if (!hydrated.ok) throw `cant hydrate posts ${subject}: ${hydrated.error}`;
	const posts = hydrated.value;

	addPosts(posts.values());
	addTimeline(subject, posts.keys());

	await fetchBlocksForPosts(client, posts.keys());

	console.log(`${subject}: fetchTimeline`, page.cursor);
	return newCursor;
};
908
// fetches `interactor`'s likes and reposts back to the point `subject`'s timeline
// cursor has reached. no-op when the subject has no cursor yet.
export const fetchInteractionsToTimelineEnd = async (
	client: AtpClient,
	interactor: Did,
	subject: Did
) => {
	const cursor = postCursors.get(subject);
	if (!cursor) return;

	const timestamp = timestampFromCursor(cursor.value);
	const pending = [likeSource, repostSource].map((source) =>
		fetchLinksUntil(interactor, client, source, timestamp)
	);
	await Promise.all(pending);
};
921
// fetches `userDid`'s likes and reposts back to the createdAt of the oldest known
// post in their following feed. no-op when the user has no following cursors,
// an empty feed, or no feed uri resolves to a known post.
export const fetchInteractionsToFollowingTimelineEnd = async (
	client: AtpClient,
	userDid: Did
) => {
	if (!followingCursors.get(userDid)) return;

	const feed = followingFeed.get(userDid);
	if (!feed || feed.size === 0) return;

	// Date.now()/getTime() are milliseconds; multiplied by 1000 before use
	let oldest = Date.now() * 1000;
	let sawPost = false;

	for (const uri of feed) {
		const post = getPostFromUri(uri);
		if (!post) continue;
		sawPost = true;
		const createdAt = new Date(post.record.createdAt).getTime() * 1000;
		if (createdAt < oldest) oldest = createdAt;
	}

	if (!sawPost) return;

	const pending = [likeSource, repostSource].map((source) =>
		fetchLinksUntil(userDid, client, source, oldest)
	);
	await Promise.all(pending);
};
950
// fetches `userDid`'s likes and reposts back to the createdAt of the oldest known
// post in their stored timeline for `feedUri`. no-op when that timeline is missing
// or empty, or when none of its uris resolves to a known post.
export const fetchInteractionsToFeedTimelineEnd = async (
	client: AtpClient,
	userDid: Did,
	feedUri: string
) => {
	const feedPosts = feedTimelines.get(userDid)?.get(feedUri);
	if (!feedPosts || feedPosts.length === 0) return;

	// Date.now()/getTime() are milliseconds; multiplied by 1000 before use
	let oldest = Date.now() * 1000;
	let sawPost = false;

	for (const uri of feedPosts) {
		const post = getPostFromUri(uri);
		if (!post) continue;
		sawPost = true;
		const createdAt = new Date(post.record.createdAt).getTime() * 1000;
		if (createdAt < oldest) oldest = createdAt;
	}

	if (!sawPost) return;

	const pending = [likeSource, repostSource].map((source) =>
		fetchLinksUntil(userDid, client, source, oldest)
	);
	await Promise.all(pending);
};
979
// dids for which fetchInitial has finished
export const initialDone = new SvelteSet<Did>();

// initial load for `account`: profile first, then in parallel its blocks, its own
// recent posts, and its follows plus the recent posts of everyone followed.
// failures of the parallel part are tolerated (Promise.allSettled); the account is
// marked done either way. expects a client to exist for the account.
export const fetchInitial = async (account: Account) => {
	const startedAt = Date.now();
	const client = clients.get(account.did)!;

	const profile = await client.getProfile();
	if (profile.ok) profiles.set(account.did, profile.value);

	const loadFollowed = async () => {
		const followed = await fetchFollows(account);
		return Promise.allSettled(
			followed.map((follow) => fetchForInteractions(client, follow.subject))
		);
	};

	await Promise.allSettled([
		fetchBlocks(account),
		fetchForInteractions(client, account.did),
		loadFollowed()
	]);

	initialDone.add(account.did);
	console.log(`initial done for ${account.did} in ${Date.now() - startedAt}ms`);
};
996
// reacts to jetstream commit events on app.bsky.feed.post; everything else is ignored.
//  - create: caches and hydrates the post, stores it, adds it to the author's
//    timeline (and to the parent author's timeline for replies) and to the
//    following feed of every local account that is or follows the author
//  - delete: removes the post via deletePost
export const handleJetstreamEvent = async (event: JetstreamEvent) => {
	if (event.kind !== 'commit') return;

	const { did, commit } = event;
	const uri: ResourceUri = toCanonicalUri({ did, ...commit });
	if (commit.collection !== 'app.bsky.feed.post') return;

	if (commit.operation === 'delete') {
		deletePost(uri);
		return;
	}
	if (commit.operation !== 'create') return;

	const record = commit.record as AppBskyFeedPost.Main;
	const posts = [{ record, uri, cid: commit.cid }];
	await setRecordCache(uri, record);

	// prefer the author's own client when the author is a local account
	const client = clients.get(did) ?? viewClient;
	const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn);
	if (!hydrated.ok) {
		console.error(`cant hydrate posts ${did}: ${hydrated.error}`);
		return;
	}
	addPosts(hydrated.value.values());
	addTimeline(did, hydrated.value.keys());

	if (record.reply) {
		const parentDid = extractDidFromUri(record.reply.parent.uri)!;
		addTimeline(parentDid, [uri]);
	}

	// Broadcast to following feeds of local accounts
	for (const account of get(accounts)) {
		// does this account follow the author? (an account counts as following itself)
		const followsAuthor =
			account.did === did || (follows.get(account.did)?.has(did) ?? false);
		if (!followsAuthor) continue;

		const feed = followingFeed.get(account.did);
		if (!feed) continue;
		for (const hydratedUri of hydrated.value.keys()) feed.add(hydratedUri);
	}
};
1046
// handles a notification whose source is a post linking to one of our posts:
// fetches the subject post with the client of its author, hydrates it with the
// source record passed along as its single reply, and stores the result in
// allPosts and the author's timeline.
// logs and returns when there is no client for the author; returns silently when
// the subject post cannot be fetched.
const handlePostNotification = async (event: NotificationsStreamEvent & { type: 'message' }) => {
	const { link } = event.data;
	const subjectUri = expect(parseCanonicalResourceUri(link.subject));
	const did = subjectUri.repo as AtprotoDid;

	const client = clients.get(did);
	if (!client) {
		console.error(`${did}: cant handle post notification, client not found !?`);
		return;
	}

	const subjectPost = await client.getRecord(AppBskyFeedPost.mainSchema, did, subjectUri.rkey);
	if (!subjectPost.ok) return;

	const sourceUri = expect(parseCanonicalResourceUri(link.source_record));
	const sourceLink = {
		did: sourceUri.repo,
		collection: sourceUri.collection,
		rkey: sourceUri.rkey
	};
	const posts = [
		{
			record: subjectPost.value.record,
			uri: link.subject,
			cid: subjectPost.value.cid,
			replies: { cursor: null, total: 1, records: [sourceLink] }
		}
	];

	const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn);
	if (!hydrated.ok) {
		console.error(`cant hydrate posts ${did}: ${hydrated.error}`);
		return;
	}

	addPosts(hydrated.value.values());
	addTimeline(did, hydrated.value.keys());
};
1091
// indexes the link carried by a notification as a backlink on its subject.
// throws (via expect) when the source record uri is not canonical.
const handleBacklink = (event: NotificationsStreamEvent & { type: 'message' }) => {
	const { subject, source, source_record } = event.data.link;
	const { repo, collection, rkey } = expect(parseCanonicalResourceUri(source_record));
	addBacklinks(subject, source, [{ did: repo, collection, rkey }]);
};
1102
// dispatches a notifications stream event: links whose source starts with
// app.bsky.feed.post go through handlePostNotification, every other link is
// indexed as a plain backlink. events that are not of type 'message' are ignored.
export const handleNotification = async (event: NotificationsStreamEvent) => {
	if (event.type !== 'message') return;

	if (event.data.link.source.startsWith('app.bsky.feed.post')) {
		// awaited so that a failure rejects this call instead of surfacing as an
		// unhandled rejection, and so callers can wait for the work to finish
		await handlePostNotification(event);
	} else {
		handleBacklink(event);
	}
};
1109
// reactive clock, set to Date.now() once per second when a window exists
export const currentTime = new SvelteDate();

if (typeof window !== 'undefined') {
	setInterval(() => currentTime.setTime(Date.now()), 1000);
}

// router instance shared by importers of this module
export const router = new Router();