An independent Bluesky client using Constellation, PDS Queries, and other services
reddwarf.app
frontend
spa
bluesky
reddwarf
microcosm
client
app
1import { useAtom, useAtomValue } from "jotai";
2import { useEffect, useRef } from "react";
3
4import { fetchLabelsBatch } from "~/api/moderation";
5import {
6 CACHE_TIMEOUT_MS,
7 labelerConfigAtom,
8 moderationCacheAtom,
9 pendingUriQueueAtom,
10 processingUriSetAtom,
11} from "~/state/moderationAtoms";
12
// Maximum number of URIs sent to a labeler service in a single batch request.
const BATCH_CHUNK_SIZE = 25;

15export const ModerationBatcher = () => {
16 const [queue, setQueue] = useAtom(pendingUriQueueAtom);
17 const [processingSet, setProcessingSet] = useAtom(processingUriSetAtom);
18 const [cache, setCache] = useAtom(moderationCacheAtom);
19 const labelers = useAtomValue(labelerConfigAtom);
20
21 const stateRef = useRef({ queue, processingSet, cache, labelers });
22 useEffect(() => {
23 stateRef.current = { queue, processingSet, cache, labelers };
24 }, [queue, processingSet, cache, labelers]);
25
26 useEffect(() => {
27 const interval = setInterval(async () => {
28 const {
29 queue: currentQueue,
30 processingSet: currentProcessing,
31 cache: currentCache,
32 labelers: currentLabelers,
33 } = stateRef.current;
34
35 if (currentQueue.size === 0 || currentLabelers.length === 0) return;
36
37 const now = Date.now();
38
39 // 1. Identify stale items
40 const batchUris = Array.from(currentQueue).filter((uri) => {
41 const entry = currentCache.get(uri);
42 const isStale = entry ? now - entry.timestamp > CACHE_TIMEOUT_MS : true;
43 return !currentProcessing.has(uri) && isStale;
44 });
45
46 if (batchUris.length === 0) return;
47
48 console.log(`[Batcher] Processing ${batchUris.length} URIs...`);
49
50 // 2. Lock items
51 setProcessingSet((prev) => {
52 const next = new Set(prev);
53 batchUris.forEach((u) => next.add(u));
54 return next;
55 });
56 setQueue((prev) => {
57 const next = new Set(prev);
58 batchUris.forEach((u) => next.delete(u));
59 return next;
60 });
61
62 // 3. Process chunks
63 const chunks = [];
64 for (let i = 0; i < batchUris.length; i += BATCH_CHUNK_SIZE) {
65 chunks.push(batchUris.slice(i, i + BATCH_CHUNK_SIZE));
66 }
67
68 for (const chunk of chunks) {
69 try {
70 const results = await Promise.allSettled(
71 currentLabelers.map((l) => fetchLabelsBatch(l.url, chunk)),
72 );
73
74 setCache((prevCache) => {
75 const nextCache = new Map(prevCache);
76 const updateTime = Date.now();
77
78 // A. Initialize requested URIs (to remove loading state)
79 chunk.forEach((uri) => {
80 if (!nextCache.has(uri) || nextCache.get(uri)!.timestamp < updateTime) {
81 nextCache.set(uri, { labels: [], timestamp: updateTime });
82 }
83 });
84
85 // B. Process Results
86 results.forEach((res, index) => {
87 if (res.status === "fulfilled") {
88 const labeler = currentLabelers[index];
89 const rawLabels = res.value.labels || [];
90
91 // --- REDUCTION LOGIC START ---
92
93 // 1. Group by URI
94 const labelsByUri = new Map<string, typeof rawLabels>();
95 rawLabels.forEach((l) => {
96 if (!labelsByUri.has(l.uri)) labelsByUri.set(l.uri, []);
97 labelsByUri.get(l.uri)!.push(l);
98 });
99
100 // 2. Process each URI's history
101 labelsByUri.forEach((labels, uri) => {
102 // Only process if this URI is actually in our cache/interest
103 if (!nextCache.has(uri)) return;
104 const cacheEntry = nextCache.get(uri)!;
105
106 // 3. Find latest state per (Source + Value)
107 // Key: "did:plc:xyz::porn" -> Latest Label Object
108 const latestState = new Map<string, typeof rawLabels[0]>();
109
110 labels.forEach((l) => {
111 const key = `${l.src}::${l.val}`;
112 const existing = latestState.get(key);
113
114 const currentCts = new Date(l.cts).getTime();
115 const existingCts = existing ? new Date(existing.cts).getTime() : 0;
116
117 if (!existing || currentCts > existingCts) {
118 latestState.set(key, l);
119 }
120 });
121
122 // 4. Push only active (non-negated) labels
123 for (const activeLabel of latestState.values()) {
124 if (activeLabel.neg) continue; // Skip deleted labels
125
126 // Resolve preference from the Labeler Config (our subscription)
127 // Note: We attribute the label to the 'labeler.did' (the service we subscribed to)
128 // even if the signer (src) is different, because prefs are attached to the service.
129 const resolvedPref =
130 labeler.supportedLabels?.[activeLabel.val] || "ignore";
131
132 cacheEntry.labels.push({
133 sourceDid: labeler.did,
134 val: activeLabel.val,
135 cts: activeLabel.cts,
136 preference: resolvedPref,
137 });
138 }
139 });
140 // --- REDUCTION LOGIC END ---
141
142 } else {
143 console.error(`[Batcher] Labeler ${currentLabelers[index].url} failed:`, res.reason);
144 }
145 });
146
147 return nextCache;
148 });
149 } catch (e) {
150 console.error("[Batcher] Chunk failed", e);
151 }
152 }
153
154 // 5. Release Lock
155 setProcessingSet((prev) => {
156 const next = new Set(prev);
157 batchUris.forEach((u) => next.delete(u));
158 return next;
159 });
160 }, 2000);
161
162 return () => clearInterval(interval);
163 }, []);
164
165 return null;
166};