atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 234 lines 6.6 kB view raw
/**
 * Manages replication offers: publishing, discovery, agreement detection,
 * and policy generation from mutual agreements.
 */

import type { PeerDiscovery } from "./peer-discovery.js";
import type { PolicyEngine } from "../policy/engine.js";
import type { Policy } from "../policy/types.js";
import {
  OFFER_NSID,
  didToRkey,
  type OfferRecord,
} from "./types.js";

/**
 * Interface for record read/write operations.
 * Both RepoManager (local) and PdsClient (remote) satisfy this interface.
 */
export interface RecordWriter {
  putRecord(collection: string, rkey: string, record: unknown): Promise<unknown>;
  deleteRecord(collection: string, rkey: string): Promise<unknown>;
  listRecords(collection: string, opts: { limit: number }): Promise<{
    records: Array<{ uri: string; cid: string; value: unknown }>;
  }>;
}

/** A detected mutual replication agreement between two peers. */
export interface Agreement {
  /** DID of the remote peer the agreement is with. */
  counterpartyDid: string;
  /** Our offer whose subject is the counterparty. */
  localOffer: OfferRecord;
  /** The counterparty's offer whose subject is our local DID. */
  remoteOffer: OfferRecord;
  /** Parameters obtained by merging both offers (see mergeOfferParams). */
  effectiveParams: { minCopies: number; intervalSec: number; priority: number };
}

/** Prefix for policy IDs generated from P2P agreements. */
const P2P_POLICY_PREFIX = "p2p:";

/** Values used by publishOffer for any parameter the caller omits. */
const DEFAULT_OFFER_PARAMS = {
  minCopies: 2,
  intervalSec: 600,
  priority: 50,
} as const;

/**
 * Maximum number of records requested when listing local offers.
 * Only a single listRecords call is made, so offers beyond this count are
 * not seen by getLocalOffers.
 */
const LOCAL_OFFER_LIST_LIMIT = 100;

/**
 * Runtime guard for values read back from the repo.
 *
 * Checks the `$type` tag and the fields this module actually reads
 * (`subject` as a Map key, the three numeric params in mergeOfferParams),
 * so a malformed record cannot turn into NaN policy parameters.
 */
function isOfferRecord(value: unknown): value is OfferRecord {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    candidate.$type === OFFER_NSID &&
    typeof candidate.subject === "string" &&
    Number.isFinite(candidate.minCopies) &&
    Number.isFinite(candidate.intervalSec) &&
    Number.isFinite(candidate.priority)
  );
}

export class OfferManager {
  /**
   * @param recordWriter Store used to put, delete and list offer records.
   * @param peerDiscovery Used to fetch the offers published by remote peers.
   * @param policyEngine Engine that receives the policies derived from agreements.
   * @param localDid Our own DID; remote offers must name it as their subject.
   */
  constructor(
    private readonly recordWriter: RecordWriter,
    private readonly peerDiscovery: PeerDiscovery,
    private readonly policyEngine: PolicyEngine,
    private readonly localDid: string,
  ) {}

  /**
   * Publish (or update) a replication offer for a subject DID.
   *
   * The record is written to the OFFER_NSID collection under the rkey
   * derived from the subject via didToRkey.
   *
   * @param subject DID the offer is made to.
   * @param params Optional overrides; omitted fields default to
   *   minCopies 2, intervalSec 600, priority 50.
   * @returns The record that was written.
   */
  async publishOffer(
    subject: string,
    params?: { minCopies?: number; intervalSec?: number; priority?: number },
  ): Promise<OfferRecord> {
    const record: OfferRecord = {
      $type: OFFER_NSID,
      subject,
      // `??` (not `||`) so an explicit 0 from the caller is preserved.
      minCopies: params?.minCopies ?? DEFAULT_OFFER_PARAMS.minCopies,
      intervalSec: params?.intervalSec ?? DEFAULT_OFFER_PARAMS.intervalSec,
      priority: params?.priority ?? DEFAULT_OFFER_PARAMS.priority,
      createdAt: new Date().toISOString(),
    };

    await this.recordWriter.putRecord(OFFER_NSID, didToRkey(subject), record);
    return record;
  }

  /**
   * Revoke a replication offer and remove any derived policy.
   *
   * The policy is removed only after the record deletion has resolved; if
   * deleteRecord rejects, the rejection propagates and the policy is kept.
   *
   * @param subject DID whose offer should be revoked.
   */
  async revokeOffer(subject: string): Promise<void> {
    await this.recordWriter.deleteRecord(OFFER_NSID, didToRkey(subject));
    // Remove the P2P policy derived from this offer
    this.policyEngine.removePolicy(`${P2P_POLICY_PREFIX}${subject}`);
  }

  /**
   * List all local offers from our repo.
   *
   * Records that do not pass isOfferRecord (wrong `$type`, missing subject,
   * or non-numeric parameters) are dropped rather than returned.
   *
   * @returns Well-formed offer records from the first page of results.
   */
  async getLocalOffers(): Promise<OfferRecord[]> {
    const result = await this.recordWriter.listRecords(OFFER_NSID, {
      limit: LOCAL_OFFER_LIST_LIMIT,
    });
    return result.records.map((r) => r.value).filter(isOfferRecord);
  }

