
Decentralized Library API Gateway: Technical Architecture Specification#

For conceptual overview, workflows, and non-technical explanations, see humandocs.md.

Scope: This document contains implementation-level specifications only — code patterns, schemas, configurations, threat models, and performance contracts. It is intended for engineers building or maintaining the system.


1. Authentication: DPoP-Bound AT Protocol OAuth 2.0#

Implementation (axum + async-graphql)#

pub struct AtprotoAuth {
    pds_discovery: PdsDiscoveryCache,  // TTL-cached DID → PDS mapping
    token_verifier: JwtVerifier,        // Validates DPoP-bound JWTs
    scope_enforcer: ScopeValidator,     // Enforces lexicon-level permissions
}

impl AtprotoAuth {
    pub async fn validate(&self, headers: &HeaderMap, requested_lexicon: &str) -> Result<AuthContext, AuthError> {
        let dpop_proof = headers.get("DPoP")
            .ok_or(AuthError::MissingDpop)?;
        let token = headers.get("Authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|v| v.strip_prefix("DPoP "))
            .ok_or(AuthError::InvalidAuthHeader)?;
        
        let claims = self.token_verifier.verify_dpop_bound(token, dpop_proof).await?;
        let did = claims.sub;
        
        let did_doc = self.pds_discovery.resolve(&did).await?;
        if did_doc.signing_key != claims.key_id {
            return Err(AuthError::KeyRotationDetected);
        }
        
        self.scope_enforcer.validate(&claims.scope, requested_lexicon)?;
        
        Ok(AuthContext { did, scopes: claims.scope })
    }
}

Threat Model#

| Threat | Mitigation |
|---|---|
| Token replay | DPoP proof binding + 15-min JWT TTL |
| DID takeover | Real-time DID document validation + signature verification |
| Scope escalation | Server-side lexicon permission enforcement |
| PDS impersonation | TLS + certificate pinning for PDS endpoints |

Auth Configuration#

  • DPoP binding: required on all authenticated endpoints
  • DID resolution cache TTL: 300s (forced refresh on auth failure)
  • Lexicon scope format: com.example.library.<collection>:<action> (e.g., poolItem:create, loanAgreement:read)
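A minimal sketch of how the `ScopeValidator` might check the scope format above. The function name and exact-match semantics (no wildcards) are assumptions, not part of the spec:

```rust
/// Hypothetical exact-match scope check for the
/// `com.example.library.<collection>:<action>` format. A granted scope list
/// must contain the precise collection:action pair being requested;
/// wildcard grants are deliberately not assumed here.
fn scope_allows(granted: &[&str], collection: &str, action: &str) -> bool {
    let needed = format!("com.example.library.{}:{}", collection, action);
    granted.iter().any(|s| *s == needed)
}
```

Exact matching keeps the enforcement auditable: a scope either names the lexicon operation or the request is denied.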

2. Firehose Subscription: Federation Sync Engine#

Implementation#

pub struct FirehoseSubscriber {
    relay_endpoints: Vec<Url>,
    cursor_store: CursorPersistence,    // PostgreSQL-backed
    backfill_queue: BackfillManager,    // com.atproto.sync.getRepo for gaps
    record_processor: AsyncRecordPipeline,
}

impl FirehoseSubscriber {
    pub async fn run(&mut self) -> Result<(), SubscriptionError> {
        loop {
            let mut ws = self.connect_with_retry().await?;
            let cursor = self.cursor_store.load().await?;
            ws.send(SubscribeRepos { cursor: Some(cursor) }).await?;
            
            while let Some(event) = ws.next().await {
                match event {
                    Event::Commit(commit) => {
                        if !self.verify_commit_signature(&commit).await {
                            log::warn!("Invalid signature for DID {}", commit.repo);
                            continue;
                        }
                        if self.cursor_store.is_processed(&commit.repo, &commit.rev).await {
                            continue;  // idempotent replay after reconnect
                        }
                        // Capture identifiers before the commit is consumed by processing.
                        let (repo, rev) = (commit.repo.clone(), commit.rev.clone());
                        self.process_commit_ops(commit).await?;
                        self.cursor_store.save(&repo, &rev).await?;
                    }
                    Event::GapDetected { repo, since, to } => {
                        self.backfill_queue.enqueue(repo, since, to).await;
                    }
                }
            }
            
            self.relay_endpoints.rotate_left(1);
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }
}

Cursor Persistence Schema#

CREATE TABLE firehose_cursors (
    did TEXT PRIMARY KEY,
    rev TEXT NOT NULL,
    seq BIGINT NOT NULL,
    processed_at TIMESTAMPTZ DEFAULT now()
);

-- Idempotent upsert
INSERT INTO firehose_cursors (did, rev, seq)
VALUES ($1, $2, $3)
ON CONFLICT (did) DO UPDATE SET rev = $2, seq = $3, processed_at = now()
WHERE firehose_cursors.seq < $3;
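The `WHERE` guard on the upsert only advances a cursor monotonically. A pure Rust mirror of that decision (illustrative, not production code) makes the invariant explicit:

```rust
/// Mirrors the SQL guard above: a cursor row is inserted when none exists,
/// and updated only when the incoming firehose sequence number is strictly
/// greater than the stored one, so stale or replayed events never rewind it.
fn should_advance_cursor(stored_seq: Option<i64>, incoming_seq: i64) -> bool {
    match stored_seq {
        None => true,                 // no row yet: plain INSERT
        Some(s) => incoming_seq > s,  // ON CONFLICT ... WHERE seq < $3
    }
}
```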

Operational Metrics#

atproto_firehose_events_processed_total{lexicon="com.example.library.*"}
atproto_firehose_cursor_lag_seconds{did="..."}
atproto_firehose_signature_failures_total
atproto_backfill_queue_depth
atproto_backfill_success_rate

Reliability Constraints#

  • Cursor persistence: PostgreSQL with ON CONFLICT idempotency
  • Relay failover: 2-3 endpoints, rotate on connection failure
  • Backfill: com.atproto.sync.getRepo for CAR file retrieval on gap detection
  • Signature verification: mandatory pre-processing gate
  • Ordering: per-DID event ordering via Tokio task pools
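One common way to get per-DID ordering out of a Tokio task pool is to shard events by a stable hash of the DID, so every event for one repo lands on the same worker queue. This is a sketch of that sharding function; the pool wiring around it is assumed:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a DID to a fixed worker slot. All events for one repo hash to the
/// same slot (and therefore the same ordered queue), while different DIDs
/// are processed in parallel across the pool.
fn worker_index(did: &str, pool_size: usize) -> usize {
    let mut h = DefaultHasher::new();
    did.hash(&mut h);
    (h.finish() as usize) % pool_size
}
```

Because the mapping is deterministic, re-sharding only happens if the pool size changes, which is safe to do at startup before the firehose cursor is resumed.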

3. Data Model & Conflict Resolution#

GraphQL Schema#

type Book {
  workId: String!      # e.g., "wikidata:Q190192" or "isbn-group:978-0-441"
  edition: BookEdition!
  availableEditions: [BookEdition!]!
}

type BookEdition {
  uri: String!         # AT-URI: at://did:plc:.../com.example.library.book/...
  isbn: String
  editionLabel: String
  publisher: String
  publicationYear: Int
  coverArtCid: String
}

Book Canonicalization Resolution Logic#

  1. On addBookToPool: query Wikidata/OpenLibrary for the ISBN → extract `workId`
  2. Check the local DB for an existing Book with the same `workId`
  3. If found: link the poolItem to the existing Book, create a new `BookEdition`
  4. If not: create a new Book plus its first `BookEdition`
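The steps above reduce to a lookup-or-create on `workId`. A self-contained sketch, with an in-memory map standing in for the books table (the real path queries PostgreSQL):

```rust
use std::collections::HashMap;

/// Attach an edition URI to its work-level Book entry, creating the Book
/// if this is the first edition seen for that workId. Returns true when an
/// existing Book was found (step 3), false when one was created (step 4).
fn canonicalize_book(
    works: &mut HashMap<String, Vec<String>>, // workId -> edition AT-URIs
    work_id: &str,
    edition_uri: &str,
) -> bool {
    let existed = works.contains_key(work_id);
    works
        .entry(work_id.to_string())
        .or_default()
        .push(edition_uri.to_string());
    existed
}
```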

Pool Query (SQL)#

SELECT b.work_id, array_agg(e.edition_label) as editions, 
       COUNT(pi.uri) as available_copies
FROM books b
JOIN book_editions e ON b.id = e.book_id
JOIN pool_items pi ON e.uri = pi.book_edition_uri
WHERE pi.status = 'AVAILABLE'
GROUP BY b.work_id
ORDER BY available_copies DESC;

Loan Status State Machine#

pub enum LoanStatus {
    PendingApproval,
    Active,
    Returned,
    Overdue,          // Auto-set when expected_return_date < now()
    ReportedLost,     // Lender-initiated
    Disputed,         // Borrower contests loss
    Resolved,         // Terminal: compensated or forgiven
}
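A transition guard keeps the state machine honest at the mutation layer. The edge set below is inferred from the variant comments (e.g. `Resolved` is terminal) and is an assumption, not a spec'd table; the enum is restated so the sketch is self-contained:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum LoanStatus {
    PendingApproval, Active, Returned, Overdue, ReportedLost, Disputed, Resolved,
}

/// Reject illegal status writes before they reach the PDS. Edges are an
/// illustrative reading of the enum comments above.
fn can_transition(from: LoanStatus, to: LoanStatus) -> bool {
    use LoanStatus::*;
    matches!(
        (from, to),
        (PendingApproval, Active)
            | (Active, Returned)
            | (Active, Overdue)          // expected_return_date < now()
            | (Overdue, Returned)
            | (Active, ReportedLost)     // lender-initiated
            | (Overdue, ReportedLost)
            | (ReportedLost, Disputed)   // borrower contests
            | (ReportedLost, Resolved)
            | (Disputed, Resolved)       // terminal
    )
}
```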

Lost Book Mutation#

pub async fn report_book_lost(
    &self,
    ctx: &Context,
    loan_uri: String,
    resolution: LossResolution, // Compensate | Forgive | Investigate
) -> Result<LoanAgreement> {
    // Load the loan before the ownership check (accessor name illustrative).
    let loan = self.db.get_loan_agreement(&loan_uri).await?;
    ensure!(ctx.did == loan.lender_did, AuthError::Forbidden);
    
    let record = LoanAgreementRecord {
        status: LoanStatus::ReportedLost,
        loss_reported_at: Some(Utc::now()),
        resolution: Some(resolution),
        ..loan.record
    };
    self.pds_client.put_record(&loan_uri, &record).await?;
    
    if matches!(resolution, LossResolution::Compensate) {
        self.reputation_service.apply_penalty(
            &loan.borrower_did, 
            ReputationPenalty::LostBook
        ).await;
    }
    
    self.indexer.remove_pool_item(&loan.book_copy_uri).await;
    Ok(loan)
}

Consistency Guarantees#

  • Primary key: AT-URI (at://did:plc:.../collection/rkey)
  • Canonicalization: work-level grouping, edition-specific loans
  • Idempotency: rev-based deduplication on all record writes
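ATProto revs are TIDs, which sort lexicographically by creation time, so rev-based deduplication can be a plain string comparison. A sketch of the write gate (function name illustrative):

```rust
/// A write is a duplicate (or stale replay) unless its rev is strictly
/// newer than the last rev seen for that record. Relies on TID revs being
/// lexicographically ordered by creation time.
fn is_duplicate_write(last_seen_rev: Option<&str>, incoming_rev: &str) -> bool {
    match last_seen_rev {
        Some(last) => incoming_rev <= last,
        None => false,
    }
}
```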

4. Caching & Rate Limiting#

Three-Tier Cache#

pub struct CacheLayer {
    l1_cache: Arc<DashMap<CacheKey, CachedValue>>,  // In-memory, 10k entries, 5-min TTL
    l2_cache: RedisPool,                            // Distributed, 1-hour TTL
    db: PgPool,                                     // Source of truth
}

impl CacheLayer {
    pub async fn get_pool_items(&self, query: PoolQuery) -> Result<Vec<PoolItem>> {
        if let Some(items) = self.l1_cache.get(&query) {
            return Ok(items.clone());
        }
        if let Some(items) = self.l2_cache.get(&query).await? {
            self.l1_cache.insert(query.clone(), items.clone());
            return Ok(items);
        }
        let items = self.db.query_pool_items(query.clone(), LIMIT).await?;
        // Populate both tiers off the hot path; clone the handles so the
        // spawned task is 'static (Arc<DashMap> and RedisPool clone cheaply).
        let (l1, l2) = (Arc::clone(&self.l1_cache), self.l2_cache.clone());
        let items_clone = items.clone();
        tokio::spawn(async move {
            l2.set(&query, &items_clone, TTL_1H).await.ok();
            l1.insert(query, items_clone);
        });
        Ok(items)
    }
}

Rate Limit Configuration#

[rate_limits]
per_did = { requests_per_minute = 60, burst = 10 }
per_ip = { requests_per_second = 10, burst = 50 }
mutations = { addBookToPool = "10/hour", requestBookLoan = "20/hour" }
firehose = { events_per_second = 1000, max_backlog = 10000 }
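The mutation limits use a `"<count>/<window>"` string form. A small parser sketch for turning those into a count plus window duration (only the units this config uses are handled; the helper name is illustrative):

```rust
use std::time::Duration;

/// Parse a rate spec like "10/hour" or "20/hour" into (count, window).
/// Unknown units or malformed specs yield None so misconfiguration fails
/// loudly at startup rather than silently disabling a limit.
fn parse_rate(spec: &str) -> Option<(u64, Duration)> {
    let (count, window) = spec.split_once('/')?;
    let count: u64 = count.parse().ok()?;
    let window = match window {
        "second" => Duration::from_secs(1),
        "minute" => Duration::from_secs(60),
        "hour" => Duration::from_secs(3600),
        _ => return None,
    };
    Some((count, window))
}
```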

Rate Limit Middleware#

pub async fn rate_limit_middleware(
    ctx: &Context,
    operation: &str,
) -> Result<(), RateLimitError> {
    // Keyed limiter checked per (DID, operation) pair; ctx.rate_limiter is
    // assumed to be a shared keyed-state limiter (e.g. governor's keyed RateLimiter).
    let key = format!("{}:{}", ctx.did, operation);
    ctx.rate_limiter
        .check_key(&key)
        .map_err(|_| RateLimitError::TooManyRequests)?;

    if operation == "requestBookLoan" {
        let recent = ctx.db.count_recent_loans(&ctx.did, Duration::hours(1)).await?;
        if recent > 5 {
            return Err(RateLimitError::Custom("Too many loan requests"));
        }
    }
    Ok(())
}

Performance Contracts#

| Operation | P95 Latency | Throughput |
|---|---|---|
| GraphQL query (cached) | <50ms | 1000 req/s |
| GraphQL mutation (ATProto write) | <500ms | 50 req/s |
| Firehose event processing | <100ms/event | 500 events/s |
| DID resolution (cached) | <20ms | 2000 req/s |

5. Operational Resilience#

Health Check Endpoint#

async fn health_check(
    db: State<PgPool>,
    firehose: State<FirehoseSubscriber>,
    pds: State<PdsClient>,
) -> impl IntoResponse {
    // futures::join! yields a tuple, so each check is inspected by field.
    let checks = futures::join!(
        db.ping(),
        firehose.connection_status(),
        pds.health_check()
    );
    let healthy = checks.0.is_ok() && checks.1.is_ok() && checks.2.is_ok();
    let status = if healthy {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };
    (status, Json(json!({
        "status": if healthy { "healthy" } else { "degraded" },
        "checks": {
            "database": checks.0.is_ok(),
            "firehose": checks.1.is_ok(),
            "pds_connectivity": checks.2.is_ok(),
        }
    })))
}

Graceful Degradation: Offline-First Write Queue#

pub async fn add_book_to_pool_fallback(
    &self,
    ctx: &Context,
    input: AddBookInput,
) -> Result<PoolItem> {
    let book = self.resolve_book_metadata(&input).await?;
    let local_item = self.db.create_pending_pool_item(&ctx.did, &book).await?;
    self.sync_queue.enqueue(SyncJob::PublishPoolItem {
        did: ctx.did.clone(),
        item_uri: local_item.uri.clone(),
    }).await?;
    Ok(PoolItem {
        sync_status: SyncStatus::Pending,
        ..local_item.into()
    })
}

async fn sync_worker(sync_queue: SyncQueue, pds: PdsClient, db: PgPool) {
    while let Some(job) = sync_queue.dequeue().await {
        match job {
            SyncJob::PublishPoolItem { did, item_uri } => {
                // Load the locally-persisted record to publish (accessor name illustrative).
                let Ok(record) = db.load_pool_item(&item_uri).await else {
                    log::error!("Missing local record for {}", item_uri);
                    continue;
                };
                let result = retry_with_backoff(3, || async {
                    pds.put_record(&item_uri, &record).await
                }).await;
                match result {
                    Ok(_) => { db.mark_synced(&item_uri).await.ok(); }
                    Err(e) => log::error!("Sync failed for {}: {}", item_uri, e),
                }
            }
        }
    }
}
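The `retry_with_backoff` helper used above is not defined in this document. A synchronous sketch of its shape (the real worker would use `tokio::time::sleep` in an async loop, but the doubling-delay, fixed-budget structure is the same):

```rust
use std::thread::sleep;
use std::time::Duration;

/// Retry a fallible operation up to `max_attempts` times, doubling the
/// delay between attempts (100ms, 200ms, 400ms, ...). Returns the first
/// success, or the last error once the attempt budget is exhausted.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = Duration::from_millis(100);
    let mut attempt = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                delay *= 2; // exponential backoff
                attempt += 1;
            }
        }
    }
}
```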

Audit Logging#

pub struct AuditEvent {
    pub timestamp: DateTime<Utc>,
    pub actor_did: String,
    pub operation: String,
    pub target_uri: Option<String>,
    pub ip_address: Option<String>,
    pub result: OperationResult,
    pub error_context: Option<String>,
}

pub async fn audit_mutation_middleware(
    ctx: &Context,
    info: &ResolveInfo<'_>,
    result: &Result<Value, Error>,
) {
    let event = AuditEvent {
        timestamp: Utc::now(),
        actor_did: ctx.did.clone(),
        operation: info.field_name().to_string(),
        target_uri: extract_target_uri(info),
        ip_address: ctx.remote_addr.clone(),
        result: match result {
            Ok(_) => OperationResult::Success,
            Err(_) => OperationResult::Failed,
        },
        error_context: result.as_ref().err().map(|e| sanitize_error(e)),
    };
    // Clone a handle so the spawned task owns its logger and never blocks the resolver.
    let audit_logger = ctx.audit_logger.clone();
    tokio::spawn(async move {
        audit_logger.log(event).await.ok();
    });
}

Observability Stack#

  • Metrics: Prometheus exporters — request rates, error rates, firehose lag
  • Tracing: OpenTelemetry spans for GraphQL resolvers + ATProto calls
  • Logging: Structured JSON with correlation IDs
  • Alerts: PagerDuty for firehose disconnection >5min, auth failure spikes

6. Cover Art: Hybrid Blob Strategy#

pub async fn handle_cover_art(
    &self,
    image_url: String,
    user_did: String,
) -> Result<CoverArtReference> {
    let image_bytes = self.fetch_and_validate_image(&image_url).await?;
    let blob_ref = self.pds_client
        .upload_blob(&user_did, &image_bytes, "image/jpeg")
        .await?;
    Ok(CoverArtReference::BlobRef(blob_ref))
}

async fn resolve_cover_art(&self, cid: String) -> Result<impl IntoResponse> {
    if let Ok(blob) = self.pds_client.get_blob(&cid).await {
        return Ok(Response::builder()
            .header("content-type", blob.mime_type)
            .body(Body::from(blob.bytes))?);
    }
    if let Some(cached) = self.local_cache.get(&cid).await {
        return Ok(Response::builder()
            .header("content-type", cached.mime_type)
            .body(Body::from(cached.bytes))?);
    }
    Err(ApiError::NotFound)
}

Resolution order: PDS blob → local cache → 404.


7. Implementation Checklist#

Security#

  • DPoP-bound OAuth 2.0 flow
  • DID document validation per request
  • Lexicon-scoped permission enforcement
  • ATProto signature verification pre-processing
  • Rate limiting per-DID and per-IP

Federation#

  • Firehose cursor persistence (idempotent)
  • Multi-relay failover with exponential backoff
  • Backfill via com.atproto.sync.getRepo
  • Per-DID event ordering

Data#

  • AT-URI canonical identifiers
  • Work-level grouping with edition-specific loans
  • Lost-book state machine with resolution paths
  • Reputation signal propagation via firehose

Performance#

  • L1/L2/L3 cache hierarchy
  • Async ATProto write queue
  • Tokio task pool parallelization
  • Cursor-based query pagination

Operations#

  • Health check endpoints (DB, firehose, PDS)
  • Graceful degradation on ATProto unavailability
  • Structured audit logging (all mutations)
  • Prometheus + OpenTelemetry instrumentation

Edge Cases#

  • DID key rotation → re-auth flow
  • Firehose gap detection → backfill
  • Offline mutation queue → retry with backoff
  • Loan dispute resolution workflow

8. Deployment Configuration#

[instance]
handle = "library.yourdomain.org"
admin_did = "did:plc:..."

[atproto]
relay_endpoints = [
  "wss://bsky.network",
  "wss://relay.example.org",
]
pds_discovery_ttl_seconds = 300

[firehose]
cursor_persistence = "postgresql"
backfill_enabled = true
max_concurrent_backfills = 5

[cache]
l1_max_entries = 10000
l1_ttl_seconds = 300
redis_cluster_nodes = ["redis-1:6379", "redis-2:6379"]

[rate_limits]
enabled = true
per_did_requests_per_minute = 60
mutation_burst_limit = 10

[observability]
prometheus_endpoint = "/metrics"
opentelemetry_exporter = "jaeger"
audit_log_sink = "postgresql"

Infrastructure Requirements#

  • atproto-subscriber: separate binary for independent scaling
  • Kubernetes liveness/readiness probes → /health endpoint
  • PostgreSQL WAL archiving for point-in-time cursor recovery
  • TLS mutual auth for inter-service communication

Next Steps#

  1. Implement DPoP authentication middleware
  2. Build firehose subscriber with cursor persistence PoC
  3. Define lexicon schemas in ./lexicons/com/example/library/
  4. Load-test GraphQL resolvers with federation traffic patterns