[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: remove bsky indexing from the main repo

+92 -564
+20 -1
data-plane/db/index.ts
··· 156 156 } 157 157 } 158 158 159 + async waitForConnection(): Promise<void> { 160 + if (!this.connection) { 161 + throw new Error("Database not connected"); 162 + } 163 + 164 + await this.connection.asPromise(); 165 + } 166 + 159 167 async disconnect(): Promise<void> { 160 168 if (this.connection) { 161 169 await this.connection.close(); ··· 169 177 throw new Error("Database not connected"); 170 178 } 171 179 await db.admin().ping(); 180 + } 181 + 182 + async command<T>(command: Record<string, unknown>): Promise<T> { 183 + const db = this.connection?.db; 184 + if (!db) { 185 + throw new Error("Database not connected"); 186 + } 187 + 188 + return await db.command(command) as T; 172 189 } 173 190 174 191 // Add methods for DID resolution ··· 207 224 return getResultFromDoc(doc); 208 225 } 209 226 210 - async getCursorState(identifier = "last_processed_cursor"): Promise<number | null> { 227 + async getCursorState( 228 + identifier = "last_processed_cursor", 229 + ): Promise<number | null> { 211 230 try { 212 231 const cursorState = await this.models.CursorState.findOne({ 213 232 identifier,
-3
data-plane/indexing/index.ts
··· 26 26 import * as Story from "./plugins/story.ts"; 27 27 import * as Audio from "./plugins/audio.ts"; 28 28 import * as Labeler from "./plugins/labeler.ts"; 29 - import * as CrosspostReply from "./plugins/crosspost/reply.ts"; 30 29 import { RecordProcessor } from "./processor.ts"; 31 30 import { ServerConfig } from "../../config.ts"; 32 31 import { PushService } from "../../utils/push.ts"; ··· 44 43 story: Story.PluginType; 45 44 audio: Audio.PluginType; 46 45 labeler: Labeler.PluginType; 47 - crosspostReply: CrosspostReply.PluginType; 48 46 }; 49 47 private pushService?: PushService; 50 48 ··· 68 66 story: Story.makePlugin(this.db, this.background), 69 67 audio: Audio.makePlugin(this.db, this.background), 70 68 labeler: Labeler.makePlugin(this.db, this.background), 71 - crosspostReply: CrosspostReply.makePlugin(this.db, this.background), 72 69 }; 73 70 74 71 // Set push service on all processors
-296
data-plane/indexing/plugins/crosspost/reply.ts
··· 1 - import { CID } from "multiformats/cid"; 2 - import { AtUri } from "@atp/syntax"; 3 - import * as lex from "../../../../lex/lexicons.ts"; 4 - import { Record as BskyPostRecord } from "../../../../lex/types/app/bsky/feed/post.ts"; 5 - import { 6 - isLink, 7 - isMention, 8 - } from "../../../../lex/types/app/bsky/richtext/facet.ts"; 9 - import { BackgroundQueue } from "../../../background.ts"; 10 - import { Database } from "../../../db/index.ts"; 11 - import { CrosspostReplyDocument } from "../../../db/models.ts"; 12 - import { getAncestorsAndSelf, getDescendents } from "../../../util.ts"; 13 - import { RecordProcessor } from "../../processor.ts"; 14 - 15 - type Ancestor = { 16 - uri: string; 17 - height: number; 18 - }; 19 - type Descendent = { 20 - uri: string; 21 - depth: number; 22 - cid: string; 23 - creator: string; 24 - sortAt: string; 25 - }; 26 - type IndexedReply = { 27 - reply: CrosspostReplyDocument; 28 - facets?: { type: "mention" | "link"; value: string }[]; 29 - media?: { 30 - cid?: string; 31 - alt?: string | null; 32 - }; 33 - ancestors?: Ancestor[]; 34 - descendents?: Descendent[]; 35 - }; 36 - 37 - const lexId = lex.ids.AppBskyFeedPost; 38 - 39 - const REPLY_NOTIF_DEPTH = 5; 40 - 41 - const insertFn = async ( 42 - db: Database, 43 - uri: AtUri, 44 - cid: CID, 45 - obj: BskyPostRecord, 46 - timestamp: string, 47 - ): Promise<IndexedReply | null> => { 48 - if (!obj.reply) { 49 - return null; 50 - } 51 - 52 - const sparkPost = await db.models.Post.findOne({ 53 - "crossposts.uri": obj.reply.root.uri, 54 - }); 55 - if (!sparkPost) { 56 - return null; 57 - } 58 - 59 - const mappedRoot = { 60 - uri: sparkPost.uri, 61 - cid: sparkPost.cid, 62 - }; 63 - 64 - let mappedParent = { 65 - uri: obj.reply.parent.uri, 66 - cid: obj.reply.parent.cid, 67 - }; 68 - 69 - if ( 70 - sparkPost.crossposts?.some((crosspost) => 71 - crosspost.uri === obj.reply?.parent.uri 72 - ) 73 - ) { 74 - mappedParent = { 75 - uri: sparkPost.uri, 76 - cid: sparkPost.cid, 77 - }; 78 - } else { 79 - const [parentReply, parentCrosspostReply] = await Promise.all([ 80 - db.models.Reply.findOne({ 81 - uri: obj.reply.parent.uri, 82 - }), 83 - db.models.CrosspostReply.findOne({ 84 - uri: obj.reply.parent.uri, 85 - }), 86 - ]); 87 - const parent = parentReply || parentCrosspostReply; 88 - if (parent) { 89 - mappedParent = { 90 - uri: parent.uri, 91 - cid: parent.cid, 92 - }; 93 - } 94 - } 95 - 96 - const reply = { 97 - uri: uri.toString(), 98 - cid: cid.toString(), 99 - authorDid: uri.host, 100 - text: obj.text || "", 101 - facets: obj.facets || [], 102 - reply: { 103 - root: mappedRoot, 104 - parent: mappedParent, 105 - }, 106 - langs: obj.langs || [], 107 - labels: obj.labels || null, 108 - tags: obj.tags || [], 109 - createdAt: obj.createdAt, 110 - indexedAt: timestamp, 111 - }; 112 - 113 - const insertedReply = await db.models.CrosspostReply.findOneAndUpdate( 114 - { uri: reply.uri }, 115 - { $set: reply }, 116 - { upsert: true, returnDocument: "after" }, 117 - ); 118 - 119 - const { invalidReplyRoot } = await validateCrosspostReply(db, insertedReply); 120 - if (invalidReplyRoot) { 121 - Object.assign(insertedReply, { invalidReplyRoot }); 122 - await db.models.CrosspostReply.updateOne( 123 - { uri: reply.uri }, 124 - { $set: { invalidReplyRoot } }, 125 - ); 126 - } 127 - 128 - const facets = (obj.facets || []) 129 - .flatMap((facet) => facet.features) 130 - .flatMap((feature) => { 131 - if (isMention(feature)) { 132 - return { 133 - type: "mention" as const, 134 - value: feature.did, 135 - }; 136 - } 137 - if (isLink(feature)) { 138 - return { 139 - type: "link" as const, 140 - value: feature.uri, 141 - }; 142 - } 143 - return []; 144 - }); 145 - 146 - const ancestors = await getAncestorsAndSelf(db, { 147 - uri: reply.uri, 148 - parentHeight: REPLY_NOTIF_DEPTH, 149 - }); 150 - const descendents = await getDescendents(db, { 151 - uri: reply.uri, 152 - depth: REPLY_NOTIF_DEPTH, 153 - }); 154 - 155 - return { 156 - reply: insertedReply, 157 - facets, 158 - media: {}, 159 - ancestors, 160 - descendents, 161 - }; 162 - }; 163 - 164 - const findDuplicate = (): AtUri | null => { 165 - return null; 166 - }; 167 - 168 - const notifsForInsert = (_obj: IndexedReply) => { 169 - return []; 170 - }; 171 - 172 - const deleteFn = async ( 173 - db: Database, 174 - uri: AtUri, 175 - ): Promise<IndexedReply | null> => { 176 - const uriStr = uri.toString(); 177 - const deleted = await db.models.CrosspostReply.findOneAndDelete({ 178 - uri: uriStr, 179 - }); 180 - 181 - if (!deleted) { 182 - return null; 183 - } 184 - 185 - return { 186 - reply: deleted, 187 - facets: [], 188 - }; 189 - }; 190 - 191 - const notifsForDelete = ( 192 - deleted: IndexedReply, 193 - replacedBy: IndexedReply | null, 194 - ) => { 195 - const notifs = replacedBy ? notifsForInsert(replacedBy) : []; 196 - return { 197 - notifs, 198 - toDelete: [deleted.reply.uri], 199 - }; 200 - }; 201 - 202 - const updateAggregates = async (db: Database, replyIdx: IndexedReply) => { 203 - if (replyIdx.reply.reply?.parent?.uri) { 204 - const parentPost = await db.models.Post.findOne({ 205 - uri: replyIdx.reply.reply?.parent.uri, 206 - }); 207 - const [ 208 - parentReply, 209 - parentCrosspostReply, 210 - nativeReplyCount, 211 - ] = await Promise.all([ 212 - db.models.Reply.findOne({ 213 - uri: replyIdx.reply.reply?.parent.uri, 214 - }), 215 - db.models.CrosspostReply.findOne({ 216 - uri: replyIdx.reply.reply?.parent.uri, 217 - }), 218 - db.models.Reply.countDocuments({ 219 - "reply.parent.uri": replyIdx.reply.reply.parent.uri, 220 - }), 221 - ]); 222 - const replyCount = nativeReplyCount; 223 - 224 - if (parentPost) { 225 - await db.models.Post.findOneAndUpdate( 226 - { uri: replyIdx.reply.reply?.parent.uri }, 227 - { $set: { replyCount } }, 228 - { returnDocument: "after" }, 229 - ); 230 - } else if (parentReply) { 231 - await db.models.Reply.findOneAndUpdate( 232 - { uri: replyIdx.reply.reply?.parent.uri }, 233 - { $set: { replyCount } }, 234 - { returnDocument: "after" }, 235 - ); 236 - } else if (parentCrosspostReply) { 237 - await db.models.CrosspostReply.findOneAndUpdate( 238 - { uri: replyIdx.reply.reply?.parent.uri }, 239 - { $set: { replyCount } }, 240 - { returnDocument: "after" }, 241 - ); 242 - } 243 - } 244 - }; 245 - 246 - export type PluginType = RecordProcessor<BskyPostRecord, IndexedReply>; 247 - 248 - export const makePlugin = ( 249 - db: Database, 250 - background: BackgroundQueue, 251 - ): PluginType => { 252 - return new RecordProcessor(db, background, { 253 - lexId, 254 - insertFn, 255 - findDuplicate, 256 - deleteFn, 257 - notifsForInsert, 258 - notifsForDelete, 259 - updateAggregates, 260 - }); 261 - }; 262 - 263 - export default makePlugin; 264 - 265 - async function validateCrosspostReply( 266 - db: Database, 267 - reply: CrosspostReplyDocument, 268 - ) { 269 - const parentUri = reply.reply?.parent?.uri; 270 - const rootUri = reply.reply?.root?.uri; 271 - if (!parentUri || !rootUri) { 272 - return { invalidReplyRoot: true }; 273 - } 274 - 275 - const [parentPost, parentReply, parentCrosspostReply] = await Promise.all([ 276 - db.models.Post.findOne({ uri: parentUri }).lean(), 277 - db.models.Reply.findOne({ uri: parentUri }).lean(), 278 - db.models.CrosspostReply.findOne({ uri: parentUri }).lean(), 279 - ]); 280 - const parent = parentReply || parentCrosspostReply; 281 - 282 - if (!parentPost && !parent) { 283 - return { invalidReplyRoot: true }; 284 - } 285 - 286 - if (parentPost) { 287 - return { 288 - invalidReplyRoot: parentUri !== rootUri, 289 - }; 290 - } 291 - 292 - return { 293 - invalidReplyRoot: !!parent?.invalidReplyRoot || 294 - parent?.reply?.root?.uri !== rootUri, 295 - }; 296 - }
+23 -4
data-plane/indexing/processor.ts
··· 30 30 replacedBy: S | null, 31 31 ) => { notifs: Notif[]; toDelete: string[] }; 32 32 updateAggregates?: (db: Database, obj: S) => Promise<void>; 33 + deleteRecordIfInsertReturnsNull?: boolean; 33 34 }; 34 35 35 36 type Notif = { ··· 156 157 obj, 157 158 timestamp, 158 159 ); 159 - if (inserted) { 160 - this.aggregateOnCommit(inserted); 161 - if (!opts?.disableNotifs) { 162 - this.handleNotifs({ inserted }); 160 + if (!inserted) { 161 + if (this.params.deleteRecordIfInsertReturnsNull) { 162 + await this.db.models.Record.deleteOne({ uri: uri.toString() }); 163 + await this.db.models.DuplicateRecord.deleteOne({ 164 + uri: uri.toString(), 165 + }); 163 166 } 167 + return; 168 + } 169 + 170 + this.aggregateOnCommit(inserted); 171 + if (!opts?.disableNotifs) { 172 + this.handleNotifs({ inserted }); 164 173 } 165 174 } 166 175 ··· 234 243 timestamp, 235 244 ); 236 245 if (!inserted) { 246 + if (this.params.deleteRecordIfInsertReturnsNull) { 247 + await this.db.models.Record.deleteOne({ uri: uri.toString() }); 248 + await this.db.models.DuplicateRecord.deleteOne({ 249 + uri: uri.toString(), 250 + }); 251 + if (!opts?.disableNotifs) { 252 + await this.handleNotifs({ deleted }); 253 + } 254 + return; 255 + } 237 256 throw new Error( 238 257 "Record update failed: removed from index but could not be replaced", 239 258 );
-191
data-plane/subscription/crosspost.ts
··· 1 - import { IdResolver } from "@atp/identity"; 2 - import { WriteOpAction } from "@atp/repo"; 3 - import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync"; 4 - import { BackgroundQueue } from "../background.ts"; 5 - import { Database } from "../db/index.ts"; 6 - import { IndexingService } from "../indexing/index.ts"; 7 - import { ServerConfig } from "../../config.ts"; 8 - import { PushService } from "../../utils/push.ts"; 9 - import { PushTokens } from "../routes/push-tokens.ts"; 10 - 11 - const CURSOR_STATE_IDENTIFIER = "crosspost_comments_cursor"; 12 - 13 - export class CrosspostRepoSubscription { 14 - firehose: Firehose; 15 - runner: MemoryRunner; 16 - background: BackgroundQueue; 17 - indexingSvc: IndexingService; 18 - pushService: PushService; 19 - private firehoseRunning = false; 20 - 21 - constructor( 22 - public opts: { 23 - db: Database; 24 - idResolver: IdResolver; 25 - startCursor?: number; 26 - cfg: ServerConfig; 27 - }, 28 - ) { 29 - const { db, idResolver, startCursor, cfg } = opts; 30 - this.background = new BackgroundQueue(db); 31 - 32 - const pushTokens = new PushTokens(db); 33 - this.pushService = new PushService(pushTokens, db, { 34 - enabled: cfg.pushEnabled, 35 - fcmServiceAccount: cfg.fcmServiceAccount, 36 - }); 37 - 38 - this.indexingSvc = new IndexingService( 39 - db, 40 - cfg, 41 - idResolver, 42 - this.background, 43 - this.pushService, 44 - ); 45 - 46 - const { runner, firehose } = createCrosspostFirehose({ 47 - idResolver, 48 - service: cfg.relayUrl, 49 - indexingSvc: this.indexingSvc, 50 - db, 51 - startCursor, 52 - }); 53 - this.runner = runner; 54 - this.firehose = firehose; 55 - } 56 - 57 - start() { 58 - console.info("Starting crosspost firehose subscription"); 59 - this.firehoseRunning = true; 60 - this.firehose.start(); 61 - } 62 - 63 - async restart() { 64 - await this.destroy(); 65 - 66 - const savedCursor = await this.opts.db.getCursorState( 67 - CURSOR_STATE_IDENTIFIER, 68 - ); 69 - const startCursor = savedCursor !== null ? savedCursor : undefined; 70 - 71 - const { runner, firehose } = createCrosspostFirehose({ 72 - idResolver: this.opts.idResolver, 73 - service: this.opts.cfg.relayUrl, 74 - indexingSvc: this.indexingSvc, 75 - db: this.opts.db, 76 - startCursor, 77 - }); 78 - this.runner = runner; 79 - this.firehose = firehose; 80 - this.start(); 81 - } 82 - 83 - async processAll() { 84 - await this.runner.processAll(); 85 - await this.background.processAll(); 86 - } 87 - 88 - async destroy() { 89 - try { 90 - if (this.firehoseRunning) { 91 - await this.firehose.destroy(); 92 - this.firehoseRunning = false; 93 - } 94 - console.info("Processing remaining runner tasks..."); 95 - if (this.opts.cfg.debugMode) { 96 - const timeoutMs = 10000; 97 - let destroyTimeoutId: number | undefined; 98 - try { 99 - const timeoutPromise = new Promise<never>((_, reject) => { 100 - destroyTimeoutId = setTimeout( 101 - () => reject(new Error(`Timeout after ${timeoutMs}ms`)), 102 - timeoutMs, 103 - ); 104 - }); 105 - await Promise.race([this.runner.destroy(), timeoutPromise]); 106 - } catch (e) { 107 - console.warn("Runner destroy timed out; continuing shutdown", { 108 - e, 109 - }); 110 - } finally { 111 - if (destroyTimeoutId !== undefined) { 112 - clearTimeout(destroyTimeoutId); 113 - destroyTimeoutId = undefined; 114 - } 115 - } 116 - 117 - let bgTimeoutId: number | undefined; 118 - try { 119 - const timeoutPromise = new Promise<never>((_, reject) => { 120 - bgTimeoutId = setTimeout( 121 - () => reject(new Error(`Timeout after ${timeoutMs}ms`)), 122 - timeoutMs, 123 - ); 124 - }); 125 - await Promise.race([this.background.processAll(), timeoutPromise]); 126 - } catch (e) { 127 - console.warn("Runner destroy timed out; continuing shutdown", { 128 - e, 129 - }); 130 - } finally { 131 - if (bgTimeoutId !== undefined) { 132 - clearTimeout(bgTimeoutId); 133 - bgTimeoutId = undefined; 134 - } 135 - } 136 - } else { 137 - await this.runner.processAll(); 138 - await this.background.processAll(); 139 - } 140 - } catch (error) { 141 - console.error("Error during subscription destroy", { error }); 142 - throw error; 143 - } 144 - } 145 - } 146 - 147 - function createCrosspostFirehose(opts: { 148 - idResolver: IdResolver; 149 - service?: string; 150 - indexingSvc: IndexingService; 151 - db: Database; 152 - startCursor?: number; 153 - }): { firehose: Firehose; runner: MemoryRunner } { 154 - const { idResolver, service, indexingSvc, db, startCursor } = opts; 155 - 156 - const runner = new MemoryRunner({ 157 - startCursor, 158 - setCursorInterval: 30000, 159 - setCursor: async (cursor: number) => { 160 - await db.saveCursorState(cursor, CURSOR_STATE_IDENTIFIER); 161 - console.info("Crosspost cursor saved to database", { cursor }); 162 - }, 163 - }); 164 - 165 - const firehose = new Firehose({ 166 - idResolver, 167 - runner, 168 - service, 169 - onError: (err: Error) => 170 - console.error("error in crosspost subscription", { err }), 171 - excludeAccount: true, 172 - excludeIdentity: true, 173 - excludeSync: true, 174 - handleEvent: async (evt: FirehoseEvent) => { 175 - if (evt.event === "create" || evt.event === "update") { 176 - await indexingSvc.indexRecord( 177 - evt.uri, 178 - evt.cid, 179 - evt.record, 180 - evt.event === "create" ? WriteOpAction.Create : WriteOpAction.Update, 181 - evt.time, 182 - ); 183 - } else if (evt.event === "delete") { 184 - await indexingSvc.deleteRecord(evt.uri); 185 - } 186 - }, 187 - filterCollections: ["app.bsky.feed.post"], 188 - }); 189 - 190 - return { firehose, runner }; 191 - }
-25
ingest/crosspost.ts
··· 1 - import { CrosspostRepoSubscription } from "../data-plane/subscription/crosspost.ts"; 2 - import { IdResolver } from "@atp/identity"; 3 - import { ServerConfig } from "../config.ts"; 4 - import { Database } from "../data-plane/db/index.ts"; 5 - 6 - const cfg = ServerConfig.readEnv(); 7 - 8 - const idResolver = new IdResolver({ plcUrl: cfg.plcUrl }); 9 - const db = new Database(cfg); 10 - db.connect(); 11 - 12 - const savedCursor = cfg.debugMode 13 - ? null 14 - : await db.getCursorState("crosspost_comments_cursor"); 15 - const startCursor = savedCursor !== null ? savedCursor : undefined; 16 - 17 - const sub = new CrosspostRepoSubscription({ 18 - cfg, 19 - db, 20 - idResolver, 21 - startCursor, 22 - }); 23 - 24 - sub.start(); 25 - console.info("Crosspost subscription started");
+49 -44
tests/crosspost_thread_test.ts
··· 204 204 const secondRes = await app.request( 205 205 `/xrpc/so.sprk.feed.getCrosspostThread?anchor=${ 206 206 encodeURIComponent(parentUri) 207 - }&depth=5&parentHeight=5&sort=oldest&limit=2&cursor=${ 208 - firstBody.cursor 209 - }`, 207 + }&depth=5&parentHeight=5&sort=oldest&limit=2&cursor=${firstBody.cursor}`, 210 208 ); 211 209 assertEquals(secondRes.status, 200); 212 210 const secondBody = await secondRes.json() as OutputSchema; ··· 218 216 const thirdRes = await app.request( 219 217 `/xrpc/so.sprk.feed.getCrosspostThread?anchor=${ 220 218 encodeURIComponent(parentUri) 221 - }&depth=5&parentHeight=5&sort=oldest&limit=2&cursor=${ 222 - secondBody.cursor 223 - }`, 219 + }&depth=5&parentHeight=5&sort=oldest&limit=2&cursor=${secondBody.cursor}`, 224 220 ); 225 221 assertEquals(thirdRes.status, 200); 226 222 const thirdBody = await thirdRes.json() as OutputSchema; ··· 295 291 ); 296 292 }); 297 293 298 - await t.step("hides taken-down thread records for standard viewers", async () => { 299 - await ctx.db.models.Record.create({ 300 - uri: reply4Uri, 301 - cid: reply4Cid, 302 - did: TEST_USERS[3].did, 303 - collectionName: "app.bsky.feed.post", 304 - rkey: "cross4", 305 - createdAt: time4, 306 - indexedAt: time4, 307 - json: JSON.stringify({ 308 - $type: "app.bsky.feed.post", 309 - text: "reply-4", 294 + await t.step( 295 + "hides taken-down thread records for standard viewers", 296 + async () => { 297 + await ctx.db.models.Record.create({ 298 + uri: reply4Uri, 299 + cid: reply4Cid, 300 + did: TEST_USERS[3].did, 301 + collectionName: "app.bsky.feed.post", 302 + rkey: "cross4", 310 303 createdAt: time4, 311 - }), 312 - takedownRef: "TAKEDOWN", 313 - }); 304 + indexedAt: time4, 305 + json: JSON.stringify({ 306 + $type: "app.bsky.feed.post", 307 + text: "reply-4", 308 + createdAt: time4, 309 + }), 310 + takedownRef: "TAKEDOWN", 311 + }); 314 312 315 - const res = await app.request( 316 - `/xrpc/so.sprk.feed.getCrosspostThread?anchor=${ 317 - encodeURIComponent(parentUri) 318 - }&depth=5&parentHeight=5&sort=oldest&limit=50`, 319 - ); 320 - assertEquals(res.status, 200); 321 - const body = await res.json() as OutputSchema; 322 - assertEquals(body.thread.some((item) => item.uri === reply4Uri), false); 323 - }); 313 + const res = await app.request( 314 + `/xrpc/so.sprk.feed.getCrosspostThread?anchor=${ 315 + encodeURIComponent(parentUri) 316 + }&depth=5&parentHeight=5&sort=oldest&limit=50`, 317 + ); 318 + assertEquals(res.status, 200); 319 + const body = await res.json() as OutputSchema; 320 + assertEquals( 321 + body.thread.some((item) => item.uri === reply4Uri), 322 + false, 323 + ); 324 + }, 325 + ); 324 326 325 - await t.step("stops on cyclic ancestors and keeps anchor at depth 0", async () => { 326 - const res = await app.request( 327 - `/xrpc/so.sprk.feed.getCrosspostThread?anchor=${ 328 - encodeURIComponent(cycleAUri) 329 - }&depth=0&parentHeight=10&sort=oldest&limit=50`, 330 - ); 331 - assertEquals(res.status, 200); 327 + await t.step( 328 + "stops on cyclic ancestors and keeps anchor at depth 0", 329 + async () => { 330 + const res = await app.request( 331 + `/xrpc/so.sprk.feed.getCrosspostThread?anchor=${ 332 + encodeURIComponent(cycleAUri) 333 + }&depth=0&parentHeight=10&sort=oldest&limit=50`, 334 + ); 335 + assertEquals(res.status, 200); 332 336 333 - const body = await res.json() as OutputSchema; 334 - assertEquals(body.thread.length, 2); 335 - assertEquals(body.thread[0].uri, cycleBUri); 336 - assertEquals(body.thread[0].depth, -1); 337 - assertEquals(body.thread[1].uri, cycleAUri); 338 - assertEquals(body.thread[1].depth, 0); 339 - }); 337 + const body = await res.json() as OutputSchema; 338 + assertEquals(body.thread.length, 2); 339 + assertEquals(body.thread[0].uri, cycleBUri); 340 + assertEquals(body.thread[0].depth, -1); 341 + assertEquals(body.thread[1].uri, cycleAUri); 342 + assertEquals(body.thread[1].depth, 0); 343 + }, 344 + ); 340 345 } finally { 341 346 await cleanup(); 342 347 }