this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

made test case pass lol

ansxor 8da03676 72c23cbd

+161 -66
+105 -19
apps/firehose/index.ts
··· 1 1 import { JetstreamSubscription, type JetstreamEvent } from "@atcute/jetstream"; 2 - import { is } from '@atcute/lexicons'; 2 + import { is, type Blob as AtBlob } from "@atcute/lexicons"; 3 3 import { CaAnsxorCatnipTrack } from "lexicon/atcute-lexicon"; 4 + import { createDb } from "db"; 5 + import { tracks, artists, trackArtists, type BlobRef } from "db/schema"; 6 + import { eq } from "drizzle-orm"; 4 7 5 8 const subscription = new JetstreamSubscription({ 6 - url: "wss://jetstream2.us-east.bsky.network", 7 - wantedCollections: ["ca.ansxor.catnip.track"] 9 + url: "wss://jetstream2.us-east.bsky.network", 10 + wantedCollections: ["ca.ansxor.catnip.track"], 8 11 }); 9 12 10 - export async function loop(subscription: AsyncIterable<JetstreamEvent>) { 13 + function mapBlob(blob: AtBlob): BlobRef { 14 + return { 15 + $type: "blob", 16 + ref: { $link: blob.ref.$link }, 17 + mimeType: blob.mimeType, 18 + size: blob.size, 19 + } satisfies BlobRef; 20 + } 11 21 12 - for await (const event of subscription) { 13 - if (event.kind !== "commit") { 14 - continue; 15 - } 22 + export async function loop(subscription: AsyncIterable<JetstreamEvent>, db: ReturnType<typeof createDb>) { 23 + for await (const event of subscription) { 24 + if (event.kind !== "commit") { 25 + continue; 26 + } 16 27 17 - const commit = event.commit; 18 - if (commit.operation !== "create") { 19 - continue; 20 - } 21 - const { collection, operation, rkey, rev } = event.commit; 28 + const commit = event.commit; 29 + if (commit.operation !== "create") { 30 + continue; 31 + } 32 + const { rkey } = event.commit; 22 33 23 - if (!is(CaAnsxorCatnipTrack.mainSchema, commit.record)) { 24 - console.warn('invalid record', commit.record); 25 - continue; 26 - } 34 + if (!is(CaAnsxorCatnipTrack.mainSchema, commit.record)) { 35 + console.warn("invalid record", commit.record); 36 + continue; 37 + } 27 38 39 + const record = commit.record; 40 + const uri = `at://${event.did}/ca.ansxor.catnip.track/${rkey}`; 28 41 29 - } 42 + try { 43 + await db.transaction(async (tx) => { 44 + await tx.insert(tracks).values({ 45 + uri, 46 + did: event.did, 47 + rkey, 48 + cid: commit.cid, 49 + title: record.title, 50 + description: record.description, 51 + createdAt: new Date(record.createdAt), 52 + releaseDate: record.releaseDate 53 + ? new Date(record.releaseDate) 54 + : null, 55 + durationMs: record.durationMs, 56 + tags: record.tags, 57 + language: record.language, 58 + license: record.license, 59 + lyrics: record.lyrics, 60 + albumArt: 61 + record.albumArt && "$type" in record.albumArt 62 + ? mapBlob(record.albumArt) 63 + : undefined, 64 + audio: 65 + "$type" in record.audio ? mapBlob(record.audio) : undefined, 66 + externalUrl: record.externalUrl, 67 + }); 68 + 69 + if (record.artists) { 70 + for (let i = 0; i < record.artists.length; i++) { 71 + const credit = record.artists[i]!; 72 + let artistId: number; 73 + 74 + if (credit.did) { 75 + const existing = await tx 76 + .select({ id: artists.id }) 77 + .from(artists) 78 + .where(eq(artists.did, credit.did)) 79 + .limit(1); 80 + 81 + if (existing[0]) { 82 + artistId = existing[0].id; 83 + } else { 84 + const inserted = await tx 85 + .insert(artists) 86 + .values({ did: credit.did, name: credit.name }) 87 + .returning({ id: artists.id }) 88 + .then((rows) => rows[0]!); 89 + artistId = inserted.id; 90 + } 91 + } else { 92 + const inserted = await tx 93 + .insert(artists) 94 + .values({ name: credit.name }) 95 + .returning({ id: artists.id }) 96 + .then((rows) => rows[0]!); 97 + artistId = inserted.id; 98 + } 99 + 100 + await tx.insert(trackArtists).values({ 101 + trackUri: uri, 102 + artistId, 103 + position: i, 104 + }); 105 + } 106 + } 107 + }); 108 + 109 + console.log(`indexed track: ${uri}`); 110 + } catch (err) { 111 + console.error(`failed to index track ${uri}:`, err); 112 + } 113 + } 30 114 } 31 115 32 116 if (import.meta.main) { 33 - await loop(subscription); 117 + const db = createDb(); 118 + 119 + await loop(subscription, db); 34 120 }
+3 -2
apps/firehose/package.json
··· 11 11 "@atcute/firehose": "^0.1.0", 12 12 "@atcute/jetstream": "^1.1.2", 13 13 "@atcute/lexicons": "^1.2.9", 14 - "lexicon": "workspace:*", 15 - "db": "workspace:*" 14 + "db": "workspace:*", 15 + "drizzle-orm": "^0.45.1", 16 + "lexicon": "workspace:*" 16 17 } 17 18 }
+52 -45
apps/firehose/tests/firehose.test.ts
··· 8 8 let db: Awaited<ReturnType<typeof setupTestDb>>; 9 9 10 10 beforeAll(async () => { 11 - db = await setupTestDb(); 11 + db = await setupTestDb(); 12 12 }); 13 13 14 14 afterAll(async () => { 15 - await teardownTestDb(); 15 + await teardownTestDb(); 16 16 }); 17 17 18 - function mockSubscription(events: JetstreamEvent[]): AsyncIterable<JetstreamEvent> { 19 - return { 20 - async *[Symbol.asyncIterator]() { 21 - for (const event of events) { 22 - yield event; 23 - } 24 - }, 25 - }; 18 + function mockSubscription( 19 + events: JetstreamEvent[], 20 + ): AsyncIterable<JetstreamEvent> { 21 + return { 22 + async *[Symbol.asyncIterator]() { 23 + for (const event of events) { 24 + yield event; 25 + } 26 + }, 27 + }; 26 28 } 27 29 28 30 const SAMPLE_DID = "did:plc:ragtjsm2j2vknwkz3zp4oxrd"; 29 31 const SAMPLE_RKEY = "3jui7kd54zh2i"; 30 - const SAMPLE_CID = "bafyreig2fjxi3a743bb54z2at6lsv2brz4f5d3hefma6ludqt5d2k2ofey"; 32 + const SAMPLE_CID = 33 + "bafyreig2fjxi3a743bb54z2at6lsv2brz4f5d3hefma6ludqt5d2k2ofey"; 31 34 const SAMPLE_COLLECTION = "ca.ansxor.catnip.track"; 32 35 33 36 const createTrackEvent: JetstreamEvent = { 34 - did: SAMPLE_DID, 35 - time_us: Date.now() * 1000, 36 - kind: "commit", 37 - commit: { 38 - operation: "create", 39 - collection: SAMPLE_COLLECTION, 40 - rkey: SAMPLE_RKEY, 41 - rev: "3jui7kd54zh2i", 42 - cid: SAMPLE_CID, 43 - record: { 44 - $type: "ca.ansxor.catnip.track", 45 - title: "Midnight Echoes", 46 - createdAt: new Date().toISOString(), 47 - audio: { 48 - $type: "blob", 49 - ref: { $link: "bafyreig2fjxi3a743bb54z2at6lsv2brz4f5d3hefma6ludqt5d2k2ofey" }, 50 - mimeType: "audio/ogg", 51 - size: 5_000_000, 52 - }, 53 - } satisfies InferOutput<typeof CaAnsxorCatnipTrack.mainSchema>, 54 - }, 37 + did: SAMPLE_DID, 38 + time_us: Date.now() * 1000, 39 + kind: "commit", 40 + commit: { 41 + operation: "create", 42 + collection: SAMPLE_COLLECTION, 43 + rkey: SAMPLE_RKEY, 44 + rev: "3jui7kd54zh2i", 45 + cid: SAMPLE_CID, 46 + record: { 47 + $type: "ca.ansxor.catnip.track", 48 + title: "Midnight Echoes", 49 + createdAt: new Date().toISOString(), 50 + audio: { 51 + $type: "blob", 52 + ref: { 53 + $link: "bafyreig2fjxi3a743bb54z2at6lsv2brz4f5d3hefma6ludqt5d2k2ofey", 54 + }, 55 + mimeType: "audio/ogg", 56 + size: 5_000_000, 57 + }, 58 + } satisfies InferOutput<typeof CaAnsxorCatnipTrack.mainSchema>, 59 + }, 55 60 }; 56 61 57 62 test("can query tracks", async () => { 58 - const tracks = await db.query.tracks.findMany(); 59 - expect(tracks).toEqual([]); 63 + const tracks = await db.query.tracks.findMany(); 64 + expect(tracks).toEqual([]); 60 65 }); 61 66 62 67 test("inserts track from create commit event", async () => { 63 - const subscription = mockSubscription([createTrackEvent]); 64 - await loop(subscription); 68 + const subscription = mockSubscription([createTrackEvent]); 69 + await loop(subscription, db); 65 70 66 - const tracks = await db.query.tracks.findMany(); 67 - expect(tracks).toHaveLength(1); 68 - const track = tracks[0]; 69 - expect(track).not.toBeUndefined(); 70 - expect(track!.uri).toBe(`at://${SAMPLE_DID}/${SAMPLE_COLLECTION}/${SAMPLE_RKEY}`); 71 - expect(track!.did).toBe(SAMPLE_DID); 72 - expect(track!.rkey).toBe(SAMPLE_RKEY); 73 - expect(track!.cid).toBe(SAMPLE_CID); 74 - expect(track!.title).toBe("Midnight Echoes"); 71 + const tracks = await db.query.tracks.findMany(); 72 + expect(tracks).toHaveLength(1); 73 + const track = tracks[0]; 74 + expect(track).not.toBeUndefined(); 75 + expect(track!.uri).toBe( 76 + `at://${SAMPLE_DID}/${SAMPLE_COLLECTION}/${SAMPLE_RKEY}`, 77 + ); 78 + expect(track!.did).toBe(SAMPLE_DID); 79 + expect(track!.rkey).toBe(SAMPLE_RKEY); 80 + expect(track!.cid).toBe(SAMPLE_CID); 81 + expect(track!.title).toBe("Midnight Echoes"); 75 82 });
+1
bun.lock
··· 41 41 "@atcute/jetstream": "^1.1.2", 42 42 "@atcute/lexicons": "^1.2.9", 43 43 "db": "workspace:*", 44 + "drizzle-orm": "^0.45.1", 44 45 "lexicon": "workspace:*", 45 46 }, 46 47 "devDependencies": {