atproto user agency toolkit for individuals and groups
1/**
2 * Manages replication offers: publishing, discovery, agreement detection,
3 * and policy generation from mutual agreements.
4 */
5
6import type { PeerDiscovery } from "./peer-discovery.js";
7import type { PolicyEngine } from "../policy/engine.js";
8import type { Policy } from "../policy/types.js";
9import {
10 OFFER_NSID,
11 didToRkey,
12 type OfferRecord,
13} from "./types.js";
14
/**
 * Minimal record store contract used by the offer manager.
 *
 * Both RepoManager (local) and PdsClient (remote) satisfy this interface, so
 * the same offer logic works against either backend.
 */
export interface RecordWriter {
  /** Writes `record` into `collection` under the record key `rkey`. */
  putRecord(
    collection: string,
    rkey: string,
    record: unknown,
  ): Promise<unknown>;

  /** Deletes the record stored in `collection` under `rkey`. */
  deleteRecord(collection: string, rkey: string): Promise<unknown>;

  /** Lists records of `collection`, requesting at most `opts.limit` entries. */
  listRecords(
    collection: string,
    opts: { limit: number },
  ): Promise<{
    records: { uri: string; cid: string; value: unknown }[];
  }>;
}
26
/**
 * A detected mutual replication agreement between the local node and a peer.
 */
export interface Agreement {
  /** DID of the peer on the other side of the agreement. */
  counterpartyDid: string;
  /** Our own offer whose subject is the counterparty. */
  localOffer: OfferRecord;
  /** The counterparty's offer whose subject is our DID. */
  remoteOffer: OfferRecord;
  /** Parameters obtained by merging the local and remote offers. */
  effectiveParams: {
    minCopies: number;
    intervalSec: number;
    priority: number;
  };
}
34
/**
 * Namespace prefix for the IDs of policies derived from P2P agreements.
 * A policy ID is this prefix followed by a DID; the same prefix is used to
 * recognise such policies when stale ones are cleaned up.
 */
const P2P_POLICY_PREFIX = "p2p:";
37
/**
 * Manages the local side of peer-to-peer replication offers: publishing and
 * revoking our own offers, detecting mutual agreements with peers, and keeping
 * the policy engine's P2P policies in line with those agreements.
 */
export class OfferManager {
  /**
   * @param recordWriter - Store holding our own offer records.
   * @param peerDiscovery - Used to fetch the offers published by a peer.
   * @param policyEngine - Engine whose P2P policies are added/removed here.
   * @param localDid - Our own DID; remote offers must target it to count.
   */
  constructor(
    private readonly recordWriter: RecordWriter,
    private readonly peerDiscovery: PeerDiscovery,
    private readonly policyEngine: PolicyEngine,
    private readonly localDid: string,
  ) {}

  /**
   * Publish (or update) a replication offer for a subject DID.
   *
   * The record key is derived from the subject, so publishing again for the
   * same subject writes to the same key.
   *
   * @param subject - DID the offer is about.
   * @param params - Optional overrides; defaults are minCopies 2,
   *   intervalSec 600 and priority 50.
   * @returns The offer record that was written.
   */
  async publishOffer(
    subject: string,
    params?: { minCopies?: number; intervalSec?: number; priority?: number },
  ): Promise<OfferRecord> {
    const record: OfferRecord = {
      $type: OFFER_NSID,
      subject,
      minCopies: params?.minCopies ?? 2,
      intervalSec: params?.intervalSec ?? 600,
      priority: params?.priority ?? 50,
      createdAt: new Date().toISOString(),
    };

    await this.recordWriter.putRecord(OFFER_NSID, didToRkey(subject), record);
    return record;
  }

  /**
   * Revoke a replication offer and remove any derived policy.
   *
   * @param subject - DID whose offer should be withdrawn.
   */
  async revokeOffer(subject: string): Promise<void> {
    await this.recordWriter.deleteRecord(OFFER_NSID, didToRkey(subject));
    // Remove the P2P policy derived from this offer
    this.policyEngine.removePolicy(`${P2P_POLICY_PREFIX}${subject}`);
  }

  /**
   * List all well-formed local offers from our repo.
   *
   * Records that are not offer records, or that lack the fields the rest of
   * this class relies on, are skipped rather than returned as `OfferRecord`.
   * Only a single page of up to 100 records is requested.
   */
  async getLocalOffers(): Promise<OfferRecord[]> {
    const result = await this.recordWriter.listRecords(OFFER_NSID, {
      limit: 100,
    });
    return result.records
      .map((entry) => entry.value)
      .filter(OfferManager.isOfferRecord);
  }

  /**
   * Runtime guard for offer records read back from storage.
   *
   * Checks the `$type` tag plus the fields consumed when detecting agreements
   * (`subject`) and when merging parameters (`minCopies`, `intervalSec`,
   * `priority`), so malformed records cannot turn into NaN policy values.
   */
  private static isOfferRecord(value: unknown): value is OfferRecord {
    if (typeof value !== "object" || value === null) return false;
    const candidate = value as Record<string, unknown>;
    return (
      candidate.$type === OFFER_NSID &&
      typeof candidate.subject === "string" &&
      Number.isFinite(candidate.minCopies) &&
      Number.isFinite(candidate.intervalSec) &&
      Number.isFinite(candidate.priority)
    );
  }

