ATlast — you'll never need to find your favorites on another platform again. Find your favs in the ATmosphere.
atproto
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: add results api routes to hono and kysely repositories

byarielm.fyi e90e34b7 fbe0c9dc

verified
+462 -7
+4 -6
packages/api/src/repositories/MatchRepository.ts
··· 158 158 ) 159 159 .leftJoin('user_match_status as ums', (join) => 160 160 join 161 - .onRef('am.id', '=', 'ums.atproto_match_id') 161 + .onRef('am.id', '=', 'ums.match_id') 162 162 .on('ums.user_did', '=', userDid) 163 163 ) 164 164 .select([ ··· 207 207 208 208 const values = statuses.map((s) => ({ 209 209 user_did: s.userDid, 210 - atproto_match_id: s.atprotoMatchId, 211 - source_account_id: s.sourceAccountId, 210 + match_id: s.atprotoMatchId, // Fixed: match_id not atproto_match_id 212 211 viewed: s.viewed, 213 - viewed_at: s.viewed ? new Date() : null, 214 212 })); 215 213 216 214 await this.db 217 215 .insertInto('user_match_status') 218 216 .values(values) 219 217 .onConflict((oc) => 220 - oc.columns(['user_did', 'atproto_match_id']).doUpdateSet({ 218 + oc.columns(['user_did', 'match_id']).doUpdateSet({ 221 219 viewed: (eb) => eb.ref('excluded.viewed'), 222 - viewed_at: sql`CASE WHEN excluded.viewed THEN NOW() ELSE user_match_status.viewed_at END`, 220 + updated_at: sql`NOW()`, 223 221 }), 224 222 ) 225 223 .execute();
+16 -1
packages/api/src/repositories/SourceAccountRepository.ts
··· 85 85 upload_id: uploadId, 86 86 user_did: userDid, 87 87 source_account_id: link.sourceAccountId, 88 - date_on_source: link.sourceDate ? new Date(link.sourceDate) : null, 88 + // Note: date_on_source is stored in source_accounts table, not user_source_follows 89 89 })); 90 90 91 91 await this.db ··· 95 95 oc.columns(['upload_id', 'source_account_id']).doNothing(), 96 96 ) 97 97 .execute(); 98 + } 99 + 100 + /** 101 + * Mark source accounts as matched 102 + * NOTE: This is a no-op in the new schema. The old schema had match_found 103 + * and match_found_at columns, but the new schema tracks matches via the 104 + * atproto_matches table instead. Keeping this method for compatibility. 105 + */ 106 + async markAsMatched(sourceAccountIds: number[]): Promise<void> { 107 + // No-op: match status is tracked via atproto_matches table existence 108 + if (sourceAccountIds.length === 0) return; 109 + 110 + // In the new schema, a source account is "matched" if it has rows in atproto_matches 111 + // No need to update source_accounts table 112 + return; 98 113 } 99 114 }
+114
packages/api/src/repositories/UploadRepository.ts
··· 1 + /** 2 + * Upload Repository for Kysely 3 + * Manages user uploads and search results 4 + */ 5 + 6 + import { BaseRepository } from './BaseRepository'; 7 + import { sql } from 'kysely'; 8 + 9 + export interface UserUploadRow { 10 + upload_id: string; 11 + user_did: string; 12 + source_platform: string; 13 + created_at: Date; 14 + total_users: number; 15 + matched_users: number; 16 + unmatched_users: number; 17 + } 18 + 19 + export class UploadRepository extends BaseRepository { 20 + /** 21 + * Create a new upload record 22 + */ 23 + async createUpload( 24 + uploadId: string, 25 + userDid: string, 26 + sourcePlatform: string, 27 + totalUsers: number, 28 + matchedUsers: number, 29 + ): Promise<void> { 30 + await this.db 31 + .insertInto('user_uploads') 32 + .values({ 33 + upload_id: uploadId, 34 + user_did: userDid, 35 + source_platform: sourcePlatform, 36 + total_users: totalUsers, 37 + matched_users: matchedUsers, 38 + unmatched_users: totalUsers - matchedUsers, 39 + }) 40 + .onConflict((oc) => oc.column('upload_id').doNothing()) 41 + .execute(); 42 + } 43 + 44 + /** 45 + * Get all uploads for a user, ordered by most recent first 46 + */ 47 + async getUserUploads(userDid: string): Promise<UserUploadRow[]> { 48 + const results = await this.db 49 + .selectFrom('user_uploads') 50 + .select([ 51 + 'upload_id', 52 + 'user_did', 53 + 'source_platform', 54 + 'created_at', 55 + 'total_users', 56 + 'matched_users', 57 + 'unmatched_users', 58 + ]) 59 + .where('user_did', '=', userDid) 60 + .orderBy('created_at', 'desc') 61 + .execute(); 62 + 63 + return results as UserUploadRow[]; 64 + } 65 + 66 + /** 67 + * Get a specific upload for a user 68 + */ 69 + async getUpload(uploadId: string, userDid: string): Promise<UserUploadRow | null> { 70 + const result = await this.db 71 + .selectFrom('user_uploads') 72 + .selectAll() 73 + .where('upload_id', '=', uploadId) 74 + .where('user_did', '=', userDid) 75 + .executeTakeFirst(); 76 + 77 + return (result as UserUploadRow) || null; 78 + } 79 + 80 + /** 81 + * Update upload match counts 82 + */ 83 + async updateMatchCounts( 84 + uploadId: string, 85 + matchedUsers: number, 86 + unmatchedUsers: number, 87 + ): Promise<void> { 88 + await this.db 89 + .updateTable('user_uploads') 90 + .set({ 91 + matched_users: matchedUsers, 92 + unmatched_users: unmatchedUsers, 93 + }) 94 + .where('upload_id', '=', uploadId) 95 + .execute(); 96 + } 97 + 98 + /** 99 + * Check for recent uploads (within 5 seconds) 100 + * Used to prevent duplicate submissions 101 + */ 102 + async hasRecentUpload(userDid: string): Promise<boolean> { 103 + const result = await this.db 104 + .selectFrom('user_uploads') 105 + .select('upload_id') 106 + .where('user_did', '=', userDid) 107 + .where('created_at', '>', sql`NOW() - INTERVAL '5 seconds'`) 108 + .orderBy('created_at', 'desc') 109 + .limit(1) 110 + .execute(); 111 + 112 + return result.length > 0; 113 + } 114 + }
+328
packages/api/src/routes/results.ts
··· 1 + /** 2 + * Results Routes 3 + * Endpoints for saving and retrieving search results 4 + */ 5 + 6 + import { Hono } from 'hono'; 7 + import { authMiddleware } from '../middleware/auth'; 8 + import { UploadRepository } from '../repositories/UploadRepository'; 9 + import { SourceAccountRepository } from '../repositories/SourceAccountRepository'; 10 + import { MatchRepository } from '../repositories/MatchRepository'; 11 + import { normalize } from '../utils/string.utils'; 12 + import { ValidationError, NotFoundError } from '../errors'; 13 + import { z } from 'zod'; 14 + 15 + const results = new Hono(); 16 + 17 + // Zod schemas for validation 18 + const searchResultSchema = z.object({ 19 + sourceUser: z.object({ 20 + username: z.string(), 21 + date: z.string().optional().default(''), 22 + }), 23 + atprotoMatches: z.array( 24 + z.object({ 25 + did: z.string(), 26 + handle: z.string(), 27 + displayName: z.string().optional(), 28 + avatar: z.string().optional(), 29 + description: z.string().optional(), 30 + matchScore: z.number(), 31 + postCount: z.number(), 32 + followerCount: z.number(), 33 + }), 34 + ), 35 + isSearching: z.boolean().optional(), 36 + error: z.string().optional(), 37 + selectedMatches: z.any().optional(), 38 + }); 39 + 40 + const saveResultsSchema = z.object({ 41 + uploadId: z.string(), 42 + sourcePlatform: z.string(), 43 + results: z.array(searchResultSchema), 44 + saveData: z.boolean().optional().default(true), 45 + }); 46 + 47 + const uploadDetailsParamsSchema = z.object({ 48 + uploadId: z.string(), 49 + page: z.coerce.number().min(1).default(1), 50 + pageSize: z.coerce.number().min(1).max(100).default(50), 51 + }); 52 + 53 + /** 54 + * POST /api/results/save 55 + * Save search results for a user 56 + */ 57 + results.post('/save', authMiddleware, async (c) => { 58 + const body = await c.req.json(); 59 + const { uploadId, sourcePlatform, results: searchResults, saveData } = saveResultsSchema.parse(body); 60 + 61 + const userDid = c.get('did'); 62 + 63 + // If user has disabled data storage, skip save 64 + if (saveData === false) { 65 + console.log(`[save-results] User ${userDid} has data storage disabled - skipping save`); 66 + 67 + const matchedCount = searchResults.filter((r) => r.atprotoMatches.length > 0).length; 68 + 69 + return c.json({ 70 + success: true, 71 + message: 'Data storage disabled - results not saved', 72 + uploadId, 73 + totalUsers: searchResults.length, 74 + matchedUsers: matchedCount, 75 + unmatchedUsers: searchResults.length - matchedCount, 76 + }); 77 + } 78 + 79 + const uploadRepo = new UploadRepository(); 80 + const sourceAccountRepo = new SourceAccountRepository(); 81 + const matchRepo = new MatchRepository(); 82 + let matchedCount = 0; 83 + 84 + // Check if this specific upload already exists 85 + const existingUpload = await uploadRepo.getUpload(uploadId, userDid); 86 + 87 + if (!existingUpload) { 88 + // Upload doesn't exist - create it (file upload flow) 89 + await uploadRepo.createUpload( 90 + uploadId, 91 + userDid, 92 + sourcePlatform, 93 + searchResults.length, 94 + 0, 95 + ); 96 + } else { 97 + // Upload exists (extension flow) - just update it with matches 98 + console.log(`[save-results] Updating existing upload ${uploadId} with matches`); 99 + } 100 + 101 + // Bulk create source accounts 102 + const allUsernames = searchResults.map((r) => r.sourceUser.username); 103 + const sourceAccountIdMap = await sourceAccountRepo.bulkCreate( 104 + sourcePlatform, 105 + allUsernames, 106 + ); 107 + 108 + // Link source accounts to upload 109 + const links = searchResults 110 + .map((result) => { 111 + const normalized = normalize(result.sourceUser.username); 112 + const sourceAccountId = sourceAccountIdMap.get(normalized); 113 + return { 114 + sourceAccountId: sourceAccountId!, 115 + sourceDate: result.sourceUser.date, 116 + }; 117 + }) 118 + .filter((link) => link.sourceAccountId !== undefined); 119 + 120 + await sourceAccountRepo.linkUserToAccounts(uploadId, userDid, links); 121 + 122 + // Prepare matches for bulk insert 123 + const allMatches: Array<{ 124 + sourceAccountId: number; 125 + atprotoDid: string; 126 + atprotoHandle: string; 127 + atprotoDisplayName?: string; 128 + atprotoAvatar?: string; 129 + atprotoDescription?: string; 130 + matchScore: number; 131 + postCount: number; 132 + followerCount: number; 133 + }> = []; 134 + 135 + const matchedSourceAccountIds: number[] = []; 136 + 137 + for (const result of searchResults) { 138 + const normalized = normalize(result.sourceUser.username); 139 + const sourceAccountId = sourceAccountIdMap.get(normalized); 140 + 141 + if ( 142 + sourceAccountId && 143 + result.atprotoMatches && 144 + result.atprotoMatches.length > 0 145 + ) { 146 + matchedCount++; 147 + matchedSourceAccountIds.push(sourceAccountId); 148 + 149 + for (const match of result.atprotoMatches) { 150 + allMatches.push({ 151 + sourceAccountId, 152 + atprotoDid: match.did, 153 + atprotoHandle: match.handle, 154 + atprotoDisplayName: match.displayName, 155 + atprotoAvatar: match.avatar, 156 + atprotoDescription: (match as any).description, 157 + matchScore: match.matchScore, 158 + postCount: match.postCount || 0, 159 + followerCount: match.followerCount || 0, 160 + }); 161 + } 162 + } 163 + } 164 + 165 + // Bulk store matches 166 + let matchIdMap = new Map<string, number>(); 167 + if (allMatches.length > 0) { 168 + matchIdMap = await matchRepo.bulkStoreMatches(allMatches); 169 + } 170 + 171 + // Mark source accounts as matched 172 + if (matchedSourceAccountIds.length > 0) { 173 + await sourceAccountRepo.markAsMatched(matchedSourceAccountIds); 174 + } 175 + 176 + // Create user match status records 177 + const statuses: Array<{ 178 + userDid: string; 179 + atprotoMatchId: number; 180 + sourceAccountId: number; 181 + viewed: boolean; 182 + }> = []; 183 + 184 + for (const match of allMatches) { 185 + const key = `${match.sourceAccountId}:${match.atprotoDid}`; 186 + const matchId = matchIdMap.get(key); 187 + if (matchId) { 188 + statuses.push({ 189 + userDid, 190 + atprotoMatchId: matchId, 191 + sourceAccountId: match.sourceAccountId, 192 + viewed: true, 193 + }); 194 + } 195 + } 196 + 197 + if (statuses.length > 0) { 198 + await matchRepo.upsertUserMatchStatus(statuses); 199 + } 200 + 201 + // Update upload match counts 202 + await uploadRepo.updateMatchCounts( 203 + uploadId, 204 + matchedCount, 205 + searchResults.length - matchedCount, 206 + ); 207 + 208 + return c.json({ 209 + success: true, 210 + uploadId, 211 + totalUsers: searchResults.length, 212 + matchedUsers: matchedCount, 213 + unmatchedUsers: searchResults.length - matchedCount, 214 + }); 215 + }); 216 + 217 + /** 218 + * GET /api/results/uploads 219 + * Get all uploads for the authenticated user 220 + */ 221 + results.get('/uploads', authMiddleware, async (c) => { 222 + const userDid = c.get('did'); 223 + const uploadRepo = new UploadRepository(); 224 + 225 + const uploads = await uploadRepo.getUserUploads(userDid); 226 + 227 + return c.json({ 228 + success: true, 229 + data: { 230 + uploads: uploads.map((upload) => ({ 231 + uploadId: upload.upload_id, 232 + sourcePlatform: upload.source_platform, 233 + createdAt: upload.created_at, 234 + totalUsers: upload.total_users, 235 + matchedUsers: upload.matched_users, 236 + unmatchedUsers: upload.unmatched_users, 237 + })), 238 + }, 239 + }); 240 + }); 241 + 242 + /** 243 + * GET /api/results/upload-details 244 + * Get detailed results for a specific upload with pagination 245 + */ 246 + results.get('/upload-details', authMiddleware, async (c) => { 247 + const userDid = c.get('did'); 248 + const query = c.req.query(); 249 + 250 + const { uploadId, page, pageSize } = uploadDetailsParamsSchema.parse(query); 251 + 252 + const matchRepo = new MatchRepository(); 253 + 254 + // Fetch paginated results 255 + const { results: rawResults, totalUsers } = await matchRepo.getUploadDetails( 256 + uploadId, 257 + userDid, 258 + page, 259 + pageSize, 260 + ); 261 + 262 + if (totalUsers === 0) { 263 + throw new NotFoundError('Upload not found'); 264 + } 265 + 266 + const totalPages = Math.ceil(totalUsers / pageSize); 267 + 268 + // Group results by source username 269 + const groupedResults = new Map<string, any>(); 270 + 271 + rawResults.forEach((row: any) => { 272 + const username = row.original_username; 273 + 274 + // Get or create the entry for this username 275 + let userResult = groupedResults.get(username); 276 + 277 + if (!userResult) { 278 + userResult = { 279 + sourceUser: { 280 + username: username, 281 + date: row.date_on_source || '', 282 + }, 283 + atprotoMatches: [], 284 + }; 285 + groupedResults.set(username, userResult); 286 + } 287 + 288 + // Add the match (if it exists) to the array 289 + if (row.atproto_did) { 290 + userResult.atprotoMatches.push({ 291 + did: row.atproto_did, 292 + handle: row.atproto_handle, 293 + displayName: row.display_name, 294 + matchScore: row.match_score, 295 + postCount: row.post_count, 296 + followerCount: row.follower_count, 297 + foundAt: row.found_at, 298 + dismissed: row.dismissed || false, 299 + followStatus: row.follow_status || {}, 300 + }); 301 + } 302 + }); 303 + 304 + const searchResults = Array.from(groupedResults.values()); 305 + 306 + return c.json( 307 + { 308 + success: true, 309 + data: { 310 + results: searchResults, 311 + pagination: { 312 + page, 313 + pageSize, 314 + totalPages, 315 + totalUsers, 316 + hasNextPage: page < totalPages, 317 + hasPrevPage: page > 1, 318 + }, 319 + }, 320 + }, 321 + 200, 322 + { 323 + 'Cache-Control': 'private, max-age=600', 324 + }, 325 + ); 326 + }); 327 + 328 + export default results;