a tool for shared writing and social publishing
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update check oauth session workflow

+82 -82
+7
app/api/inngest/client.ts
··· 29 29 "user/cleanup-expired-oauth-sessions": { 30 30 data: {}; 31 31 }; 32 + "user/check-oauth-session": { 33 + data: { 34 + identityId: string; 35 + did: string; 36 + tokenCount: number; 37 + }; 38 + }; 32 39 }; 33 40 34 41 // Create a client to send and receive events
+70 -81
app/api/inngest/functions/cleanup_expired_oauth_sessions.ts
··· 2 2 import { inngest } from "../client"; 3 3 import { restoreOAuthSession } from "src/atproto-oauth"; 4 4 5 + // Main function that fetches identities and publishes events for each one 5 6 export const cleanup_expired_oauth_sessions = inngest.createFunction( 6 7 { id: "cleanup_expired_oauth_sessions" }, 7 8 { event: "user/cleanup-expired-oauth-sessions" }, 8 9 async ({ step }) => { 9 - const stats = { 10 - totalIdentities: 0, 11 - validSessions: 0, 12 - expiredSessions: 0, 13 - tokensDeleted: 0, 14 - errors: [] as string[], 15 - }; 16 - 17 - // Step 1: Get all identities with an atp_did (OAuth users) that have at least one auth token 10 + // Get all identities with an atp_did (OAuth users) that have at least one auth token 18 11 const identities = await step.run("fetch-oauth-identities", async () => { 19 12 const { data, error } = await supabaseServerClient 20 13 .from("identities") ··· 33 26 }) 34 27 .map((identity) => ({ 35 28 id: identity.id, 36 - atp_did: identity.atp_did, 29 + atp_did: identity.atp_did!, 37 30 tokenCount: identity.email_auth_tokens?.[0]?.count ?? 0, 38 31 })); 39 32 }); 40 33 41 - stats.totalIdentities = identities.length; 42 - console.log(`Found ${identities.length} OAuth identities with active sessions to check`); 34 + console.log( 35 + `Found ${identities.length} OAuth identities with active sessions to check`, 36 + ); 43 37 44 - // Step 2: Check identities' OAuth sessions in batched parallel and cleanup if expired 45 - const BATCH_SIZE = 150; 46 - const allResults: { 47 - identityId: string; 48 - valid: boolean; 49 - tokensDeleted: number; 50 - error?: string; 51 - }[] = []; 38 + // Publish events for each identity in batches 39 + const BATCH_SIZE = 100; 40 + let totalSent = 0; 52 41 53 42 for (let i = 0; i < identities.length; i += BATCH_SIZE) { 54 43 const batch = identities.slice(i, i + BATCH_SIZE); 55 - const batchNum = Math.floor(i / BATCH_SIZE) + 1; 56 - const totalBatches = Math.ceil(identities.length / BATCH_SIZE); 57 44 58 - console.log( 59 - `Processing batch ${batchNum}/${totalBatches} (${batch.length} identities)`, 60 - ); 45 + await step.run(`send-events-batch-${i}`, async () => { 46 + const events = batch.map((identity) => ({ 47 + name: "user/check-oauth-session" as const, 48 + data: { 49 + identityId: identity.id, 50 + did: identity.atp_did, 51 + tokenCount: identity.tokenCount, 52 + }, 53 + })); 61 54 62 - const batchResults = await Promise.all( 63 - batch.map((identity) => 64 - step.run(`check-session-${identity.id}`, async () => { 65 - console.log( 66 - `Checking OAuth session for DID: ${identity.atp_did} (${identity.tokenCount} tokens)`, 67 - ); 55 + await inngest.send(events); 56 + return events.length; 57 + }); 68 58 69 - const sessionResult = await restoreOAuthSession(identity.atp_did!); 59 + totalSent += batch.length; 60 + } 70 61 71 - if (sessionResult.ok) { 72 - console.log(` Session valid for ${identity.atp_did}`); 73 - return { identityId: identity.id, valid: true, tokensDeleted: 0 }; 74 - } 62 + console.log(`Published ${totalSent} check-oauth-session events`); 63 + 64 + return { 65 + success: true, 66 + identitiesQueued: totalSent, 67 + }; 68 + }, 69 + ); 75 70 76 - // Session is expired/invalid - delete associated auth tokens 77 - console.log( 78 - ` Session expired for ${identity.atp_did}: ${sessionResult.error.message}`, 79 - ); 71 + // Function that checks a single identity's OAuth session and cleans up if expired 72 + export const check_oauth_session = inngest.createFunction( 73 + { id: "check_oauth_session" }, 74 + { event: "user/check-oauth-session" }, 75 + async ({ event, step }) => { 76 + const { identityId, did, tokenCount } = event.data; 80 77 81 - const { error: deleteError } = await supabaseServerClient 82 - .from("email_auth_tokens") 83 - .delete() 84 - .eq("identity", identity.id); 78 + const result = await step.run("check-and-cleanup", async () => { 79 + console.log(`Checking OAuth session for DID: ${did} (${tokenCount} tokens)`); 85 80 86 - if (deleteError) { 87 - console.error( 88 - ` Error deleting tokens for identity ${identity.id}: ${deleteError.message}`, 89 - ); 90 - return { 91 - identityId: identity.id, 92 - valid: false, 93 - tokensDeleted: 0, 94 - error: deleteError.message, 95 - }; 96 - } 81 + const sessionResult = await restoreOAuthSession(did); 97 82 98 - console.log( 99 - ` Deleted ${identity.tokenCount} auth tokens for identity ${identity.id}`, 100 - ); 83 + if (sessionResult.ok) { 84 + console.log(` Session valid for ${did}`); 85 + return { valid: true, tokensDeleted: 0 }; 86 + } 101 87 102 - return { 103 - identityId: identity.id, 104 - valid: false, 105 - tokensDeleted: identity.tokenCount, 106 - }; 107 - }), 108 - ), 88 + // Session is expired/invalid - delete associated auth tokens 89 + console.log( 90 + ` Session expired for ${did}: ${sessionResult.error.message}`, 109 91 ); 110 92 111 - allResults.push(...batchResults); 112 - } 93 + const { error: deleteError } = await supabaseServerClient 94 + .from("email_auth_tokens") 95 + .delete() 96 + .eq("identity", identityId); 113 97 114 - // Aggregate results 115 - for (const result of allResults) { 116 - if (result.valid) { 117 - stats.validSessions++; 118 - } else { 119 - stats.expiredSessions++; 120 - stats.tokensDeleted += result.tokensDeleted; 121 - if ("error" in result && result.error) { 122 - stats.errors.push(`Identity ${result.identityId}: ${result.error}`); 123 - } 98 + if (deleteError) { 99 + console.error( 100 + ` Error deleting tokens for identity ${identityId}: ${deleteError.message}`, 101 + ); 102 + return { 103 + valid: false, 104 + tokensDeleted: 0, 105 + error: deleteError.message, 106 + }; 124 107 } 125 - } 126 108 127 - console.log("Cleanup completed:", stats); 109 + console.log(` Deleted ${tokenCount} auth tokens for identity ${identityId}`); 110 + 111 + return { 112 + valid: false, 113 + tokensDeleted: tokenCount, 114 + }; 115 + }); 128 116 129 117 return { 130 - success: stats.errors.length === 0, 131 - stats, 118 + identityId, 119 + did, 120 + ...result, 132 121 }; 133 122 }, 134 123 );
+5 -1
app/api/inngest/route.tsx
··· 5 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 6 import { index_follows } from "./functions/index_follows"; 7 7 import { migrate_user_to_standard } from "./functions/migrate_user_to_standard"; 8 - import { cleanup_expired_oauth_sessions } from "./functions/cleanup_expired_oauth_sessions"; 8 + import { 9 + cleanup_expired_oauth_sessions, 10 + check_oauth_session, 11 + } from "./functions/cleanup_expired_oauth_sessions"; 9 12 10 13 export const { GET, POST, PUT } = serve({ 11 14 client: inngest, ··· 16 19 index_follows, 17 20 migrate_user_to_standard, 18 21 cleanup_expired_oauth_sessions, 22 + check_oauth_session, 19 23 ], 20 24 });