atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 190 lines 4.9 kB view raw
/**
 * Periodic challenge orchestration.
 *
 * Reads from PolicyEngine + SyncStorage to derive challenge schedules,
 * then issues challenges via ChallengeTransport and records results.
 *
 * Policy integration boundary:
 * - preferredPeers → who to challenge
 * - minCopies - 1 → required successful challenges
 * - priority >= 70 → use "combined" type (otherwise "mst-proof")
 * - intervalSec * 2 → challenge frequency (2x sync interval)
 */

import type { PolicyEngine } from "../../policy/engine.js";
import type { SyncStorage } from "../sync-storage.js";
import type { BlockStore } from "../../ipfs.js";
import type {
  ChallengeConfig,
  ChallengeSchedule,
  ChallengeType,
} from "./types.js";
import { DEFAULT_CHALLENGE_CONFIG } from "./types.js";
import { generateChallenge, computeEpoch } from "./challenge-generator.js";
import { verifyResponse } from "./challenge-verifier.js";
import { ChallengeStorage } from "./challenge-storage.js";
import type { ChallengeTransport } from "./transport.js";

/** Timer period used by `start()` when the caller does not pass one. */
const DEFAULT_TICK_INTERVAL_MS = 60_000;

/** Policy priority at or above which the "combined" challenge type is used. */
const COMBINED_CHALLENGE_MIN_PRIORITY = 70;

/**
 * Derives challenge schedules from policy + sync state and runs them on a timer.
 *
 * Lifecycle: `start()` installs an interval timer that invokes `tick()`;
 * `stop()` clears the timer and sets a flag that `tick()` checks at the top of
 * each loop iteration, so an in-progress round exits at its next check.
 */
export class ChallengeScheduler {
  private timer: ReturnType<typeof setInterval> | null = null;
  private stopped = false;
  /** True while a timer-initiated `tick()` has not yet settled. */
  private tickInFlight = false;
  private readonly config: ChallengeConfig;

  /**
   * @param challengerDid DID placed in generated challenges and recorded results as the challenger.
   * @param policyEngine Policy source; when `null`, `deriveSchedules()` yields no schedules.
   * @param syncStorage Source of sync states, block CIDs and record paths.
   * @param challengeStorage Sink for verified challenge results.
   * @param blockStore Passed through to `verifyResponse`.
   * @param transport Used to deliver challenges and obtain responses.
   * @param config Overrides merged on top of `DEFAULT_CHALLENGE_CONFIG`.
   */
  constructor(
    private readonly challengerDid: string,
    private readonly policyEngine: PolicyEngine | null,
    private readonly syncStorage: SyncStorage,
    private readonly challengeStorage: ChallengeStorage,
    private readonly blockStore: BlockStore,
    private readonly transport: ChallengeTransport,
    config?: Partial<ChallengeConfig>,
  ) {
    this.config = { ...DEFAULT_CHALLENGE_CONFIG, ...config };
  }

  /**
   * Derive challenge schedules from policy configuration.
   *
   * Reads existing policy fields to determine:
   * - Who to challenge (preferredPeers)
   * - How many must pass (minCopies - 1, never below 0)
   * - What type (priority >= 70 → combined, else mst-proof)
   * - How often (2x sync interval, in milliseconds)
   *
   * DIDs whose policy says not to replicate, or that have no preferred
   * peers, produce no schedule.
   *
   * @returns One schedule per eligible DID; empty when no policy engine is set.
   */
  deriveSchedules(): ChallengeSchedule[] {
    if (!this.policyEngine) return [];

    const schedules: ChallengeSchedule[] = [];

    for (const state of this.syncStorage.getAllStates()) {
      const effective = this.policyEngine.evaluate(state.did);
      if (!effective.shouldReplicate) continue;

      const targetPeers = effective.replication.preferredPeers ?? [];
      if (targetPeers.length === 0) continue;

      const challengeType: ChallengeType =
        effective.priority >= COMBINED_CHALLENGE_MIN_PRIORITY
          ? "combined"
          : "mst-proof";

      schedules.push({
        subjectDid: state.did,
        targetPeers,
        requiredSuccesses: Math.max(0, effective.replication.minCopies - 1),
        challengeType,
        // intervalSec is in seconds; challenges are scheduled at twice that period.
        intervalMs: effective.sync.intervalSec * 2 * 1000,
      });
    }

    return schedules;
  }