  /**
   * Discover mutual agreements with a set of peers.
   *
   * A mutual agreement exists when:
   * 1. The remote peer has an offer where subject === our localDid
   * 2. We have a local offer where subject === the remote peer's DID
   *
   * Peers whose offers cannot be fetched are skipped (best effort). Peers are
   * queried one at a time, in the order given.
   *
   * @param peers - Candidate peers with the PDS endpoint to query for each.
   * @returns One agreement per peer that satisfies both conditions.
   */
  async discoverAgreements(
    peers: Array<{ did: string; pdsEndpoint: string }>,
  ): Promise<Agreement[]> {
    const localOffers = await this.getLocalOffers();
    const localOffersBySubject = new Map<string, OfferRecord>();
    for (const offer of localOffers) {
      localOffersBySubject.set(offer.subject, offer);
    }

    const agreements: Agreement[] = [];

    for (const peer of peers) {
      // Without a local offer for this peer there can be no agreement, so
      // skip the remote lookup entirely.
      const localOffer = localOffersBySubject.get(peer.did);
      if (!localOffer) continue;

      let remoteOffers: OfferRecord[];
      try {
        remoteOffers = await this.peerDiscovery.discoverOffers(
          peer.did,
          peer.pdsEndpoint,
        );
      } catch {
        continue; // Skip peers we can't reach
      }

      // Check if they have an offer for our DID
      const remoteOffer = remoteOffers.find(
        (o) => o.subject === this.localDid,
      );
      if (!remoteOffer) continue;

      // Mutual agreement detected
      agreements.push({
        counterpartyDid: peer.did,
        localOffer,
        remoteOffer,
        effectiveParams: mergeOfferParams(localOffer, remoteOffer),
      });
    }

    return agreements;
  }

  /**
   * Sync policies from detected agreements.
   *
   * - Adds P2P policies for new agreements
   * - Replaces existing P2P policies if minCopies, intervalSec or priority
   *   changed
   * - Removes stale P2P policies whose agreements no longer exist
   *
   * @param agreements - The complete set of currently valid agreements; any
   *   P2P policy not backed by one of them is removed.
   */
  syncPolicies(agreements: Agreement[]): void {
    const activeAgreementIds = new Set<string>();

    for (const agreement of agreements) {
      const policyId = `${P2P_POLICY_PREFIX}${agreement.counterpartyDid}`;
      activeAgreementIds.add(policyId);

      const policy: Policy = {
        id: policyId,
        name: `P2P agreement with ${agreement.counterpartyDid}`,
        target: { type: "list", dids: [agreement.counterpartyDid] },
        replication: {
          minCopies: agreement.effectiveParams.minCopies,
        },
        sync: {
          intervalSec: agreement.effectiveParams.intervalSec,
        },
        retention: {
          maxAgeSec: 0,
          keepHistory: false,
        },
        priority: agreement.effectiveParams.priority,
        enabled: true,
      };

      const existing = this.policyEngine
        .getPolicies()
        .find((p) => p.id === policyId);

      if (!existing) {
        this.policyEngine.addPolicy(policy);
        continue;
      }

      // Replace only when one of the negotiated parameters actually changed.
      const paramsChanged =
        existing.replication.minCopies !== policy.replication.minCopies ||
        existing.sync.intervalSec !== policy.sync.intervalSec ||
        existing.priority !== policy.priority;
      if (paramsChanged) {
        this.policyEngine.removePolicy(policyId);
        this.policyEngine.addPolicy(policy);
      }
    }

    // Collect stale ids BEFORE removing anything: getPolicies() may hand back
    // the engine's live collection, and removing entries while iterating it
    // would skip the policy that follows each removed one.
    const staleIds = this.policyEngine
      .getPolicies()
      .filter(
        (p) =>
          p.id.startsWith(P2P_POLICY_PREFIX) && !activeAgreementIds.has(p.id),
      )
      .map((p) => p.id);

    for (const staleId of staleIds) {
      this.policyEngine.removePolicy(staleId);
    }
  }

  /**
   * Run the full discover-and-sync cycle: discover agreements, then sync policies.
   *
   * @param peers - Candidate peers to check for mutual agreements.
   * @returns The agreements that were found and applied.
   */
  async discoverAndSync(
    peers: Array<{ did: string; pdsEndpoint: string }>,
  ): Promise<Agreement[]> {
    const agreements = await this.discoverAgreements(peers);
    this.syncPolicies(agreements);
    return agreements;
  }
}
218
/**
 * Combine two offers into the parameters both sides will run with.
 *
 * Each field takes the more demanding of the two values:
 * - minCopies: the larger (most protective)
 * - intervalSec: the smaller (most frequent)
 * - priority: the larger (highest importance)
 */
function mergeOfferParams(
  a: OfferRecord,
  b: OfferRecord,
): { minCopies: number; intervalSec: number; priority: number } {
  const minCopies = Math.max(a.minCopies, b.minCopies);
  const intervalSec = Math.min(a.intervalSec, b.intervalSec);
  const priority = Math.max(a.priority, b.priority);
  return { minCopies, intervalSec, priority };
}