a tool for shared writing and social publishing
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

use pg and pool connections in push

+66 -50
+66 -50
app/api/rpc/[command]/push.ts
··· 6 6 import { makeRoute } from "../lib"; 7 7 import { z } from "zod"; 8 8 import type { Env } from "./route"; 9 - import postgres from "postgres"; 10 - import { drizzle } from "drizzle-orm/postgres-js"; 9 + import { drizzle } from "drizzle-orm/node-postgres"; 10 + 11 + import { Pool } from "pg"; 12 + 13 + import { attachDatabasePool } from "@vercel/functions"; 14 + import { DbPool } from "@vercel/functions/db-connections"; 11 15 12 16 const mutationV0Schema = z.object({ 13 17 id: z.number(), ··· 44 48 45 49 type PushRequestZ = z.infer<typeof pushRequestSchema>; 46 50 51 + const pool = new Pool({ 52 + idleTimeoutMillis: 5000, 53 + min: 1, 54 + connectionString: process.env.DATABASE_URL, 55 + }); 56 + 57 + // Attach the pool to ensure idle connections close before suspension 58 + attachDatabasePool(pool as DbPool); 59 + 47 60 export const push = makeRoute({ 48 61 route: "push", 49 62 input: z.object({ ··· 58 71 }; 59 72 } 60 73 61 - const client = postgres(process.env.DB_URL as string, { idle_timeout: 5 }); 74 + let client = await pool.connect(); 62 75 const db = drizzle(client); 63 76 64 - await db.transaction(async (tx) => { 65 - let clientGroup = await getClientGroup(tx, pushRequest.clientGroupID); 66 - let token_rights = await tx 67 - .select() 68 - .from(permission_token_rights) 69 - .where(eq(permission_token_rights.token, token.id)); 70 - for (let mutation of pushRequest.mutations) { 71 - let lastMutationID = clientGroup[mutation.clientID] || 0; 72 - if (mutation.id <= lastMutationID) continue; 73 - clientGroup[mutation.clientID] = mutation.id; 74 - let name = mutation.name as keyof typeof mutations; 75 - if (!mutations[name]) { 76 - continue; 77 - } 78 - try { 79 - await mutations[name]( 80 - mutation.args as any, 81 - serverMutationContext(tx, token.id, token_rights), 82 - ); 83 - } catch (e) { 84 - console.log( 85 - `Error occured while running mutation: ${name}`, 86 - JSON.stringify(e), 87 - JSON.stringify(mutation, null, 2), 88 - ); 77 + let channel = supabase.channel(`rootEntity:${rootEntity}`); 78 + try { 79 + await db.transaction(async (tx) => { 80 + let clientGroup = await getClientGroup(tx, pushRequest.clientGroupID); 81 + let token_rights = await tx 82 + .select() 83 + .from(permission_token_rights) 84 + .where(eq(permission_token_rights.token, token.id)); 85 + for (let mutation of pushRequest.mutations) { 86 + let lastMutationID = clientGroup[mutation.clientID] || 0; 87 + if (mutation.id <= lastMutationID) continue; 88 + clientGroup[mutation.clientID] = mutation.id; 89 + let name = mutation.name as keyof typeof mutations; 90 + if (!mutations[name]) { 91 + continue; 92 + } 93 + try { 94 + await mutations[name]( 95 + mutation.args as any, 96 + serverMutationContext(tx, token.id, token_rights), 97 + ); 98 + } catch (e) { 99 + console.log( 100 + `Error occured while running mutation: ${name}`, 101 + JSON.stringify(e), 102 + JSON.stringify(mutation, null, 2), 103 + ); 104 + } 105 + await tx 106 + .insert(replicache_clients) 107 + .values({ 108 + client_group: pushRequest.clientGroupID, 109 + client_id: mutation.clientID, 110 + last_mutation: mutation.id, 111 + }) 112 + .onConflictDoUpdate({ 113 + target: replicache_clients.client_id, 114 + set: { last_mutation: mutation.id }, 115 + }); 89 116 } 90 - await tx 91 - .insert(replicache_clients) 92 - .values({ 93 - client_group: pushRequest.clientGroupID, 94 - client_id: mutation.clientID, 95 - last_mutation: mutation.id, 96 - }) 97 - .onConflictDoUpdate({ 98 - target: replicache_clients.client_id, 99 - set: { last_mutation: mutation.id }, 100 - }); 101 - } 102 - }); 117 + }); 103 118 104 - let channel = supabase.channel(`rootEntity:${rootEntity}`); 105 - await channel.send({ 106 - type: "broadcast", 107 - event: "poke", 108 - payload: { message: "poke" }, 109 - }); 110 - client.end(); 111 - supabase.removeChannel(channel); 112 - return { result: undefined } as const; 119 + await channel.send({ 120 + type: "broadcast", 121 + event: "poke", 122 + payload: { message: "poke" }, 123 + }); 124 + } finally { 125 + client.release(); 126 + supabase.removeChannel(channel); 127 + return { result: undefined } as const; 128 + } 113 129 }, 114 130 });