a tool for shared writing and social publishing
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add duration logging to appview

+171 -143
+171 -143
appview/index.ts
··· 2 2 import { Database, Json } from "supabase/database.types"; 3 3 import { IdResolver } from "@atproto/identity"; 4 4 const idResolver = new IdResolver(); 5 - import { Firehose, MemoryRunner } from "@atproto/sync"; 5 + import { Firehose, MemoryRunner, Event } from "@atproto/sync"; 6 6 import { ids } from "lexicons/api/lexicons"; 7 7 import { 8 8 PubLeafletDocument, ··· 37 37 38 38 const client = postgres(process.env.DB_URL!); 39 39 const db = drizzle(client); 40 + async function handleEvent(evt: Event) { 41 + if (evt.event === "identity") { 42 + if (evt.handle) 43 + await supabase 44 + .from("bsky_profiles") 45 + .update({ handle: evt.handle }) 46 + .eq("did", evt.did); 47 + } 48 + if ( 49 + evt.event == "account" || 50 + evt.event === "identity" || 51 + evt.event === "sync" 52 + ) 53 + return; 54 + if (evt.collection !== "app.bsky.feed.post") 55 + console.log(`${evt.event} in ${evt.collection}`); 56 + if (evt.collection === ids.PubLeafletDocument) { 57 + if (evt.event === "create" || evt.event === "update") { 58 + let record = PubLeafletDocument.validateRecord(evt.record); 59 + if (!record.success) { 60 + return; 61 + } 62 + await supabase.from("documents").upsert({ 63 + uri: evt.uri.toString(), 64 + data: record.value as Json, 65 + }); 66 + let publicationURI = new AtUri(record.value.publication); 67 + 68 + if (publicationURI.host !== evt.uri.host) { 69 + console.log("Unauthorized to create post!"); 70 + return; 71 + } 72 + await supabase.from("documents_in_publications").insert({ 73 + publication: record.value.publication, 74 + document: evt.uri.toString(), 75 + }); 76 + } 77 + if (evt.event === "delete") { 78 + await supabase.from("documents").delete().eq("uri", evt.uri.toString()); 79 + } 80 + } 81 + if (evt.collection === ids.PubLeafletPublication) { 82 + if (evt.event === "create" || evt.event === "update") { 83 + let record = PubLeafletPublication.validateRecord(evt.record); 84 + if (!record.success) return; 85 + let { error } = await supabase.from("publications").upsert({ 86 + uri: evt.uri.toString(), 87 + identity_did: evt.did, 88 + name: record.value.name, 89 + record: record.value as Json, 90 + }); 91 + 92 + if (error && error.code === "23503") { 93 + await createIdentity(db, { atp_did: evt.did }); 94 + await supabase.from("publications").upsert({ 95 + uri: evt.uri.toString(), 96 + identity_did: evt.did, 97 + name: record.value.name, 98 + record: record.value as Json, 99 + }); 100 + } 101 + } 102 + if (evt.event === "delete") { 103 + await supabase 104 + .from("publications") 105 + .delete() 106 + .eq("uri", evt.uri.toString()); 107 + } 108 + } 109 + if (evt.collection === ids.PubLeafletGraphSubscription) { 110 + if (evt.event === "create" || evt.event === "update") { 111 + let record = PubLeafletGraphSubscription.validateRecord(evt.record); 112 + if (!record.success) return; 113 + let { error } = await supabase 114 + .from("publication_subscriptions") 115 + .upsert({ 116 + uri: evt.uri.toString(), 117 + identity: evt.did, 118 + publication: record.value.publication, 119 + record: record.value as Json, 120 + }); 121 + if (error && error.code === "23503") { 122 + await createIdentity(db, { atp_did: evt.did }); 123 + await supabase.from("publication_subscriptions").upsert({ 124 + uri: evt.uri.toString(), 125 + identity: evt.did, 126 + publication: record.value.publication, 127 + record: record.value as Json, 128 + }); 129 + } 130 + } 131 + if (evt.event === "delete") { 132 + await supabase 133 + .from("publication_subscriptions") 134 + .delete() 135 + .eq("uri", evt.uri.toString()); 136 + } 137 + } 138 + if (evt.collection === ids.AppBskyActorProfile) { 139 + //only listen to updates because we should fetch it for the first time when they subscribe! 140 + if (evt.event === "update") { 141 + await supabaseServerClient 142 + .from("bsky_profiles") 143 + .update({ record: evt.record as Json }) 144 + .eq("did", evt.did); 145 + } 146 + } 147 + if (evt.collection === "app.bsky.feed.post") { 148 + if (evt.event !== "create") return; 149 + 150 + // Early exit if no embed 151 + if ( 152 + !evt.record || 153 + typeof evt.record !== "object" || 154 + !("embed" in evt.record) 155 + ) 156 + return; 157 + 158 + // Quick check if embed might contain our quote param 159 + const embedStr = JSON.stringify(evt.record.embed); 160 + if (!embedStr.includes(QUOTE_PARAM)) return; 161 + 162 + // Now validate the record since we know it might be relevant 163 + let record = AppBskyFeedPost.validateRecord(evt.record); 164 + if (!record.success) return; 165 + 166 + let embed = 167 + AppBskyEmbedExternal.isMain(record.value.embed) && 168 + record.value.embed.external.uri.includes(QUOTE_PARAM) 169 + ? record.value.embed.external.uri 170 + : null; 171 + if (embed) { 172 + await inngest.send({ 173 + name: "appview/index-bsky-post-mention", 174 + data: { post_uri: evt.uri.toString(), document_link: embed }, 175 + }); 176 + } 177 + } 178 + } 179 + async function timedHandleEvent(evt: Event) { 180 + const startTime = performance.now(); 181 + 182 + if (evt.event === "identity") { 183 + if (evt.handle) 184 + await supabase 185 + .from("bsky_profiles") 186 + .update({ handle: evt.handle }) 187 + .eq("did", evt.did); 188 + } 189 + 190 + await handleEvent(evt); 191 + if ( 192 + evt.event == "account" || 193 + evt.event === "identity" || 194 + evt.event === "sync" 195 + ) { 196 + const endTime = performance.now(); 197 + console.log( 198 + `${evt.event} in ${evt.event || "unknown"} took ${endTime - startTime}ms`, 199 + ); 200 + return; 201 + } 202 + 203 + const endTime = performance.now(); 204 + console.log( 205 + `${evt.event} in ${evt.collection || "unknown"} took ${endTime - startTime}ms`, 206 + ); 207 + } 208 + 40 209 const runner = new MemoryRunner({ 41 210 startCursor, 42 211 setCursor: async (cursor) => { ··· 58 227 ids.AppBskyActorProfile, 59 228 "app.bsky.feed.post", 60 229 ], 61 - handleEvent: async (evt) => { 62 - if (evt.event === "identity") { 63 - if (evt.handle) 64 - await supabase 65 - .from("bsky_profiles") 66 - .update({ handle: evt.handle }) 67 - .eq("did", evt.did); 68 - } 69 - if ( 70 - evt.event == "account" || 71 - evt.event === "identity" || 72 - evt.event === "sync" 73 - ) 74 - return; 75 - if (evt.collection !== "app.bsky.feed.post") 76 - console.log(`${evt.event} in ${evt.collection}`); 77 - if (evt.collection === ids.PubLeafletDocument) { 78 - if (evt.event === "create" || evt.event === "update") { 79 - let record = PubLeafletDocument.validateRecord(evt.record); 80 - if (!record.success) { 81 - return; 82 - } 83 - await supabase.from("documents").upsert({ 84 - uri: evt.uri.toString(), 85 - data: record.value as Json, 86 - }); 87 - let publicationURI = new AtUri(record.value.publication); 88 - 89 - if (publicationURI.host !== evt.uri.host) { 90 - console.log("Unauthorized to create post!"); 91 - return; 92 - } 93 - await supabase.from("documents_in_publications").insert({ 94 - publication: record.value.publication, 95 - document: evt.uri.toString(), 96 - }); 97 - } 98 - if (evt.event === "delete") { 99 - await supabase 100 - .from("documents") 101 - .delete() 102 - .eq("uri", evt.uri.toString()); 103 - } 104 - } 105 - if (evt.collection === ids.PubLeafletPublication) { 106 - if (evt.event === "create" || evt.event === "update") { 107 - let record = PubLeafletPublication.validateRecord(evt.record); 108 - if (!record.success) return; 109 - let { error } = await supabase.from("publications").upsert({ 110 - uri: evt.uri.toString(), 111 - identity_did: evt.did, 112 - name: record.value.name, 113 - record: record.value as Json, 114 - }); 115 - 116 - if (error && error.code === "23503") { 117 - await createIdentity(db, { atp_did: evt.did }); 118 - await supabase.from("publications").upsert({ 119 - uri: evt.uri.toString(), 120 - identity_did: evt.did, 121 - name: record.value.name, 122 - record: record.value as Json, 123 - }); 124 - } 125 - } 126 - if (evt.event === "delete") { 127 - await supabase 128 - .from("publications") 129 - .delete() 130 - .eq("uri", evt.uri.toString()); 131 - } 132 - } 133 - if (evt.collection === ids.PubLeafletGraphSubscription) { 134 - if (evt.event === "create" || evt.event === "update") { 135 - let record = PubLeafletGraphSubscription.validateRecord(evt.record); 136 - if (!record.success) return; 137 - let { error } = await supabase 138 - .from("publication_subscriptions") 139 - .upsert({ 140 - uri: evt.uri.toString(), 141 - identity: evt.did, 142 - publication: record.value.publication, 143 - record: record.value as Json, 144 - }); 145 - if (error && error.code === "23503") { 146 - await createIdentity(db, { atp_did: evt.did }); 147 - await supabase.from("publication_subscriptions").upsert({ 148 - uri: evt.uri.toString(), 149 - identity: evt.did, 150 - publication: record.value.publication, 151 - record: record.value as Json, 152 - }); 153 - } 154 - } 155 - if (evt.event === "delete") { 156 - await supabase 157 - .from("publication_subscriptions") 158 - .delete() 159 - .eq("uri", evt.uri.toString()); 160 - } 161 - } 162 - if (evt.collection === ids.AppBskyActorProfile) { 163 - //only listen to updates because we should fetch it for the first time when they subscribe! 164 - if (evt.event === "update") { 165 - await supabaseServerClient 166 - .from("bsky_profiles") 167 - .update({ record: evt.record as Json }) 168 - .eq("did", evt.did); 169 - } 170 - } 171 - if (evt.collection === "app.bsky.feed.post") { 172 - if (evt.event !== "create") return; 173 - 174 - // Early exit if no embed 175 - if ( 176 - !evt.record || 177 - typeof evt.record !== "object" || 178 - !("embed" in evt.record) 179 - ) 180 - return; 181 - 182 - // Quick check if embed might contain our quote param 183 - const embedStr = JSON.stringify(evt.record.embed); 184 - if (!embedStr.includes(QUOTE_PARAM)) return; 185 - 186 - // Now validate the record since we know it might be relevant 187 - let record = AppBskyFeedPost.validateRecord(evt.record); 188 - if (!record.success) return; 189 - 190 - let embed = 191 - AppBskyEmbedExternal.isMain(record.value.embed) && 192 - record.value.embed.external.uri.includes(QUOTE_PARAM) 193 - ? record.value.embed.external.uri 194 - : null; 195 - if (embed) { 196 - await inngest.send({ 197 - name: "appview/index-bsky-post-mention", 198 - data: { post_uri: evt.uri.toString(), document_link: embed }, 199 - }); 200 - } 201 - } 202 - }, 230 + handleEvent: timedHandleEvent, 203 231 onError: (err) => { 204 232 console.error(err); 205 233 },