a tool for shared writing and social publishing
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add inngest workflow to cleanup expired oauth sessions

+106
+3
app/api/inngest/client.ts
··· 26 26 did: string; 27 27 }; 28 28 }; 29 + "user/cleanup-expired-oauth-sessions": { 30 + data: {}; 31 + }; 29 32 }; 30 33 31 34 // Create a client to send and receive events
+101
app/api/inngest/functions/cleanup_expired_oauth_sessions.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + 5 + export const cleanup_expired_oauth_sessions = inngest.createFunction( 6 + { id: "cleanup_expired_oauth_sessions" }, 7 + { event: "user/cleanup-expired-oauth-sessions" }, 8 + async ({ step }) => { 9 + const stats = { 10 + totalIdentities: 0, 11 + validSessions: 0, 12 + expiredSessions: 0, 13 + tokensDeleted: 0, 14 + errors: [] as string[], 15 + }; 16 + 17 + // Step 1: Get all identities with an atp_did (OAuth users) 18 + const identities = await step.run("fetch-oauth-identities", async () => { 19 + const { data, error } = await supabaseServerClient 20 + .from("identities") 21 + .select("id, atp_did") 22 + .not("atp_did", "is", null); 23 + 24 + if (error) { 25 + throw new Error(`Failed to fetch identities: ${error.message}`); 26 + } 27 + return data || []; 28 + }); 29 + 30 + stats.totalIdentities = identities.length; 31 + console.log(`Found ${identities.length} OAuth identities to check`); 32 + 33 + // Step 2: Check each identity's OAuth session and cleanup if expired 34 + for (const identity of identities) { 35 + if (!identity.atp_did) continue; 36 + 37 + const result = await step.run( 38 + `check-session-${identity.id}`, 39 + async () => { 40 + console.log(`Checking OAuth session for DID: ${identity.atp_did}`); 41 + 42 + const sessionResult = await restoreOAuthSession(identity.atp_did!); 43 + 44 + if (sessionResult.ok) { 45 + console.log(` Session valid for ${identity.atp_did}`); 46 + return { valid: true, tokensDeleted: 0 }; 47 + } 48 + 49 + // Session is expired/invalid - delete associated auth tokens 50 + console.log( 51 + ` Session expired for ${identity.atp_did}: ${sessionResult.error.message}`, 52 + ); 53 + 54 + const { data: deletedTokens, error: deleteError } = 55 + await supabaseServerClient 56 + .from("email_auth_tokens") 57 + .delete() 58 + .eq("identity", identity.id) 59 + .select("id"); 60 + 61 + if (deleteError) { 62 + console.error( 63 + ` Error deleting tokens for identity ${identity.id}: ${deleteError.message}`, 64 + ); 65 + return { 66 + valid: false, 67 + tokensDeleted: 0, 68 + error: deleteError.message, 69 + }; 70 + } 71 + 72 + const deletedCount = deletedTokens?.length || 0; 73 + console.log( 74 + ` Deleted ${deletedCount} auth tokens for identity ${identity.id}`, 75 + ); 76 + 77 + return { valid: false, tokensDeleted: deletedCount }; 78 + }, 79 + ); 80 + 81 + if (result.valid) { 82 + stats.validSessions++; 83 + } else { 84 + stats.expiredSessions++; 85 + stats.tokensDeleted += result.tokensDeleted; 86 + if ("error" in result && result.error) { 87 + stats.errors.push( 88 + `Identity ${identity.id}: ${result.error}`, 89 + ); 90 + } 91 + } 92 + } 93 + 94 + console.log("Cleanup completed:", stats); 95 + 96 + return { 97 + success: stats.errors.length === 0, 98 + stats, 99 + }; 100 + }, 101 + );
+2
app/api/inngest/route.tsx
··· 5 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 6 import { index_follows } from "./functions/index_follows"; 7 7 import { migrate_user_to_standard } from "./functions/migrate_user_to_standard"; 8 + import { cleanup_expired_oauth_sessions } from "./functions/cleanup_expired_oauth_sessions"; 8 9 9 10 export const { GET, POST, PUT } = serve({ 10 11 client: inngest, ··· 14 15 batched_update_profiles, 15 16 index_follows, 16 17 migrate_user_to_standard, 18 + cleanup_expired_oauth_sessions, 17 19 ], 18 20 });