an independent Bluesky client using Constellation, PDS Queries, and other services reddwarf.app
frontend spa bluesky reddwarf microcosm client app
94
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 166 lines 5.8 kB view raw
1import { useAtom, useAtomValue } from "jotai"; 2import { useEffect, useRef } from "react"; 3 4import { fetchLabelsBatch } from "~/api/moderation"; 5import { 6 CACHE_TIMEOUT_MS, 7 labelerConfigAtom, 8 moderationCacheAtom, 9 pendingUriQueueAtom, 10 processingUriSetAtom, 11} from "~/state/moderationAtoms"; 12 13const BATCH_CHUNK_SIZE = 25; 14 15export const ModerationBatcher = () => { 16 const [queue, setQueue] = useAtom(pendingUriQueueAtom); 17 const [processingSet, setProcessingSet] = useAtom(processingUriSetAtom); 18 const [cache, setCache] = useAtom(moderationCacheAtom); 19 const labelers = useAtomValue(labelerConfigAtom); 20 21 const stateRef = useRef({ queue, processingSet, cache, labelers }); 22 useEffect(() => { 23 stateRef.current = { queue, processingSet, cache, labelers }; 24 }, [queue, processingSet, cache, labelers]); 25 26 useEffect(() => { 27 const interval = setInterval(async () => { 28 const { 29 queue: currentQueue, 30 processingSet: currentProcessing, 31 cache: currentCache, 32 labelers: currentLabelers, 33 } = stateRef.current; 34 35 if (currentQueue.size === 0 || currentLabelers.length === 0) return; 36 37 const now = Date.now(); 38 39 // 1. Identify stale items 40 const batchUris = Array.from(currentQueue).filter((uri) => { 41 const entry = currentCache.get(uri); 42 const isStale = entry ? now - entry.timestamp > CACHE_TIMEOUT_MS : true; 43 return !currentProcessing.has(uri) && isStale; 44 }); 45 46 if (batchUris.length === 0) return; 47 48 console.log(`[Batcher] Processing ${batchUris.length} URIs...`); 49 50 // 2. Lock items 51 setProcessingSet((prev) => { 52 const next = new Set(prev); 53 batchUris.forEach((u) => next.add(u)); 54 return next; 55 }); 56 setQueue((prev) => { 57 const next = new Set(prev); 58 batchUris.forEach((u) => next.delete(u)); 59 return next; 60 }); 61 62 // 3. Process chunks 63 const chunks = []; 64 for (let i = 0; i < batchUris.length; i += BATCH_CHUNK_SIZE) { 65 chunks.push(batchUris.slice(i, i + BATCH_CHUNK_SIZE)); 66 } 67 68 for (const chunk of chunks) { 69 try { 70 const results = await Promise.allSettled( 71 currentLabelers.map((l) => fetchLabelsBatch(l.url, chunk)), 72 ); 73 74 setCache((prevCache) => { 75 const nextCache = new Map(prevCache); 76 const updateTime = Date.now(); 77 78 // A. Initialize requested URIs (to remove loading state) 79 chunk.forEach((uri) => { 80 if (!nextCache.has(uri) || nextCache.get(uri)!.timestamp < updateTime) { 81 nextCache.set(uri, { labels: [], timestamp: updateTime }); 82 } 83 }); 84 85 // B. Process Results 86 results.forEach((res, index) => { 87 if (res.status === "fulfilled") { 88 const labeler = currentLabelers[index]; 89 const rawLabels = res.value.labels || []; 90 91 // --- REDUCTION LOGIC START --- 92 93 // 1. Group by URI 94 const labelsByUri = new Map<string, typeof rawLabels>(); 95 rawLabels.forEach((l) => { 96 if (!labelsByUri.has(l.uri)) labelsByUri.set(l.uri, []); 97 labelsByUri.get(l.uri)!.push(l); 98 }); 99 100 // 2. Process each URI's history 101 labelsByUri.forEach((labels, uri) => { 102 // Only process if this URI is actually in our cache/interest 103 if (!nextCache.has(uri)) return; 104 const cacheEntry = nextCache.get(uri)!; 105 106 // 3. Find latest state per (Source + Value) 107 // Key: "did:plc:xyz::porn" -> Latest Label Object 108 const latestState = new Map<string, typeof rawLabels[0]>(); 109 110 labels.forEach((l) => { 111 const key = `${l.src}::${l.val}`; 112 const existing = latestState.get(key); 113 114 const currentCts = new Date(l.cts).getTime(); 115 const existingCts = existing ? new Date(existing.cts).getTime() : 0; 116 117 if (!existing || currentCts > existingCts) { 118 latestState.set(key, l); 119 } 120 }); 121 122 // 4. Push only active (non-negated) labels 123 for (const activeLabel of latestState.values()) { 124 if (activeLabel.neg) continue; // Skip deleted labels 125 126 // Resolve preference from the Labeler Config (our subscription) 127 // Note: We attribute the label to the 'labeler.did' (the service we subscribed to) 128 // even if the signer (src) is different, because prefs are attached to the service. 129 const resolvedPref = 130 labeler.supportedLabels?.[activeLabel.val] || "ignore"; 131 132 cacheEntry.labels.push({ 133 sourceDid: labeler.did, 134 val: activeLabel.val, 135 cts: activeLabel.cts, 136 preference: resolvedPref, 137 }); 138 } 139 }); 140 // --- REDUCTION LOGIC END --- 141 142 } else { 143 console.error(`[Batcher] Labeler ${currentLabelers[index].url} failed:`, res.reason); 144 } 145 }); 146 147 return nextCache; 148 }); 149 } catch (e) { 150 console.error("[Batcher] Chunk failed", e); 151 } 152 } 153 154 // 5. Release Lock 155 setProcessingSet((prev) => { 156 const next = new Set(prev); 157 batchUris.forEach((u) => next.delete(u)); 158 return next; 159 }); 160 }, 2000); 161 162 return () => clearInterval(interval); 163 }, []); 164 165 return null; 166};