[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

db models cleanup and search upgrade

+378 -251
+113 -95
api/so/sprk/feed/searchPosts.ts
··· 1 - import { Server } from "../../../../lex/index.ts"; 1 + import { mapDefined } from "@atp/common"; 2 + import { ServerConfig } from "../../../../config.ts"; 2 3 import { AppContext } from "../../../../context.ts"; 3 - import { transformPostsToPostViews } from "../../../../utils/post-transformer.ts"; 4 - import * as SoSprkFeedDefs from "../../../../lex/types/so/sprk/feed/defs.ts"; 5 - import { OutputSchema } from "../../../../lex/types/so/sprk/feed/searchPosts.ts"; 6 - import { RootFilterQuery } from "mongoose"; 7 - import { PostDocument } from "../../../../data-plane/db/models.ts"; 8 - 9 - // Helper to escape user input for safe RegExp usage 10 - function escapeRegExp(str: string): string { 11 - return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); 12 - } 4 + import { DataPlane } from "../../../../data-plane/index.ts"; 5 + import { 6 + parsePostSearchQuery, 7 + PostSearchQuery, 8 + } from "../../../../data-plane/util.ts"; 9 + import { HydrateCtx, Hydrator } from "../../../../hydration/index.ts"; 10 + import { parseString } from "../../../../hydration/util.ts"; 11 + import { Server } from "../../../../lex/index.ts"; 12 + import { QueryParams } from "../../../../lex/types/so/sprk/feed/searchPosts.ts"; 13 + import { 14 + createPipeline, 15 + HydrationFnInput, 16 + PresentationFnInput, 17 + RulesFnInput, 18 + SkeletonFnInput, 19 + } from "../../../../pipeline.ts"; 20 + import { uriToDid as creatorFromUri } from "../../../../utils/uris.ts"; 21 + import { Views } from "../../../../views/index.ts"; 22 + import { resHeaders } from "../../../util.ts"; 13 23 14 24 export default function (server: Server, ctx: AppContext) { 25 + const searchPosts = createPipeline( 26 + skeleton, 27 + hydration, 28 + noBlocksOrTagged, 29 + presentation, 30 + ); 15 31 server.so.sprk.feed.searchPosts({ 16 32 auth: ctx.authVerifier.standardOptional, 17 - handler: async ({ params, auth }) => { 18 - const { q, limit, cursor, sort } = params; 19 - const userDid = auth.credentials.type === "standard" 20 - ? auth.credentials.iss 21 - : undefined; 33 + handler: async ({ auth, params }) => { 34 + const { viewer } = ctx.authVerifier.parseCreds(auth); 35 + const hydrateCtx = ctx.hydrator.createContext({ viewer }); 36 + const results = await searchPosts( 37 + { ...params, hydrateCtx }, 38 + ctx, 39 + ); 40 + return { 41 + encoding: "application/json", 42 + body: results, 43 + headers: resHeaders({}), 44 + }; 45 + }, 46 + }); 47 + } 22 48 23 - let skip = 0; 24 - if (cursor) { 25 - const parsedCursor = parseInt(cursor, 10); 26 - if (!isNaN(parsedCursor) && parsedCursor > 0) { 27 - skip = parsedCursor; 28 - } 29 - } 49 + const skeleton = async (inputs: SkeletonFnInput<Context, Params>) => { 50 + const { ctx, params } = inputs; 51 + const parsedQuery = parsePostSearchQuery(params.q); 30 52 31 - const escapedQuery = escapeRegExp(q.trim()); 32 - const regex = new RegExp(escapedQuery, "i"); 53 + const res = await ctx.dataplane.search.searchPosts( 54 + params.q, 55 + params.limit, 56 + params.cursor, 57 + ); 58 + return { 59 + posts: res.uris, 60 + cursor: parseString(res.cursor), 61 + parsedQuery, 62 + }; 63 + }; 33 64 34 - const query: RootFilterQuery<PostDocument> = { 35 - reply: { $eq: null }, 36 - $or: [ 37 - { text: regex }, 38 - { "embed.images.alt": regex }, 39 - { "embed.alt": regex }, 40 - ], 41 - }; 65 + const hydration = async ( 66 + inputs: HydrationFnInput<Context, Params, Skeleton>, 67 + ) => { 68 + const { ctx, params, skeleton } = inputs; 69 + return await ctx.hydrator.hydratePosts( 70 + skeleton.posts.map((uri) => ({ uri })), 71 + params.hydrateCtx, 72 + undefined, 73 + ); 74 + }; 42 75 43 - let posts; 76 + const noBlocksOrTagged = (inputs: RulesFnInput<Context, Params, Skeleton>) => { 77 + const { ctx, params, skeleton, hydration } = inputs; 44 78 45 - if (sort === "top") { 46 - // For 'top', we use aggregation to count likes and sort by popularity 47 - posts = await ctx.db.models.Post.aggregate([ 48 - { $match: query }, 49 - { 50 - $lookup: { 51 - from: "likes", 52 - localField: "uri", 53 - foreignField: "subject", 54 - as: "likes", 55 - pipeline: [ 56 - { $project: { _id: 1 } }, // Only fetch _id for counting 57 - ], 58 - }, 59 - }, 60 - { 61 - $addFields: { 62 - likeCount: { $size: "$likes" }, 63 - }, 64 - }, 65 - { 66 - $sort: { 67 - likeCount: -1, 68 - createdAt: -1, // Secondary sort by creation date 69 - }, 70 - }, 71 - { $skip: skip }, 72 - { $limit: limit }, 73 - { 74 - $project: { 75 - likes: 0, // Remove the likes array from the result 76 - }, 77 - }, 78 - ]); 79 - } else { 80 - // For 'latest' or default sorting 81 - const sortOrder: Record<string, 1 | -1> = { createdAt: -1 }; 79 + skeleton.posts = skeleton.posts.filter((uri) => { 80 + const post = hydration.posts?.get(uri); 81 + if (!post) return; 82 82 83 - posts = await ctx.db.models.Post.find(query) 84 - .sort(sortOrder) 85 - .skip(skip) 86 - .limit(limit) 87 - .lean(); 88 - } 83 + const creator = creatorFromUri(uri); 84 + const isPostByViewer = creator === params.hydrateCtx.viewer; 89 85 90 - const postViews = await transformPostsToPostViews(posts, ctx, userDid); 86 + // Cases to always show. 87 + if (isPostByViewer) return true; 91 88 92 - const filteredPostViews = postViews.filter( 93 - (v: SoSprkFeedDefs.PostView | null): v is SoSprkFeedDefs.PostView => 94 - v !== null, 95 - ); 89 + // Cases to never show. 90 + if (ctx.views.viewerBlockExists(creator, hydration)) return false; 91 + return true; 92 + }); 93 + return skeleton; 94 + }; 96 95 97 - let nextCursor: string | undefined; 98 - if (filteredPostViews.length === limit) { 99 - nextCursor = (skip + limit).toString(); 100 - } 96 + const presentation = ( 97 + inputs: PresentationFnInput<Context, Params, Skeleton>, 98 + ) => { 99 + const { ctx, skeleton, hydration } = inputs; 100 + const posts = mapDefined(skeleton.posts, (uri) => { 101 + const post = hydration.posts?.get(uri); 102 + if (!post) return; 101 103 102 - return { 103 - encoding: "application/json", 104 - body: { 105 - posts: filteredPostViews, 106 - cursor: nextCursor, 107 - } satisfies OutputSchema, 108 - }; 109 - }, 104 + return ctx.views.post(uri, hydration); 110 105 }); 111 - } 106 + return { 107 + posts, 108 + cursor: skeleton.cursor, 109 + hitsTotal: skeleton.hitsTotal, 110 + }; 111 + }; 112 + 113 + type Context = { 114 + cfg: ServerConfig; 115 + dataplane: DataPlane; 116 + hydrator: Hydrator; 117 + views: Views; 118 + }; 119 + 120 + type Params = QueryParams & { 121 + hydrateCtx: HydrateCtx; 122 + }; 123 + 124 + type Skeleton = { 125 + posts: string[]; 126 + hitsTotal?: number; 127 + cursor?: string; 128 + parsedQuery: PostSearchQuery; 129 + };
+81 -84
data-plane/db/models.ts
··· 1 1 import { Document, Model, Schema } from "mongoose"; 2 2 3 + interface RecordRef { 4 + uri: string; 5 + cid: string; 6 + } 7 + 3 8 // Plugin for adding author DID population to schemas 4 9 function addAuthor(schema: Schema) { 5 10 // Only add if schema has authorDid field ··· 39 44 $type: string; 40 45 ref: { $link: string }; 41 46 } 42 - 43 47 export interface ImageMedia extends MediaRef { 44 48 alt: string; 45 49 aspectRatio: { ··· 47 51 height: number; 48 52 }; 49 53 } 50 - 51 54 export interface VideoMedia extends MediaRef { 52 55 alt: string; 53 56 aspectRatio: { ··· 55 58 height: number; 56 59 }; 57 60 } 58 - 59 61 interface Label { 60 62 src: string; 61 63 uri: string; ··· 63 65 val: string; 64 66 neg: boolean; 65 67 } 66 - 67 68 interface Facet { 68 69 index: { 69 70 byteStart: number; ··· 76 77 tag?: string; 77 78 }>; 78 79 } 80 + export interface PostMedia { 81 + $type: string; 82 + video?: VideoMedia; 83 + images?: ImageMedia[]; 84 + } 85 + export interface StoryMedia { 86 + $type: string; 87 + video?: VideoMedia; 88 + image?: ImageMedia; 89 + } 90 + export interface Caption { 91 + text: string; 92 + facets?: Facet[]; 93 + } 94 + 95 + // records 79 96 80 97 export interface RecordDocument extends Document { 81 98 uri: string; ··· 90 107 takedownRef: string; 91 108 invalidReplyRoot?: boolean; 92 109 } 93 - 94 110 export const recordSchema = new Schema<RecordDocument>({ 95 111 uri: { type: String, required: true, unique: true, index: true }, 96 112 cid: { type: String, required: true }, ··· 105 121 invalidReplyRoot: { type: Boolean, required: false }, 106 122 }); 107 123 124 + // duplicate records 125 + 108 126 export interface DuplicateRecordDocument extends Document { 109 127 uri: string; 110 128 cid: string; 111 129 duplicateOf: string; 112 130 indexedAt: string; 113 131 } 114 - 115 132 export const duplicateRecordSchema = new Schema<DuplicateRecordDocument>({ 116 133 uri: { type: String, required: true, unique: true, index: true }, 117 134 cid: { type: String, required: true }, ··· 119 136 indexedAt: { type: String, required: true }, 120 137 }); 121 138 139 + // actor sync 140 + 122 141 export interface ActorSyncDocument extends Document { 123 142 did: string; 124 143 commitCid: string; 125 144 repoRev: string | null; 126 145 } 127 - 128 146 export const actorSyncSchema = new Schema<ActorSyncDocument>({ 129 147 did: { type: String, required: true, unique: true, index: true }, 130 148 commitCid: { type: String, required: true }, 131 149 repoRev: { type: String, required: false, default: null }, 132 150 }); 133 151 152 + // likes 153 + 134 154 export interface LikeDocument extends AuthoredDocument { 135 155 subject: string; 136 156 subjectCid: string; 137 157 via?: string | null; 138 158 viaCid?: string | null; 139 159 } 140 - 141 160 export const likeSchema = new Schema<LikeDocument>({ 142 161 ...authoredSchema, 143 162 subject: { type: String, required: true, index: true }, 144 163 subjectCid: { type: String, required: true }, 145 164 via: { type: String, required: false }, 146 165 viaCid: { type: String, required: false }, 147 - }); 166 + }) 167 + .index({ authorDid: 1, subject: 1 }, { unique: true }) 168 + .index({ subject: 1, createdAt: -1 }); 169 + 170 + // follows 148 171 149 172 export interface FollowDocument extends AuthoredDocument { 150 173 subject: string; 151 174 } 152 - 153 175 export const followSchema = new Schema<FollowDocument>({ 154 176 ...authoredSchema, 155 177 subject: { type: String, required: true, index: true }, 156 - }); 178 + }) 179 + .index({ authorDid: 1, subject: 1 }, { unique: true }) 180 + .index({ subject: 1, createdAt: -1 }); 181 + 182 + // blocks 157 183 158 184 export interface BlockDocument extends AuthoredDocument { 159 185 subject: string; ··· 162 188 export const blockSchema = new Schema<BlockDocument>({ 163 189 ...authoredSchema, 164 190 subject: { type: String, required: true, index: true }, 165 - }); 191 + }) 192 + .index({ authorDid: 1, subject: 1 }, { unique: true }) 193 + .index({ subject: 1, createdAt: -1 }); 166 194 167 - interface RecordRef { 168 - uri: string; 169 - cid: string; 170 - } 195 + // profiles 171 196 172 197 export interface ProfileDocument extends AuthoredDocument { 173 198 displayName?: string; ··· 180 205 followersCount: number; 181 206 followsCount: number; 182 207 } 183 - 184 208 export const profileSchema = new Schema<ProfileDocument>({ 185 209 ...authoredSchema, 186 210 displayName: { type: String, required: false }, ··· 192 216 postsCount: { type: Number, required: true, default: 0 }, 193 217 followersCount: { type: Number, required: true, default: 0 }, 194 218 followsCount: { type: Number, required: true, default: 0 }, 195 - }); 219 + }) 220 + .index({ displayName: "text", description: "text" }); 196 221 197 - // Add text index for profile search 198 - profileSchema.index({ 199 - displayName: "text", 200 - description: "text", 201 - }); 222 + // audio 202 223 203 224 export interface AudioDocument extends AuthoredDocument { 204 225 sound: MediaRef; ··· 211 232 labels?: Label[]; 212 233 useCount: number; 213 234 } 214 - 215 235 export const audioSchema = new Schema<AudioDocument>({ 216 236 ...authoredSchema, 217 237 sound: { type: Object, required: true }, ··· 220 240 details: { type: Object, required: false }, 221 241 labels: { type: [Object], required: false }, 222 242 useCount: { type: Number, required: true, default: 0 }, 223 - }); 243 + }) 244 + .index({ authorDid: 1, createdAt: -1 }) 245 + .index({ useCount: -1, createdAt: -1 }); 246 + 247 + // reposts 224 248 225 249 export interface RepostDocument extends AuthoredDocument { 226 - subject: RecordRef; 250 + subject: string; 251 + subjectCid: string; 227 252 via?: string | null; 228 253 viaCid?: string | null; 229 254 } 230 - 231 255 export const repostSchema = new Schema<RepostDocument>({ 232 256 ...authoredSchema, 233 - subject: { type: Object, required: true }, 257 + subject: { type: String, required: true }, 258 + subjectCid: { type: String, required: true }, 234 259 via: { type: String, required: false }, 235 260 viaCid: { type: String, required: false }, 236 - }); 261 + }) 262 + .index({ subject: 1, createdAt: -1 }) 263 + .index({ authorDid: 1, createdAt: -1 }); 237 264 238 - export interface PostMedia { 239 - $type: string; 240 - video?: VideoMedia; 241 - images?: ImageMedia[]; 242 - } 243 - 244 - export interface StoryMedia { 245 - $type: string; 246 - video?: VideoMedia; 247 - image?: ImageMedia; 248 - } 249 - 250 - export interface Caption { 251 - text: string; 252 - facets?: Facet[]; 253 - } 265 + // posts 254 266 255 267 export interface PostDocument extends AuthoredDocument { 256 268 caption?: Caption; ··· 263 275 replyCount: number; 264 276 repostCount: number; 265 277 } 266 - 267 278 export const postSchema = new Schema<PostDocument>({ 268 279 ...authoredSchema, 269 280 caption: { ··· 287 298 likeCount: { type: Number, required: true, default: 0 }, 288 299 replyCount: { type: Number, required: true, default: 0 }, 289 300 repostCount: { type: Number, required: true, default: 0 }, 290 - }); 301 + }) 302 + .index({ authorDid: 1, createdAt: -1 }) 303 + .index({ tags: 1, createdAt: -1 }); 291 304 292 - // Compound indexes for more efficient queries 293 - postSchema.index({ authorDid: 1, createdAt: -1 }); 294 - postSchema.index({ tags: 1, createdAt: -1 }); 305 + // replies 295 306 296 307 export interface ReplyDocument extends AuthoredDocument { 297 308 text?: string; ··· 306 317 likeCount: number; 307 318 replyCount: number; 308 319 } 309 - 310 320 export const replySchema = new Schema<ReplyDocument>({ 311 321 ...authoredSchema, 312 322 text: { type: String, required: false }, ··· 329 339 labels: { type: [Object], required: false, default: [] }, 330 340 likeCount: { type: Number, required: true, default: 0 }, 331 341 replyCount: { type: Number, required: true, default: 0 }, 332 - }); 342 + }) 343 + .index({ reply: 1, createdAt: -1 }); 333 344 334 - replySchema.index({ reply: 1, createdAt: -1 }); 345 + // stories 335 346 336 347 export interface StoryDocument extends AuthoredDocument { 337 348 media: StoryMedia; 338 349 sound?: RecordRef; 339 350 labels?: Label[]; 340 351 } 341 - 342 352 export const storySchema = new Schema<StoryDocument>({ 343 353 ...authoredSchema, 344 354 media: { type: Object, required: true }, ··· 350 360 required: false, 351 361 }, 352 362 labels: { type: [Object], required: false, default: [] }, 353 - }); 354 - 355 - storySchema.index({ authorDid: 1, createdAt: -1 }); 356 - storySchema.index({ tags: 1, createdAt: -1 }); 357 - 358 - followSchema.index({ authorDid: 1, subject: 1 }); 359 - followSchema.index({ subject: 1, createdAt: -1 }); 360 - followSchema.index({ type: 1, createdAt: -1 }); 363 + }) 364 + .index({ authorDid: 1, createdAt: -1 }); 361 365 362 - blockSchema.index({ authorDid: 1, subject: 1 }); 363 - blockSchema.index({ subject: 1, createdAt: -1 }); 364 - 365 - audioSchema.index({ authorDid: 1, createdAt: -1 }); 366 - audioSchema.index({ useCount: -1, createdAt: -1 }); 367 - repostSchema.index({ authorDid: 1, createdAt: -1 }); 368 - repostSchema.index({ "subject.uri": 1, createdAt: -1 }); 366 + // generators 369 367 370 368 export interface GeneratorDocument extends AuthoredDocument { 371 369 displayName: string; ··· 376 374 labels?: Label[]; 377 375 likeCount: number; 378 376 } 379 - 380 377 export const generatorSchema = new Schema<GeneratorDocument>({ 381 378 ...authoredSchema, 382 379 displayName: { type: String, required: true }, ··· 386 383 acceptsInteractions: { type: Boolean, required: false }, 387 384 labels: { type: [Object], required: false }, 388 385 likeCount: { type: Number, required: false, default: 0 }, 389 - }); 386 + }) 387 + .index({ authorDid: 1, createdAt: -1 }); 390 388 391 - // Add compound indexes for Generator 392 - generatorSchema.index({ authorDid: 1, createdAt: -1 }); 389 + // takedowns 393 390 394 391 export interface TakedownDocument extends Document { 395 392 targetUri: string; ··· 400 397 ref: string | null; 401 398 applied: boolean; 402 399 } 403 - 404 400 export const takedownSchema = new Schema<TakedownDocument>({ 405 401 targetUri: { type: String, required: true, unique: true, index: true }, 406 402 targetCid: { type: String, required: true }, ··· 411 407 applied: { type: Boolean, required: true, default: false }, 412 408 }); 413 409 414 - // Repository takedown schema 410 + // repo takedowns 411 + 415 412 export interface RepoTakedownDocument extends Document { 416 413 did: string; 417 414 reason: string; ··· 420 417 ref: string | null; 421 418 applied: boolean; 422 419 } 423 - 424 420 export const repoTakedownSchema = new Schema<RepoTakedownDocument>({ 425 421 did: { type: String, required: true, unique: true, index: true }, 426 422 reason: { type: String, required: true }, ··· 430 426 applied: { type: Boolean, required: true, default: false }, 431 427 }); 432 428 433 - // Blob takedown schema 429 + // blobs takedowns 430 + 434 431 export interface BlobTakedownDocument extends Document { 435 432 did: string; 436 433 cid: string; ··· 440 437 ref: string | null; 441 438 applied: boolean; 442 439 } 443 - 444 440 export const blobTakedownSchema = new Schema<BlobTakedownDocument>({ 445 441 did: { type: String, required: true, index: true }, 446 442 cid: { type: String, required: true, index: true }, ··· 449 445 takenDownAt: { type: String, required: true }, 450 446 ref: { type: String, required: false, default: null }, 451 447 applied: { type: Boolean, required: true, default: false }, 452 - }); 448 + }) 449 + .index({ did: 1, cid: 1 }, { unique: true }); 453 450 454 - // Ensure compound index on did + cid for blob takedowns 455 - blobTakedownSchema.index({ did: 1, cid: 1 }, { unique: true }); 451 + // actors 456 452 457 453 export interface ActorDocument extends Document { 458 454 did: string; ··· 463 459 keys: string[]; 464 460 services: string; 465 461 } 466 - 467 462 export const actorSchema = new Schema<ActorDocument>({ 468 463 did: { type: String, required: true, unique: true, index: true }, 469 464 handle: { type: String, required: false, index: true }, ··· 473 468 keys: { type: [String], required: true }, 474 469 services: { type: String, required: true }, 475 470 }); 471 + 472 + // preferences 476 473 477 474 export interface PreferenceDocument extends Document { 478 475 userDid: string; ··· 531 528 createdAt: string; 532 529 updatedAt: string; 533 530 } 534 - 535 531 export const preferenceSchema = new Schema<PreferenceDocument>({ 536 532 userDid: { type: String, required: true, unique: true, index: true }, 537 533 contentLabelPrefs: { type: [Object], required: false }, ··· 548 544 updatedAt: { type: String, required: true }, 549 545 }); 550 546 547 + // cursor state 548 + 551 549 export interface CursorStateDocument extends Document { 552 550 identifier: string; // To ensure a single document, e.g., 'last_processed_cursor' 553 551 cursorValue: number; 554 552 updatedAt: Date; 555 553 } 556 - 557 554 export const cursorStateSchema = new Schema<CursorStateDocument>({ 558 555 identifier: { type: String, required: true, unique: true, index: true }, 559 556 cursorValue: { type: Number, required: true },
+3
data-plane/index.ts
··· 16 16 import { Sync } from "./routes/sync.ts"; 17 17 import { Threads } from "./routes/threads.ts"; 18 18 import { Preferences } from "./routes/preferences.ts"; 19 + import { Search } from "./routes/search.ts"; 19 20 20 21 export { RepoSubscription } from "./subscription.ts"; 21 22 ··· 45 46 public sync: Sync; 46 47 public threads: Threads; 47 48 public preferences: Preferences; 49 + public search: Search; 48 50 49 51 constructor( 50 52 db: Database, ··· 70 72 this.sync = new Sync(db); 71 73 this.threads = new Threads(db); 72 74 this.preferences = new Preferences(db); 75 + this.search = new Search(db); 73 76 } 74 77 }
+7 -9
data-plane/indexing/plugins/repost.ts
··· 25 25 uri: uri.toString(), 26 26 cid: cid.toString(), 27 27 authorDid: uri.host, 28 - subject: { 29 - uri: obj.subject.uri, 30 - cid: obj.subject.cid, 31 - }, 28 + subject: obj.subject.uri, 29 + subjectCid: obj.subject.cid, 32 30 via, 33 31 viaCid, 34 32 createdAt: normalizeDatetimeAlways(obj.createdAt), ··· 51 49 ): Promise<AtUri | null> => { 52 50 const found = await db.models.Repost.findOne({ 53 51 authorDid: uri.host, 54 - "subject.uri": obj.subject.uri, 52 + subject: obj.subject.uri, 55 53 }).lean(); 56 54 return found ? new AtUri(found.uri) : null; 57 55 }; 58 56 59 57 const notifsForInsert = (obj: IndexedRepost) => { 60 - const subjectUri = new AtUri(obj.subject.uri); 58 + const subjectUri = new AtUri(obj.subject); 61 59 // prevent self-notifications 62 60 const isRepostFromSubjectUser = subjectUri.host === obj.authorDid; 63 61 if (isRepostFromSubjectUser) { ··· 128 126 129 127 const updateAggregates = async (db: Database, repost: IndexedRepost) => { 130 128 const repostCount = await db.models.Repost.countDocuments({ 131 - "subject.uri": repost.subject.uri, 129 + subject: repost.subject, 132 130 }); 133 131 134 132 const existingPost = await db.models.Post.findOne({ 135 - uri: repost.subject.uri, 133 + uri: repost.subject, 136 134 }); 137 135 138 136 if (existingPost) { 139 137 await db.models.Post.findOneAndUpdate( 140 - { uri: repost.subject.uri }, 138 + { uri: repost.subject }, 141 139 { $set: { repostCount } }, 142 140 { new: true }, 143 141 );
+5 -55
data-plane/routes/feeds.ts
··· 104 104 const followedDids = follows.map((f) => f.subject); 105 105 const timelineDids = [...followedDids, actorDid]; 106 106 107 - const fetchLimit = limit * 2; 108 - 109 107 // Get timeline posts 110 108 const postsQuery = this.db.models.Post.find({ 111 109 authorDid: { $in: timelineDids }, 112 110 }); 113 111 114 - // Get timeline reposts 115 - const repostsQuery = this.db.models.Repost.find({ 116 - authorDid: { $in: timelineDids }, 117 - }); 118 - 119 112 // Apply pagination using createdAt + cid (which matches DB schema and indexes) 120 113 const paginatedPostsQuery = this.timeCidKeyset.paginate(postsQuery, { 121 - limit: fetchLimit, 114 + limit, 122 115 cursor, 123 116 direction: "desc", 124 117 }); 125 118 126 - const paginatedRepostsQuery = this.timeCidKeyset.paginate( 127 - repostsQuery, 128 - { 129 - limit: fetchLimit, 130 - cursor, 131 - direction: "desc", 132 - }, 133 - ); 119 + const posts = await paginatedPostsQuery.exec(); 134 120 135 - // Fetch both in parallel 136 - const [posts, reposts] = await Promise.all([ 137 - paginatedPostsQuery.exec(), 138 - paginatedRepostsQuery.exec(), 139 - ]); 140 - 141 - // Transform and combine results 121 + // Transform posts 142 122 const transformedPosts: FeedItem[] = posts.map((p) => ({ 143 123 uri: p.uri, 144 124 cid: p.cid, ··· 148 128 sortAt: compositeTime(p.createdAt, p.indexedAt) || p.createdAt, 149 129 })); 150 130 151 - const transformedReposts: FeedItem[] = reposts.map((r) => ({ 152 - uri: r.subject?.uri || r.uri, 153 - cid: r.cid, 154 - authorDid: r.authorDid, 155 - createdAt: r.createdAt, 156 - type: "repost" as const, 157 - repostUri: r.uri, 158 - sortAt: compositeTime(r.createdAt, r.indexedAt) || r.createdAt, 159 - })); 160 - 161 - // Combine and sort all items 162 - const allItems = [...transformedPosts, ...transformedReposts] 163 - .sort((a, b) => { 164 - if (a.createdAt !== b.createdAt) { 165 - return a.createdAt > b.createdAt ? -1 : 1; 166 - } 167 - return a.cid > b.cid ? -1 : 1; 168 - }) 169 - .slice(0, limit); 170 - 171 - // Generate cursor from the last item if we have a full page 172 - let nextCursor: string | undefined; 173 - if (allItems.length >= limit) { 174 - const lastItem = allItems[allItems.length - 1]; 175 - nextCursor = this.timeCidKeyset.pack({ 176 - primary: lastItem.createdAt, 177 - secondary: lastItem.cid, 178 - }); 179 - } 180 - 181 131 return { 182 - items: allItems.map(feedItemFromRow), 183 - cursor: nextCursor, 132 + items: transformedPosts.map(feedItemFromRow), 133 + cursor: this.timeCidKeyset.packFromResult(transformedPosts), 184 134 }; 185 135 } 186 136 }
+3 -3
data-plane/routes/reposts.ts
··· 21 21 22 22 // Build query for reposts of this subject 23 23 const repostsQuery = this.db.models.Repost.find({ 24 - "subject.uri": subject.uri, 24 + subject: subject.uri, 25 25 }); 26 26 27 27 // Apply pagination using TimeCidKeyset ··· 61 61 const subjectUris = refs.map(({ uri }) => uri); 62 62 const reposts = await this.db.models.Repost.find({ 63 63 authorDid: actorDid, 64 - "subject.uri": { $in: subjectUris }, 64 + subject: { $in: subjectUris }, 65 65 }); 66 66 67 67 // Create a map for quick lookup 68 - const repostMap = new Map(reposts.map((r) => [r.subject.uri, r.uri])); 68 + const repostMap = new Map(reposts.map((r) => [r.subject, r.uri])); 69 69 const uris = refs.map(({ uri }) => repostMap.get(uri) || ""); 70 70 71 71 return { uris };
+110
data-plane/routes/search.ts
··· 1 + import { Database } from "../db/index.ts"; 2 + import { IndexedAtDidKeyset, TimeCidKeyset } from "../db/pagination.ts"; 3 + import { parsePostSearchQuery } from "../util.ts"; 4 + import { compositeTime } from "./records.ts"; 5 + 6 + // Remove leading @ in case a handle is input that way 7 + const cleanQuery = (query: string) => query.trim().replace(/^@/g, ""); 8 + 9 + export class Search { 10 + private db: Database; 11 + private indexedAtDidKeyset: IndexedAtDidKeyset; 12 + private timeCidKeyset: TimeCidKeyset; 13 + 14 + constructor(db: Database) { 15 + this.db = db; 16 + this.indexedAtDidKeyset = new IndexedAtDidKeyset(); 17 + this.timeCidKeyset = new TimeCidKeyset(); 18 + } 19 + 20 + // @TODO actor search endpoints still fall back to search service 21 + async searchActors(term: string, limit = 50, cursor?: string) { 22 + const cleanedTerm = cleanQuery(term); 23 + const regex = new RegExp(cleanedTerm, "i"); 24 + 25 + const actorsQuery = this.db.models.Actor.find({ 26 + handle: { $regex: regex }, 27 + }); 28 + 29 + const paginatedQuery = this.indexedAtDidKeyset.paginate(actorsQuery, { 30 + limit, 31 + cursor, 32 + direction: "desc", 33 + }); 34 + 35 + const actors = await paginatedQuery.exec(); 36 + 37 + // Generate cursor from the last item if we have a full page 38 + let nextCursor: string | undefined; 39 + if (actors.length === limit && actors.length > 0) { 40 + const lastActor = actors[actors.length - 1]; 41 + nextCursor = this.indexedAtDidKeyset.pack({ 42 + primary: lastActor.indexedAt, 43 + secondary: lastActor.did, 44 + }); 45 + } 46 + 47 + return { 48 + dids: actors.map((actor) => actor.did), 49 + cursor: nextCursor, 50 + }; 51 + } 52 + 53 + // @TODO post search endpoint still falls back to search service 54 + async searchPosts(term: string, limit = 50, cursor?: string) { 55 + const { q, author } = parsePostSearchQuery(term); 56 + 57 + let authorDid = author; 58 + if (author && !author?.startsWith("did:")) { 59 + const actor = await this.db.models.Actor.findOne({ 60 + handle: author, 61 + }); 62 + authorDid = actor?.did; 63 + } 64 + 65 + // Build query for posts matching the search term 66 + const query: Record<string, unknown> = {}; 67 + 68 + if (q) { 69 + // Search in caption.text using regex 70 + query["caption.text"] = { $regex: q, $options: "i" }; 71 + } 72 + 73 + if (authorDid) { 74 + query.authorDid = authorDid; 75 + } 76 + 77 + const postsQuery = this.db.models.Post.find(query); 78 + 79 + // Apply pagination using createdAt + cid (which matches DB schema and indexes) 80 + const paginatedQuery = this.timeCidKeyset.paginate(postsQuery, { 81 + limit, 82 + cursor, 83 + direction: "desc", 84 + }); 85 + 86 + const posts = await paginatedQuery.exec(); 87 + 88 + // Transform posts to include sortAt for cursor generation 89 + const transformedPosts = posts.map((p) => ({ 90 + uri: p.uri, 91 + cid: p.cid, 92 + sortAt: compositeTime(p.createdAt, p.indexedAt) || p.createdAt, 93 + })); 94 + 95 + // Generate cursor from the last item if we have a full page 96 + let nextCursor: string | undefined; 97 + if (transformedPosts.length === limit && transformedPosts.length > 0) { 98 + const lastPost = transformedPosts[transformedPosts.length - 1]; 99 + nextCursor = this.timeCidKeyset.pack({ 100 + primary: lastPost.sortAt, 101 + secondary: lastPost.cid, 102 + }); 103 + } 104 + 105 + return { 106 + uris: transformedPosts.map((p) => p.uri), 107 + cursor: nextCursor, 108 + }; 109 + } 110 + }
+51
data-plane/util.ts
··· 230 230 return urlStr; 231 231 } 232 232 }; 233 + 234 + // @NOTE: This type is not complete with all supported options. 235 + // Only the ones that we needed to apply custom logic on are currently present. 236 + export type PostSearchQuery = { 237 + q: string; 238 + author: string | undefined; 239 + }; 240 + 241 + export const parsePostSearchQuery = ( 242 + qParam: string, 243 + params?: { 244 + author?: string; 245 + }, 246 + ): PostSearchQuery => { 247 + // Accept individual params, but give preference to options embedded in `q`. 248 + let author = params?.author; 249 + 250 + const parts: string[] = []; 251 + let curr = ""; 252 + let quoted = false; 253 + for (const c of qParam) { 254 + if (c === " " && !quoted) { 255 + curr.trim() && parts.push(curr); 256 + curr = ""; 257 + continue; 258 + } 259 + 260 + if (c === '"') { 261 + quoted = !quoted; 262 + } 263 + curr += c; 264 + } 265 + curr.trim() && parts.push(curr); 266 + 267 + const qParts: string[] = []; 268 + for (const p of parts) { 269 + const tokens = p.split(":"); 270 + if (tokens[0] === "did") { 271 + author = p; 272 + } else if (tokens[0] === "author" || tokens[0] === "from") { 273 + author = tokens[1]; 274 + } else { 275 + qParts.push(p); 276 + } 277 + } 278 + 279 + return { 280 + q: qParts.join(" "), 281 + author, 282 + }; 283 + };
+5 -5
utils/post-transformer.ts
··· 43 43 ]), 44 44 // Get repost counts 45 45 ctx.db.models.Repost.aggregate([ 46 - { $match: { "subject.uri": { $in: postUris } } }, 47 - { $group: { _id: "$subject.uri", count: { $sum: 1 } } }, 46 + { $match: { subject: { $in: postUris } } }, 47 + { $group: { _id: "$subject", count: { $sum: 1 } } }, 48 48 ]), 49 49 50 50 // Get authors ··· 67 67 // Get viewer reposts 68 68 userDid 69 69 ? ctx.db.models.Repost.find({ 70 - "subject.uri": { $in: postUris }, 70 + subject: { $in: postUris }, 71 71 authorDid: userDid, 72 72 }).lean() 73 73 : Promise.resolve([]), ··· 89 89 viewerLikes.map((like) => [like.subject, like.uri]), 90 90 ); 91 91 const viewerRepostsMap = new Map( 92 - viewerReposts.map((repost: { subject: { uri: string }; uri: string }) => [ 93 - repost.subject.uri, 92 + viewerReposts.map((repost: { subject: string; uri: string }) => [ 93 + repost.subject, 94 94 repost.uri, 95 95 ]), 96 96 );