  /**
   * Start periodic challenge orchestration.
   *
   * Calling this while a timer is already installed is a no-op. A timer fire
   * is skipped when the previous timer-initiated round has not finished yet,
   * so slow rounds never overlap and re-challenge the same peers.
   *
   * @param intervalMs Timer period in milliseconds; defaults to 60 000.
   */
  start(intervalMs?: number): void {
    if (this.timer) return;
    this.stopped = false;

    const tickMs = intervalMs ?? DEFAULT_TICK_INTERVAL_MS;

    this.timer = setInterval(() => {
      if (this.stopped || this.tickInFlight) return;

      this.tickInFlight = true;
      this.tick()
        .catch((err: unknown) => {
          console.error("[challenge-scheduler] tick error:", err);
        })
        .finally(() => {
          this.tickInFlight = false;
        });
    }, tickMs);
  }

  /**
   * Stop the scheduler.
   *
   * Clears the timer and raises the stop flag; a round that is currently
   * running breaks out at its next loop check rather than being cancelled
   * mid-request.
   */
  stop(): void {
    this.stopped = true;
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Execute one round of challenges based on derived schedules.
   *
   * For every schedule whose subject has a known root CID, each target peer
   * is challenged in turn: generate → send → verify → record. A failure for
   * one peer is logged and does not stop the remaining peers or schedules.
   *
   * NOTE(review): `schedule.intervalMs` and `schedule.requiredSuccesses` are
   * not consulted here — every call challenges every schedule. Confirm
   * whether per-schedule pacing is expected to be enforced elsewhere.
   */
  async tick(): Promise<void> {
    for (const schedule of this.deriveSchedules()) {
      if (this.stopped) break;

      const state = this.syncStorage.getState(schedule.subjectDid);
      // Nothing to challenge against until the subject has a root CID.
      if (!state?.rootCid) continue;

      const epoch = computeEpoch(Date.now(), this.config.epochDurationMs);
      const blockCids = this.syncStorage.getBlockCids(schedule.subjectDid);
      const recordPaths = this.syncStorage.getRecordPaths(schedule.subjectDid);

      for (const targetPeer of schedule.targetPeers) {
        if (this.stopped) break;

        try {
          const challenge = generateChallenge({
            challengerDid: this.challengerDid,
            targetDid: targetPeer,
            subjectDid: schedule.subjectDid,
            commitCid: state.rootCid,
            availableRecordPaths: recordPaths,
            availableBlockCids: blockCids,
            challengeType: schedule.challengeType,
            epoch,
            config: this.config,
          });

          // NOTE(review): the endpoint comes from the subject's sync state,
          // not from anything keyed by targetPeer — confirm this is the
          // intended destination for a challenge addressed to targetPeer.
          const response = await this.transport.sendChallenge(
            state.pdsEndpoint,
            challenge,
          );

          const result = await verifyResponse(
            challenge,
            response,
            this.blockStore,
          );

          this.challengeStorage.recordResult(
            this.challengerDid,
            targetPeer,
            schedule.subjectDid,
            schedule.challengeType,
            result,
          );
        } catch (err) {
          // Best-effort: log and move on to the next peer. No result is
          // recorded for a challenge that threw.
          console.error(
            `[challenge-scheduler] Challenge to ${targetPeer} for ${schedule.subjectDid} failed:`,
            err instanceof Error ? err.message : String(err),
          );
        }
      }
    }
  }

  /**
   * Get the underlying ChallengeStorage instance.
   *
   * @returns The same storage object passed to the constructor.
   */
  getChallengeStorage(): ChallengeStorage {
    return this.challengeStorage;
  }
}