a tool for shared writing and social publishing
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add new fix inngest function

+220
+5
app/api/inngest/client.ts
··· 36 36 tokenCount: number; 37 37 }; 38 38 }; 39 + "documents/fix-publication-references": { 40 + data: { 41 + documentUris: string[]; 42 + }; 43 + }; 39 44 }; 40 45 41 46 // Create a client to send and receive events
+213
app/api/inngest/functions/fix_standard_document_publications.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + import { AtpBaseClient, SiteStandardDocument } from "lexicons/api"; 5 + import { AtUri } from "@atproto/syntax"; 6 + import { Json } from "supabase/database.types"; 7 + 8 + async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 9 + const result = await restoreOAuthSession(did); 10 + if (!result.ok) { 11 + throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 12 + } 13 + const credentialSession = result.value; 14 + return new AtpBaseClient( 15 + credentialSession.fetchHandler.bind(credentialSession), 16 + ); 17 + } 18 + 19 + /** 20 + * Fixes site.standard.document records that have stale pub.leaflet.publication 21 + * references in their site field. Updates both the PDS record and database. 22 + */ 23 + export const fix_standard_document_publications = inngest.createFunction( 24 + { id: "fix_standard_document_publications" }, 25 + { event: "documents/fix-publication-references" }, 26 + async ({ event, step }) => { 27 + const { documentUris } = event.data as { documentUris: string[] }; 28 + 29 + const stats = { 30 + documentsFixed: 0, 31 + joinEntriesFixed: 0, 32 + errors: [] as string[], 33 + }; 34 + 35 + if (!documentUris || documentUris.length === 0) { 36 + return { success: true, stats, message: "No documents to fix" }; 37 + } 38 + 39 + // Group documents by DID (author) for efficient OAuth session handling 40 + const docsByDid = new Map<string, string[]>(); 41 + for (const uri of documentUris) { 42 + try { 43 + const aturi = new AtUri(uri); 44 + const did = aturi.hostname; 45 + const existing = docsByDid.get(did) || []; 46 + existing.push(uri); 47 + docsByDid.set(did, existing); 48 + } catch (e) { 49 + stats.errors.push(`Invalid URI: ${uri}`); 50 + } 51 + } 52 + 53 + // Process each DID's documents 54 + for (const [did, uris] of docsByDid) { 55 + // Verify OAuth session for this user 56 + const oauthValid = await step.run( 57 + `verify-oauth-${did.slice(-8)}`, 58 + async () => { 59 + const result = await restoreOAuthSession(did); 60 + return result.ok; 61 + }, 62 + ); 63 + 64 + if (!oauthValid) { 65 + stats.errors.push(`No valid OAuth session for ${did}`); 66 + continue; 67 + } 68 + 69 + // Fix each document 70 + for (const docUri of uris) { 71 + const result = await step.run( 72 + `fix-doc-${docUri.slice(-12)}`, 73 + async () => { 74 + // Fetch the document 75 + const { data: doc, error: fetchError } = await supabaseServerClient 76 + .from("documents") 77 + .select("uri, data") 78 + .eq("uri", docUri) 79 + .single(); 80 + 81 + if (fetchError || !doc) { 82 + return { 83 + success: false as const, 84 + error: `Document not found: ${fetchError?.message || "no data"}`, 85 + }; 86 + } 87 + 88 + const data = doc.data as SiteStandardDocument.Record; 89 + const oldSite = data?.site; 90 + 91 + if (!oldSite || !oldSite.includes("/pub.leaflet.publication/")) { 92 + return { 93 + success: false as const, 94 + error: "Document does not have a pub.leaflet.publication site reference", 95 + }; 96 + } 97 + 98 + // Convert to new publication URI 99 + const oldPubAturi = new AtUri(oldSite); 100 + const newSite = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 101 + 102 + // Update the record 103 + const updatedRecord: SiteStandardDocument.Record = { 104 + ...data, 105 + site: newSite, 106 + }; 107 + 108 + // Write to PDS 109 + const docAturi = new AtUri(docUri); 110 + const agent = await createAuthenticatedAgent(did); 111 + await agent.com.atproto.repo.putRecord({ 112 + repo: did, 113 + collection: "site.standard.document", 114 + rkey: docAturi.rkey, 115 + record: updatedRecord, 116 + validate: false, 117 + }); 118 + 119 + // Update database 120 + const { error: dbError } = await supabaseServerClient 121 + .from("documents") 122 + .update({ data: updatedRecord as Json }) 123 + .eq("uri", docUri); 124 + 125 + if (dbError) { 126 + return { 127 + success: false as const, 128 + error: `Database update failed: ${dbError.message}`, 129 + }; 130 + } 131 + 132 + return { 133 + success: true as const, 134 + oldSite, 135 + newSite, 136 + }; 137 + }, 138 + ); 139 + 140 + if (result.success) { 141 + stats.documentsFixed++; 142 + 143 + // Fix the documents_in_publications entry 144 + const joinResult = await step.run( 145 + `fix-join-${docUri.slice(-12)}`, 146 + async () => { 147 + // Find the publication URI that exists in the database 148 + const { data: doc } = await supabaseServerClient 149 + .from("documents") 150 + .select("data") 151 + .eq("uri", docUri) 152 + .single(); 153 + 154 + const newSite = (doc?.data as any)?.site; 155 + if (!newSite) { 156 + return { success: false as const, error: "Could not read updated document" }; 157 + } 158 + 159 + // Check which publication URI exists 160 + const newPubAturi = new AtUri(newSite); 161 + const oldPubUri = `at://${newPubAturi.hostname}/pub.leaflet.publication/${newPubAturi.rkey}`; 162 + 163 + const { data: pubs } = await supabaseServerClient 164 + .from("publications") 165 + .select("uri") 166 + .in("uri", [newSite, oldPubUri]); 167 + 168 + const existingPubUri = pubs?.find((p) => p.uri === newSite)?.uri || 169 + pubs?.find((p) => p.uri === oldPubUri)?.uri; 170 + 171 + if (!existingPubUri) { 172 + return { success: false as const, error: "No matching publication found" }; 173 + } 174 + 175 + // Delete any existing entries for this document 176 + await supabaseServerClient 177 + .from("documents_in_publications") 178 + .delete() 179 + .eq("document", docUri); 180 + 181 + // Insert the correct entry 182 + const { error: insertError } = await supabaseServerClient 183 + .from("documents_in_publications") 184 + .insert({ 185 + document: docUri, 186 + publication: existingPubUri, 187 + }); 188 + 189 + if (insertError) { 190 + return { success: false as const, error: insertError.message }; 191 + } 192 + 193 + return { success: true as const, publication: existingPubUri }; 194 + }, 195 + ); 196 + 197 + if (joinResult.success) { 198 + stats.joinEntriesFixed++; 199 + } else { 200 + stats.errors.push(`Join table fix failed for ${docUri}: ${"error" in joinResult ? joinResult.error : "unknown error"}`); 201 + } 202 + } else { 203 + stats.errors.push(`${docUri}: ${result.error}`); 204 + } 205 + } 206 + } 207 + 208 + return { 209 + success: stats.errors.length === 0, 210 + stats, 211 + }; 212 + }, 213 + );
+2
app/api/inngest/route.tsx
··· 5 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 6 import { index_follows } from "./functions/index_follows"; 7 7 import { migrate_user_to_standard } from "./functions/migrate_user_to_standard"; 8 + import { fix_standard_document_publications } from "./functions/fix_standard_document_publications"; 8 9 import { 9 10 cleanup_expired_oauth_sessions, 10 11 check_oauth_session, ··· 18 19 batched_update_profiles, 19 20 index_follows, 20 21 migrate_user_to_standard, 22 + fix_standard_document_publications, 21 23 cleanup_expired_oauth_sessions, 22 24 check_oauth_session, 23 25 ],