# Decentralized Library API Gateway: Technical Architecture Specification
For a conceptual overview, workflows, and non-technical explanations, see `humandocs.md`.
Scope: This document contains implementation-level specifications only — code patterns, schemas, configurations, threat models, and performance contracts. It is intended for engineers building or maintaining the system.
## 1. Authentication: DPoP-Bound AT Protocol OAuth 2.0

### Implementation (axum + async-graphql)
```rust
pub struct AtprotoAuth {
    pds_discovery: PdsDiscoveryCache, // TTL-cached DID → PDS mapping
    token_verifier: JwtVerifier,      // Validates DPoP-bound JWTs
    scope_enforcer: ScopeValidator,   // Enforces lexicon-level permissions
}

impl AtprotoAuth {
    pub async fn validate(
        &self,
        headers: &HeaderMap,
        requested_lexicon: &str, // lexicon the caller is trying to access
    ) -> Result<AuthContext, AuthError> {
        let dpop_proof = headers.get("DPoP")
            .ok_or(AuthError::MissingDpop)?;
        let token = headers.get("Authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|v| v.strip_prefix("DPoP "))
            .ok_or(AuthError::InvalidAuthHeader)?;
        let claims = self.token_verifier.verify_dpop_bound(token, dpop_proof).await?;
        let did = claims.sub;
        let did_doc = self.pds_discovery.resolve(&did).await?;
        if did_doc.signing_key != claims.key_id {
            return Err(AuthError::KeyRotationDetected);
        }
        self.scope_enforcer.validate(&claims.scope, requested_lexicon)?;
        Ok(AuthContext { did, scopes: claims.scope })
    }
}
```
### Threat Model
| Threat | Mitigation |
|---|---|
| Token replay | DPoP proof binding + 15-min JWT TTL |
| DID takeover | Real-time DID document validation + signature verification |
| Scope escalation | Server-side lexicon permission enforcement |
| PDS impersonation | TLS + certificate pinning for PDS endpoints |
### Auth Configuration
- DPoP binding: required on all authenticated endpoints
- DID resolution cache TTL: 300s (forced refresh on auth failure)
- Lexicon scope format: `com.example.library.<collection>:<action>` (e.g., `poolItem:create`, `loanAgreement:read`)
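As a concrete illustration of the scope format above, a minimal parser and matcher. This is a sketch: the helper names (`parse_scope`, `scope_allows`) are illustrative, not part of any AT Protocol SDK.

```rust
// Sketch only: parse lexicon scopes of the form
// com.example.library.<collection>:<action> and check a request against
// the granted set. Names are illustrative, not an SDK API.

#[derive(Debug, PartialEq)]
struct LexiconScope {
    collection: String, // fully-qualified NSID, e.g. "com.example.library.poolItem"
    action: String,     // e.g. "create", "read"
}

fn parse_scope(raw: &str) -> Option<LexiconScope> {
    // Split on the last ':' so NSIDs containing dots are preserved intact.
    let (collection, action) = raw.rsplit_once(':')?;
    if collection.is_empty() || action.is_empty() {
        return None;
    }
    Some(LexiconScope {
        collection: collection.to_string(),
        action: action.to_string(),
    })
}

/// Server-side check: does any granted scope cover this collection/action?
fn scope_allows(granted: &[&str], collection: &str, action: &str) -> bool {
    granted
        .iter()
        .filter_map(|s| parse_scope(s))
        .any(|s| s.collection == collection && s.action == action)
}

fn main() {
    let granted = [
        "com.example.library.poolItem:create",
        "com.example.library.loanAgreement:read",
    ];
    assert!(scope_allows(&granted, "com.example.library.poolItem", "create"));
    assert!(!scope_allows(&granted, "com.example.library.poolItem", "delete"));
    println!("scope checks passed");
}
```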
## 2. Firehose Subscription: Federation Sync Engine

### Implementation
```rust
pub struct FirehoseSubscriber {
    relay_endpoints: Vec<Url>,
    cursor_store: CursorPersistence, // PostgreSQL-backed
    backfill_queue: BackfillManager, // com.atproto.sync.getRepo for gaps
    record_processor: AsyncRecordPipeline,
}

impl FirehoseSubscriber {
    pub async fn run(&mut self) -> Result<(), SubscriptionError> {
        loop {
            let mut ws = self.connect_with_retry().await?;
            let cursor = self.cursor_store.load().await?;
            ws.send(SubscribeRepos { cursor: Some(cursor) }).await?;
            while let Some(event) = ws.next().await {
                match event {
                    Event::Commit(commit) => {
                        if !self.verify_commit_signature(&commit).await {
                            log::warn!("Invalid signature for DID {}", commit.repo);
                            continue;
                        }
                        if self.cursor_store.is_processed(&commit.repo, &commit.rev).await {
                            continue;
                        }
                        self.process_commit_ops(&commit).await?;
                        self.cursor_store.save(&commit.repo, &commit.rev).await?;
                    }
                    // Gap events carry the affected repo alongside the range.
                    Event::GapDetected { repo, since, to } => {
                        self.backfill_queue.enqueue(repo, since, to).await;
                    }
                    _ => {} // Info/identity events are ignored by this consumer
                }
            }
            // Connection dropped: rotate to the next relay and retry.
            self.relay_endpoints.rotate_left(1);
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }
}
```
### Cursor Persistence Schema
```sql
CREATE TABLE firehose_cursors (
    did TEXT PRIMARY KEY,
    rev TEXT NOT NULL,
    seq BIGINT NOT NULL,
    processed_at TIMESTAMPTZ DEFAULT now()
);

-- Idempotent upsert: only advance when the incoming sequence is newer
INSERT INTO firehose_cursors (did, rev, seq)
VALUES ($1, $2, $3)
ON CONFLICT (did) DO UPDATE SET rev = $2, seq = $3, processed_at = now()
WHERE firehose_cursors.seq < $3;
```
### Operational Metrics
```
atproto_firehose_events_processed_total{lexicon="com.example.library.*"}
atproto_firehose_cursor_lag_seconds{did="..."}
atproto_firehose_signature_failures_total
atproto_backfill_queue_depth
atproto_backfill_success_rate
```
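The cursor-lag gauge above can be derived as the difference between wall-clock time and the last processed event's commit time. A minimal sketch (exporter wiring elided; the function name is illustrative):

```rust
// Sketch: derivation of atproto_firehose_cursor_lag_seconds.
// Both inputs are unix-epoch seconds; the result is clamped at zero
// to tolerate minor clock skew between relay and gateway.
fn cursor_lag_seconds(now_unix: i64, last_event_unix: i64) -> i64 {
    (now_unix - last_event_unix).max(0)
}

fn main() {
    // Event committed at t, observed 42 seconds later.
    assert_eq!(cursor_lag_seconds(1_700_000_042, 1_700_000_000), 42);
    // Event timestamp slightly ahead of the local clock: report zero lag.
    assert_eq!(cursor_lag_seconds(1_700_000_000, 1_700_000_005), 0);
}
```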
### Reliability Constraints
- Cursor persistence: PostgreSQL with `ON CONFLICT` idempotency
- Relay failover: 2-3 endpoints, rotate on connection failure
- Backfill: `com.atproto.sync.getRepo` for CAR file retrieval on gap detection
- Signature verification: mandatory pre-processing gate
- Ordering: per-DID event ordering via Tokio task pools
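The per-DID ordering constraint can be met by routing every event for a given DID to the same worker shard, so one repo's events serialize while different repos proceed in parallel. A sketch of the shard selection only (the Tokio task pool itself is elided; the function name is illustrative):

```rust
// Sketch: deterministic DID → shard assignment for per-repo ordering.
// Each shard owns an ordered queue drained by a single worker task.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a DID onto one of `num_shards` ordered queues. The same DID always
/// lands on the same shard, which is what preserves per-repo ordering.
fn shard_for_did(did: &str, num_shards: u64) -> u64 {
    let mut h = DefaultHasher::new();
    did.hash(&mut h);
    h.finish() % num_shards
}

fn main() {
    let a = shard_for_did("did:plc:alice", 8);
    let b = shard_for_did("did:plc:alice", 8);
    assert_eq!(a, b); // same DID, same shard, every time
    assert!(a < 8);
}
```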
## 3. Data Model & Conflict Resolution

### GraphQL Schema
```graphql
type Book {
  workId: String!              # e.g., "wikidata:Q190192" or "isbn-group:978-0-441"
  edition: BookEdition!
  availableEditions: [BookEdition!]!
}

type BookEdition {
  uri: String!                 # AT-URI: at://did:plc:.../com.example.library.book/...
  isbn: String
  editionLabel: String
  publisher: String
  publicationYear: Int
  coverArtCid: String
}
```
### Book Canonicalization Resolution Logic
- On `addBookToPool`: query Wikidata/OpenLibrary for the ISBN → extract `workId`
- Check the local DB for an existing `Book` with the same `workId`
- If found: link the `poolItem` to the existing `Book`, create a new `BookEdition`
- If not: create `Book` + `BookEdition`
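The branch in the steps above can be sketched as a small decision function. The types and names here are illustrative, not the actual resolver:

```rust
// Sketch: the link-or-create decision in the canonicalization flow.
// `existing_book_id` stands in for the local-DB lookup by workId.

#[derive(Debug, PartialEq)]
enum CanonicalizationAction {
    /// A Book with this workId exists: link the poolItem and add a BookEdition.
    LinkToExisting { book_id: u64 },
    /// No Book with this workId yet: create Book + BookEdition.
    CreateNew { work_id: String },
}

fn resolve_book(work_id: &str, existing_book_id: Option<u64>) -> CanonicalizationAction {
    match existing_book_id {
        Some(book_id) => CanonicalizationAction::LinkToExisting { book_id },
        None => CanonicalizationAction::CreateNew { work_id: work_id.to_string() },
    }
}

fn main() {
    assert_eq!(
        resolve_book("wikidata:Q190192", Some(7)),
        CanonicalizationAction::LinkToExisting { book_id: 7 }
    );
    assert!(matches!(
        resolve_book("wikidata:Q190192", None),
        CanonicalizationAction::CreateNew { .. }
    ));
}
```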
### Pool Query (SQL)
```sql
SELECT b.work_id,
       array_agg(e.edition_label) AS editions,
       COUNT(pi.uri) AS available_copies
FROM books b
JOIN book_editions e ON b.id = e.book_id
JOIN pool_items pi ON e.uri = pi.book_edition_uri
WHERE pi.status = 'AVAILABLE'
GROUP BY b.work_id
ORDER BY available_copies DESC;
```
### Loan Status State Machine
```rust
pub enum LoanStatus {
    PendingApproval,
    Active,
    Returned,
    Overdue,      // Auto-set when expected_return_date < now()
    ReportedLost, // Lender-initiated
    Disputed,     // Borrower contests loss
    Resolved,     // Terminal: compensated or forgiven
}
```
### Lost Book Mutation
```rust
pub async fn report_book_lost(
    &self,
    ctx: &Context,
    loan_uri: String,
    resolution: LossResolution, // Compensate | Forgive | Investigate
) -> Result<LoanAgreement> {
    // Load the loan agreement first (lookup helper elided in the original).
    let loan = self.db.get_loan_agreement(&loan_uri).await?;
    ensure!(ctx.did == loan.lender_did, AuthError::Forbidden);
    let record = LoanAgreementRecord {
        status: LoanStatus::ReportedLost,
        loss_reported_at: Some(Utc::now()),
        resolution: Some(resolution.clone()),
        ..loan.record.clone()
    };
    self.pds_client.put_record(&loan_uri, &record).await?;
    if matches!(resolution, LossResolution::Compensate) {
        self.reputation_service.apply_penalty(
            &loan.borrower_did,
            ReputationPenalty::LostBook,
        ).await;
    }
    self.indexer.remove_pool_item(&loan.book_copy_uri).await;
    Ok(loan)
}
```
### Consistency Guarantees
- Primary key: AT-URI (`at://did:plc:.../collection/rkey`)
- Canonicalization: work-level grouping, edition-specific loans
- Idempotency: `rev`-based deduplication on all record writes
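A minimal sketch of the `rev`-based deduplication rule, assuming revs are ATProto TIDs, which sort lexicographically (the function name is illustrative):

```rust
// Sketch: apply a record write only when its rev is strictly newer than the
// stored one. TIDs are designed to sort lexicographically, so a plain string
// comparison is sufficient for ordering.

/// Should an incoming write with `incoming_rev` be applied over the
/// previously stored rev (None = first write for this record)?
fn should_apply(stored_rev: Option<&str>, incoming_rev: &str) -> bool {
    match stored_rev {
        None => true,
        Some(stored) => incoming_rev > stored, // lexicographic TID comparison
    }
}

fn main() {
    assert!(should_apply(None, "3kabc"));           // first write applies
    assert!(should_apply(Some("3kaaa"), "3kabc"));  // newer rev wins
    assert!(!should_apply(Some("3kabc"), "3kabc")); // exact replay is a no-op
    assert!(!should_apply(Some("3kabc"), "3kaaa")); // stale write is dropped
}
```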
## 4. Caching & Rate Limiting

### Three-Tier Cache
```rust
pub struct CacheLayer {
    l1_cache: Arc<DashMap<CacheKey, CachedValue>>, // In-memory, 10k entries, 5-min TTL
    l2_cache: RedisPool,                           // Distributed, 1-hour TTL
    db: PgPool,                                    // Source of truth
}

impl CacheLayer {
    pub async fn get_pool_items(&self, query: PoolQuery) -> Result<Vec<PoolItem>> {
        if let Some(items) = self.l1_cache.get(&query) {
            return Ok(items.clone());
        }
        if let Some(items) = self.l2_cache.get(&query).await? {
            self.l1_cache.insert(query.clone(), items.clone());
            return Ok(items);
        }
        let items = self.db.query_pool_items(query.clone(), LIMIT).await?;
        // Populate both cache tiers off the hot path. Clone the handles so the
        // spawned task owns them instead of borrowing `self`.
        let l1 = Arc::clone(&self.l1_cache);
        let l2 = self.l2_cache.clone();
        let items_clone = items.clone();
        tokio::spawn(async move {
            l2.set(&query, &items_clone, TTL_1H).await.ok();
            l1.insert(query, items_clone);
        });
        Ok(items)
    }
}
```
### Rate Limit Configuration
```toml
[rate_limits]
per_did = { requests_per_minute = 60, burst = 10 }
per_ip = { requests_per_second = 10, burst = 50 }
mutations = { addBookToPool = "10/hour", requestBookLoan = "20/hour" }
firehose = { events_per_second = 1000, max_backlog = 10000 }
```
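The `per_did` limit above (60 requests/minute with a burst of 10) corresponds to a token bucket refilled at one token per second. A deterministic sketch with the clock passed in explicitly; production code would use `Instant` and a keyed limiter such as the `governor` crate rather than this hand-rolled type:

```rust
// Sketch: token-bucket rate limiting. `capacity` is the burst size and
// `refill_per_sec` the sustained rate; time is injected for determinism.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill_sec: f64,
}

impl TokenBucket {
    fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { capacity, tokens: capacity, refill_per_sec, last_refill_sec: 0.0 }
    }

    /// Try to take one token at time `now_sec`; false means rate-limited.
    fn try_acquire(&mut self, now_sec: f64) -> bool {
        let elapsed = (now_sec - self.last_refill_sec).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill_sec = now_sec;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    // burst = 10, 60 requests/minute => refill of 1 token per second
    let mut bucket = TokenBucket::new(10.0, 1.0);
    for _ in 0..10 {
        assert!(bucket.try_acquire(0.0)); // the burst drains the bucket
    }
    assert!(!bucket.try_acquire(0.0));    // 11th immediate request rejected
    assert!(bucket.try_acquire(1.0));     // one second later, one token back
}
```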
### Rate Limit Middleware
```rust
pub async fn rate_limit_middleware(
    ctx: &Context,
    operation: &str,
) -> Result<(), RateLimitError> {
    // Keyed limiter (e.g. governor's RateLimiter with a keyed state store),
    // shared through the request context.
    let key = format!("{}:{}", ctx.did, operation);
    ctx.rate_limiter
        .check_key(&key)
        .map_err(|_| RateLimitError::TooManyRequests)?;
    // Additional application-level cap on loan requests
    if operation == "requestBookLoan" {
        let recent = ctx.db.count_recent_loans(&ctx.did, Duration::hours(1)).await?;
        if recent > 5 {
            return Err(RateLimitError::Custom("Too many loan requests"));
        }
    }
    Ok(())
}
```
### Performance Contracts
| Operation | P95 Latency | Throughput |
|---|---|---|
| GraphQL query (cached) | <50ms | 1000 req/s |
| GraphQL mutation (ATProto write) | <500ms | 50 req/s |
| Firehose event processing | <100ms/event | 500 events/s |
| DID resolution (cached) | <20ms | 2000 req/s |
## 5. Operational Resilience

### Health Check Endpoint
```rust
async fn health_check(
    db: State<PgPool>,
    firehose: State<FirehoseSubscriber>,
    pds: State<PdsClient>,
) -> impl IntoResponse {
    // join! returns a tuple, one result per check
    let (db_check, firehose_check, pds_check) = futures::join!(
        db.ping(),
        firehose.connection_status(),
        pds.health_check()
    );
    let healthy = db_check.is_ok() && firehose_check.is_ok() && pds_check.is_ok();
    let status = if healthy {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };
    (status, Json(json!({
        "status": if healthy { "healthy" } else { "degraded" },
        "checks": {
            "database": db_check.is_ok(),
            "firehose": firehose_check.is_ok(),
            "pds_connectivity": pds_check.is_ok(),
        }
    })))
}
```
### Graceful Degradation: Offline-First Write Queue
```rust
pub async fn add_book_to_pool_fallback(
    &self,
    ctx: &Context,
    input: AddBookInput,
) -> Result<PoolItem> {
    let book = self.resolve_book_metadata(&input).await?;
    // Write locally first; the PDS publish happens asynchronously.
    let local_item = self.db.create_pending_pool_item(&ctx.did, &book).await?;
    self.sync_queue.enqueue(SyncJob::PublishPoolItem {
        did: ctx.did.clone(),
        item_uri: local_item.uri.clone(),
    }).await?;
    Ok(PoolItem {
        sync_status: SyncStatus::Pending,
        ..local_item.into()
    })
}

async fn sync_worker(sync_queue: SyncQueue, db: PgPool, pds: PdsClient) {
    while let Some(job) = sync_queue.dequeue().await {
        match job {
            SyncJob::PublishPoolItem { did: _, item_uri } => {
                // Re-read the pending record so retries publish the latest
                // local state (loader helper elided in the original).
                let Ok(record) = db.load_pool_item_record(&item_uri).await else {
                    continue;
                };
                let result = retry_with_backoff(3, || async {
                    pds.put_record(&item_uri, &record).await
                }).await;
                match result {
                    Ok(_) => { db.mark_synced(&item_uri).await.ok(); }
                    Err(e) => {
                        log::error!("Sync failed for {}: {}", item_uri, e);
                    }
                }
            }
        }
    }
}
```
### Audit Logging
```rust
pub struct AuditEvent {
    pub timestamp: DateTime<Utc>,
    pub actor_did: String,
    pub operation: String,
    pub target_uri: Option<String>,
    pub ip_address: Option<String>,
    pub result: OperationResult,
    pub error_context: Option<String>,
}

pub async fn audit_mutation_middleware(
    ctx: &Context,
    info: &ResolveInfo<'_>,
    result: &Result<Value, Error>,
) {
    let event = AuditEvent {
        timestamp: Utc::now(),
        actor_did: ctx.did.clone(),
        operation: info.field_name().to_string(),
        target_uri: extract_target_uri(info),
        ip_address: ctx.remote_addr.clone(),
        result: match result {
            Ok(_) => OperationResult::Success,
            Err(_) => OperationResult::Failed,
        },
        error_context: result.as_ref().err().map(sanitize_error),
    };
    // Log off the request path; clone the logger handle into the task.
    let audit_logger = ctx.audit_logger.clone();
    tokio::spawn(async move {
        audit_logger.log(event).await.ok();
    });
}
```
### Observability Stack
- Metrics: Prometheus exporters — request rates, error rates, firehose lag
- Tracing: OpenTelemetry spans for GraphQL resolvers + ATProto calls
- Logging: Structured JSON with correlation IDs
- Alerts: PagerDuty for firehose disconnection >5min, auth failure spikes
## 6. Cover Art: Hybrid Blob Strategy
```rust
pub async fn handle_cover_art(
    &self,
    image_url: String,
    user_did: String,
) -> Result<CoverArtReference> {
    let image_bytes = self.fetch_and_validate_image(&image_url).await?;
    let blob_ref = self.pds_client
        .upload_blob(&user_did, &image_bytes, "image/jpeg")
        .await?;
    Ok(CoverArtReference::BlobRef(blob_ref))
}

async fn resolve_cover_art(&self, cid: String) -> Result<impl IntoResponse> {
    if let Ok(blob) = self.pds_client.get_blob(&cid).await {
        return Ok(Response::builder()
            .header("content-type", blob.mime_type)
            .body(Body::from(blob.bytes))?);
    }
    if let Some(cached) = self.local_cache.get(&cid).await {
        return Ok(Response::builder()
            .header("content-type", cached.mime_type)
            .body(Body::from(cached.bytes))?);
    }
    Err(ApiError::NotFound)
}
```
Resolution order: PDS blob → local cache → 404.
## 7. Implementation Checklist

### Security
- DPoP-bound OAuth 2.0 flow
- DID document validation per request
- Lexicon-scoped permission enforcement
- ATProto signature verification pre-processing
- Rate limiting per-DID and per-IP
### Federation
- Firehose cursor persistence (idempotent)
- Multi-relay failover with exponential backoff
- Backfill via `com.atproto.sync.getRepo`
- Per-DID event ordering
### Data
- AT-URI canonical identifiers
- Work-level grouping with edition-specific loans
- Lost-book state machine with resolution paths
- Reputation signal propagation via firehose
### Performance
- L1/L2/L3 cache hierarchy
- Async ATProto write queue
- Tokio task pool parallelization
- Cursor-based query pagination
### Operations
- Health check endpoints (DB, firehose, PDS)
- Graceful degradation on ATProto unavailability
- Structured audit logging (all mutations)
- Prometheus + OpenTelemetry instrumentation
### Edge Cases
- DID key rotation → re-auth flow
- Firehose gap detection → backfill
- Offline mutation queue → retry with backoff
- Loan dispute resolution workflow
## 8. Deployment Configuration
```toml
[instance]
handle = "library.yourdomain.org"
admin_did = "did:plc:..."

[atproto]
relay_endpoints = [
    "wss://bsky.network",
    "wss://relay.example.org",
]
pds_discovery_ttl_seconds = 300

[firehose]
cursor_persistence = "postgresql"
backfill_enabled = true
max_concurrent_backfills = 5

[cache]
l1_max_entries = 10000
l1_ttl_seconds = 300
redis_cluster_nodes = ["redis-1:6379", "redis-2:6379"]

[rate_limits]
enabled = true
per_did_requests_per_minute = 60
mutation_burst_limit = 10

[observability]
prometheus_endpoint = "/metrics"
opentelemetry_exporter = "jaeger"
audit_log_sink = "postgresql"
```
### Infrastructure Requirements
- `atproto-subscriber`: separate binary for independent scaling
- Kubernetes liveness/readiness probes → `/health` endpoint
- PostgreSQL WAL archiving for point-in-time cursor recovery
- TLS mutual auth for inter-service communication
## Next Steps
- Implement DPoP authentication middleware
- Build a firehose subscriber PoC with cursor persistence
- Define lexicon schemas in `./lexicons/com/example/library/`
- Load-test GraphQL resolvers with federation traffic patterns