  /**
   * Discover mutual agreements with a set of peers.
   *
   * A mutual agreement exists when:
   * 1. The remote peer has an offer where subject === our localDid
   * 2. We have a local offer where subject === the remote peer's DID
   *
   * Peers are queried one at a time, in the order given.
   *
   * @param peers Candidate peers with the PDS endpoint to query for each.
   * @returns One agreement per peer that satisfies both conditions.
   */
  async discoverAgreements(
    peers: Array<{ did: string; pdsEndpoint: string }>,
  ): Promise<Agreement[]> {
    const localOffers = await this.getLocalOffers();
    const localOffersBySubject = new Map<string, OfferRecord>();
    for (const offer of localOffers) {
      localOffersBySubject.set(offer.subject, offer);
    }

    const agreements: Agreement[] = [];

    for (const peer of peers) {
      // Check our side first so we never contact peers we have no offer for.
      const localOffer = localOffersBySubject.get(peer.did);
      if (!localOffer) continue;

      // Discover their offers
      let remoteOffers: OfferRecord[];
      try {
        remoteOffers = await this.peerDiscovery.discoverOffers(
          peer.did,
          peer.pdsEndpoint,
        );
      } catch {
        // Deliberate best-effort: a failing peer must not abort the whole scan.
        continue;
      }

      // Check if they have an offer for our DID
      const remoteOffer = remoteOffers.find(
        (o) => o.subject === this.localDid,
      );
      if (!remoteOffer) continue;

      // Mutual agreement detected
      agreements.push({
        counterpartyDid: peer.did,
        localOffer,
        remoteOffer,
        effectiveParams: mergeOfferParams(localOffer, remoteOffer),
      });
    }

    return agreements;
  }

  /**
   * Sync policies from detected agreements.
   *
   * - Adds P2P policies for new agreements
   * - Updates existing P2P policies if params changed
   * - Removes stale P2P policies whose agreements no longer exist
   *
   * @param agreements The complete set of currently active agreements; any
   *   policy with the P2P prefix that is not backed by one of them is removed.
   */
  syncPolicies(agreements: Agreement[]): void {
    const activePolicyIds = new Set<string>();

    for (const agreement of agreements) {
      const { counterpartyDid, effectiveParams } = agreement;
      const policyId = `${P2P_POLICY_PREFIX}${counterpartyDid}`;
      activePolicyIds.add(policyId);

      const policy: Policy = {
        id: policyId,
        name: `P2P agreement with ${counterpartyDid}`,
        target: { type: "list", dids: [counterpartyDid] },
        replication: {
          minCopies: effectiveParams.minCopies,
        },
        sync: {
          intervalSec: effectiveParams.intervalSec,
        },
        retention: {
          maxAgeSec: 0,
          keepHistory: false,
        },
        priority: effectiveParams.priority,
        enabled: true,
      };

      // Looked up on every iteration (not cached) so that a policy added
      // earlier in this same loop is seen if a counterparty appears twice.
      const existing = this.policyEngine
        .getPolicies()
        .find((p) => p.id === policyId);

      if (!existing) {
        this.policyEngine.addPolicy(policy);
        continue;
      }

      // Update only if params changed
      const paramsChanged =
        existing.replication.minCopies !== policy.replication.minCopies ||
        existing.sync.intervalSec !== policy.sync.intervalSec ||
        existing.priority !== policy.priority;
      if (paramsChanged) {
        this.policyEngine.removePolicy(policyId);
        this.policyEngine.addPolicy(policy);
      }
    }

    // Collect stale ids before removing anything: removing while iterating
    // the engine's list would skip entries if getPolicies() returns a live
    // array.
    const staleIds = this.policyEngine
      .getPolicies()
      .filter(
        (p) =>
          p.id.startsWith(P2P_POLICY_PREFIX) && !activePolicyIds.has(p.id),
      )
      .map((p) => p.id);

    for (const staleId of staleIds) {
      this.policyEngine.removePolicy(staleId);
    }
  }

  /**
   * Run the full discover-and-sync cycle: discover agreements, then sync policies.
   *
   * @param peers Candidate peers passed through to discoverAgreements.
   * @returns The agreements that were discovered and synced.
   */
  async discoverAndSync(
    peers: Array<{ did: string; pdsEndpoint: string }>,
  ): Promise<Agreement[]> {
    const agreements = await this.discoverAgreements(peers);
    this.syncPolicies(agreements);
    return agreements;
  }
}

/**
 * Merge parameters from two offers into effective params.
 * - minCopies: max (most protective)
 * - intervalSec: min (most frequent)
 * - priority: max (highest importance)
 *
 * The merge is symmetric: swapping `a` and `b` yields the same result.
 */
function mergeOfferParams(
  a: OfferRecord,
  b: OfferRecord,
): { minCopies: number; intervalSec: number; priority: number } {
  return {
    minCopies: Math.max(a.minCopies, b.minCopies),
    intervalSec: Math.min(a.intervalSec, b.intervalSec),
    priority: Math.max(a.priority, b.priority),
  };
}