···1931931941941. Cloudflare account with Workers enabled
1951952. [Wrangler CLI](https://developers.cloudflare.com/workers/wrangler/install-and-update/) installed
196196+ `npx wrangler` works here as well.
1961973. Rust toolchain with `wasm32-unknown-unknown` target
198198+4. The `worker-build` crate (`cargo install worker-build`)
197199198200### Quick Start
199201···228230[[d1_databases]]
229231binding = "DB"
230232database_name = "personal-activity-db"
231231-database_id = "your-database-id-here" # Replace with actual ID
233233+database_id = "your-database-id-here" # Replace with returned database_id
232234```
233235234236Then copy to the active config:
···240242#### 3. Initialize Database Schema
241243242244```sh
243243-wrangler d1 execute personal-activity-db --file=schema.sql
245245+wrangler d1 execute personal-activity-db --remote --file=schema.sql
244246```
245247248248+Note that you can omit `--remote` for local development.
249249+246250#### 4. Build and Deploy
247251248252```sh
249253# Build the worker
250254cd ..
251255cargo install worker-build
252252-worker-build --release -p pai-worker
256256+worker-build --release worker
257257+```
258258+259259+#### 5. Patch Generated Code
260260+261261+The worker-build output requires two patches for compatibility with wrangler:
262262+263263+```sh
264264+# 1. Fix import syntax (remove 'source' keyword)
265265+sed -i.bak 's/import source wasmModule/import wasmModule/' worker/build/index.js
266266+267267+# 2. Add default export for ES module format (required for D1 bindings)
268268+echo -e "\nexport default { fetch, scheduled };" >> worker/build/index.js
269269+```
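271271+`sed -i.bak` works with both GNU and BSD (macOS) sed and leaves a backup at `worker/build/index.js.bak`. On macOS you can use `sed -i '' ...` instead of `sed -i.bak ...` to skip the backup.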
272272+273273+#### 6. Deploy
253274254254-# Deploy
275275+```sh
255276cd cloudflare-deployment
256277wrangler deploy
257278```
···292313293314### API Endpoints
294315295295-The Worker exposes the same API as the self-hosted server:
316316+The Worker exposes the following API:
296317297297-- `GET /api/feed?source_kind=bluesky&limit=20` - List items
298298-- `GET /api/item/{id}` - Get single item
299299-- `GET /status` - Health check
318318+- `GET /` - API documentation (JSON)
319319+- `GET /api/feed?source_kind=bluesky&limit=20` - List items with optional filters
320320+- `GET /api/item/{id}` - Get single item by ID
321321+- `POST /api/sync` - Manually trigger synchronization from all configured sources
322322+- `GET /status` - Health check, version info, and item counts (total and per source kind)
300323301324### Local Development
302325
+7-1
core/Cargo.toml
···99serde_json = "1.0"
1010toml = "0.9"
1111reqwest = { version = "0.12", features = ["json"] }
1212-tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
1312feed-rs = "2.2"
1413chrono = "0.4"
1414+1515+[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
1616+tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
1717+1818+[target.'cfg(target_arch = "wasm32")'.dependencies]
1919+tokio = { version = "1.0", features = ["macros", "sync"] }
2020+uuid = { version = "1.18", features = ["v4", "js"] }
+3
core/src/lib.rs
···11+#[cfg(not(target_arch = "wasm32"))]
12mod fetchers;
2334use serde::{Deserialize, Serialize};
···56use std::{fmt, str::FromStr};
67use thiserror::Error;
7899+#[cfg(not(target_arch = "wasm32"))]
810pub use fetchers::{BearBlogFetcher, BlueskyFetcher, LeafletFetcher, SubstackFetcher};
9111012/// Errors that can occur in the Personal Activity Index
···227229/// Returns the number of sources successfully synced.
228230///
229231/// Filters sources based on optional kind and source_id parameters.
232232+#[cfg(not(target_arch = "wasm32"))]
230233pub fn sync_all_sources(
231234 config: &Config, storage: &dyn Storage, kind: Option<SourceKind>, source_id: Option<&str>,
232235) -> Result<usize> {
+1-1
worker/Cargo.toml
···8899[dependencies]
1010pai-core = { path = "../core" }
1111-worker = { version = "0.6", features = ["d1"] }
1111+worker = { version = "0.7", features = ["d1"] }
1212serde = { version = "1.0", features = ["derive"] }
1313serde_json = "1.0"
1414serde_urlencoded = "0.7"
+110
worker/api-docs.json
···11+{
22+ "name": "Personal Activity Index API",
33+ "version": "0.0.0",
44+ "description": "Aggregate and query your personal activity across multiple platforms",
55+ "endpoints": [
66+ {
77+ "method": "GET",
88+ "path": "/",
99+ "description": "API documentation",
1010+ "response": "This documentation in JSON format"
1111+ },
1212+ {
1313+ "method": "GET",
1414+ "path": "/status",
1515+ "description": "Health check and version information",
1616+ "response": {
1717+ "status": "ok",
1818+ "version": "string"
1919+ }
2020+ },
2121+ {
2222+ "method": "GET",
2323+ "path": "/api/feed",
2424+ "description": "List items from all sources with optional filtering",
2525+ "parameters": [
2626+ {
2727+ "name": "source_kind",
2828+ "type": "string",
2929+ "required": false,
3030+ "description": "Filter by source type",
3131+ "values": ["substack", "bluesky", "leaflet", "bearblog"]
3232+ },
3333+ {
3434+ "name": "source_id",
3535+ "type": "string",
3636+ "required": false,
3737+ "description": "Filter by specific source identifier (domain or handle)"
3838+ },
3939+ {
4040+ "name": "limit",
4141+ "type": "integer",
4242+ "required": false,
4343+ "default": 20,
4444+ "description": "Maximum number of items to return"
4545+ },
4646+ {
4747+ "name": "since",
4848+ "type": "string",
4949+ "required": false,
5050+ "description": "ISO 8601 timestamp - only return items published after this time"
5151+ },
5252+ {
5353+ "name": "q",
5454+ "type": "string",
5555+ "required": false,
5656+ "description": "Search query - matches against title and summary"
5757+ }
5858+ ],
5959+ "response": {
6060+ "items": [
6161+ {
6262+ "id": "string",
6363+ "source_kind": "bluesky|substack|leaflet|bearblog",
6464+ "source_id": "string",
6565+ "author": "string?",
6666+ "title": "string?",
6767+ "summary": "string?",
6868+ "url": "string",
6969+ "content_html": "string?",
7070+ "published_at": "ISO 8601 timestamp",
7171+ "created_at": "ISO 8601 timestamp"
7272+ }
7373+ ]
7474+ }
7575+ },
7676+ {
7777+ "method": "GET",
7878+ "path": "/api/item/:id",
7979+ "description": "Get a single item by its unique ID",
8080+ "parameters": [
8181+ {
8282+ "name": "id",
8383+ "type": "string",
8484+ "required": true,
8585+ "description": "The unique identifier of the item"
8686+ }
8787+ ],
8888+ "response": "Single item object or 404 if not found"
8989+ },
9090+ {
9191+ "method": "POST",
9292+ "path": "/api/sync",
9393+ "description": "Manually trigger synchronization from all configured sources",
9494+ "response": {
9595+ "status": "success",
9696+ "message": "Sync completed successfully"
9797+ }
9898+ }
9999+ ],
100100+ "sources": {
101101+ "substack": "RSS feeds from Substack publications",
102102+ "bluesky": "Posts from Bluesky via AT Protocol API",
103103+ "leaflet": "Publications from Leaflet (Bluesky-based blogging)",
104104+ "bearblog": "Posts from Bear Blog RSS feeds"
105105+ },
106106+ "scheduled_sync": {
107107+ "description": "Automatic synchronization runs on a scheduled basis",
108108+ "schedule": "Configured via cron triggers in wrangler.toml"
109109+ }
110110+}
+213-84
worker/src/lib.rs
···33use wasm_bindgen::JsValue;
44use worker::*;
5566+#[derive(Serialize, Deserialize)]
77+struct ApiDocumentation {
88+ name: String,
99+ version: String,
1010+ description: String,
1111+ endpoints: Vec<Endpoint>,
1212+ sources: Sources,
1313+ scheduled_sync: ScheduledSync,
1414+}
1515+1616+#[derive(Serialize, Deserialize)]
1717+struct Endpoint {
1818+ method: String,
1919+ path: String,
2020+ url: Option<String>,
2121+ description: String,
2222+ #[serde(skip_serializing_if = "Option::is_none")]
2323+ parameters: Option<Vec<Parameter>>,
2424+ #[serde(skip_serializing_if = "Option::is_none")]
2525+ examples: Option<Vec<String>>,
2626+ response: serde_json::Value,
2727+}
2828+2929+#[derive(Serialize, Deserialize)]
3030+struct Parameter {
3131+ name: String,
3232+ r#type: String,
3333+ required: bool,
3434+ #[serde(skip_serializing_if = "Option::is_none")]
3535+ default: Option<serde_json::Value>,
3636+ description: String,
3737+ #[serde(skip_serializing_if = "Option::is_none")]
3838+ values: Option<Vec<String>>,
3939+}
4040+4141+#[derive(Serialize, Deserialize)]
4242+struct Sources {
4343+ substack: String,
4444+ bluesky: String,
4545+ leaflet: String,
4646+ bearblog: String,
4747+}
4848+4949+#[derive(Serialize, Deserialize)]
5050+struct ScheduledSync {
5151+ description: String,
5252+ schedule: String,
5353+}
5454+655#[derive(Deserialize)]
756struct SyncConfig {
857 substack: Option<SubstackConfig>,
···51100struct StatusResponse {
52101 status: &'static str,
53102 version: &'static str,
103103+ total_items: usize,
104104+ sources: std::collections::HashMap<String, usize>,
54105}
5510656107#[event(fetch)]
57108async fn fetch(req: Request, env: Env, _ctx: Context) -> Result<Response> {
58109 let router = Router::new();
59110 router
111111+ .get_async("/", |req, _ctx| async move {
112112+ let url = req
113113+ .url()
114114+ .map_err(|e| Error::RustError(format!("Failed to get URL: {e}")))?;
115115+ let base_url = url.origin().unicode_serialization();
116116+117117+ let docs_template = include_str!("../api-docs.json");
118118+ let mut docs: ApiDocumentation = serde_json::from_str(docs_template)
119119+ .map_err(|e| Error::RustError(format!("Failed to parse API docs: {e}")))?;
120120+121121+ docs.version = env!("CARGO_PKG_VERSION").to_string();
122122+123123+ for endpoint in &mut docs.endpoints {
124124+ endpoint.url = Some(format!("{}{}", base_url, endpoint.path));
125125+126126+ if endpoint.path == "/api/feed" {
127127+ endpoint.examples = Some(vec![
128128+ format!("{}/api/feed", base_url),
129129+ format!("{}/api/feed?source_kind=bluesky&limit=10", base_url),
130130+ format!("{}/api/feed?q=rust&limit=5", base_url),
131131+ ]);
132132+ }
133133+ }
134134+135135+ Response::from_json(&docs)
136136+ })
60137 .get_async("/api/feed", |req, ctx| async move { handle_feed(req, ctx).await })
61138 .get_async("/api/item/:id", |_req, ctx| async move {
62139 let id = ctx
···64141 .ok_or_else(|| Error::RustError("Missing id parameter".into()))?;
65142 handle_item(id, &ctx).await
66143 })
6767- .get("/status", |_req, _ctx| {
6868- let version = env!("CARGO_PKG_VERSION");
6969- let status = StatusResponse { status: "ok", version };
144144+ .post_async("/api/sync", |_req, ctx| async move {
145145+ match run_sync(&ctx.env).await {
146146+ Ok(_) => Response::from_json(&serde_json::json!({
147147+ "status": "success",
148148+ "message": "Sync completed successfully"
149149+ })),
150150+ Err(e) => Response::error(format!("Sync failed: {e}"), 500),
151151+ }
152152+ })
153153+ .get_async("/status", |_req, ctx| async move {
154154+ let db = ctx.env.d1("DB")?;
155155+156156+ let total_result = db
157157+ .prepare("SELECT COUNT(*) as count FROM items")
158158+ .first::<serde_json::Value>(None)
159159+ .await?;
160160+161161+ let total_items = total_result.and_then(|v| v.get("count")?.as_u64()).unwrap_or(0) as usize;
162162+163163+ let sources_result = db
164164+ .prepare("SELECT source_kind, COUNT(*) as count FROM items GROUP BY source_kind")
165165+ .all()
166166+ .await?;
167167+168168+ let mut sources = std::collections::HashMap::new();
169169+ if let Ok(results) = sources_result.results::<serde_json::Value>() {
170170+ for result in results {
171171+ if let (Some(kind), Some(count)) = (
172172+ result.get("source_kind").and_then(|v| v.as_str()),
173173+ result.get("count").and_then(|v| v.as_u64()),
174174+ ) {
175175+ sources.insert(kind.to_string(), count as usize);
176176+ }
177177+ }
178178+ }
179179+180180+ let status = StatusResponse { status: "ok", version: env!("CARGO_PKG_VERSION"), total_items, sources };
70181 Response::from_json(&status)
71182 })
72183 .run(req, env)
···144255145256 if let Some(limit) = filter.limit {
146257 query.push_str(" LIMIT ?");
147147- bindings.push((limit as i64).into());
258258+ bindings.push((limit as f64).into());
148259 }
149260150150- let mut stmt = db.prepare(&query);
151151- for binding in bindings {
152152- stmt = stmt.bind(&[binding])?;
153153- }
261261+ let stmt = if bindings.is_empty() { db.prepare(&query) } else { db.prepare(&query).bind(&bindings)? };
154262155263 let results = stmt.all().await?;
156264 let items: Vec<Item> = results.results()?;
···377485}
378486379487async fn sync_leaflet(config: &LeafletConfig, db: &D1Database) -> Result<usize> {
380380- let host = normalize_source_id(&config.base_url);
381381- let subdomain = host.split('.').next().unwrap_or(&host);
382382- let did = format!("{subdomain}.bsky.social");
488488+ let feed_url = format!("{}/rss", config.base_url.trim_end_matches('/'));
383489384384- let api_url = format!(
385385- "https://public.api.bsky.app/xrpc/com.atproto.repo.listRecords?repo={did}&collection=pub.leaflet.post&limit=50"
386386- );
387387-388388- let mut req = Request::new(&api_url, Method::Get)?;
490490+ let mut req = Request::new(&feed_url, Method::Get)?;
389491 req.headers_mut()?.set("User-Agent", "pai-worker/0.1.0")?;
390492391493 let mut resp = Fetch::Request(req).send().await?;
392392- let json: serde_json::Value = resp.json().await?;
494494+ let body = resp.text().await?;
393495394394- let records = json["records"]
395395- .as_array()
396396- .ok_or_else(|| Error::RustError("Invalid Leaflet response".into()))?;
496496+ let channel =
497497+ rss::Channel::read_from(body.as_bytes()).map_err(|e| Error::RustError(format!("Failed to parse RSS: {e}")))?;
397498398499 let mut count = 0;
399500400400- for record in records {
401401- let uri = record["uri"]
402402- .as_str()
403403- .ok_or_else(|| Error::RustError("Missing URI".into()))?;
404404- let value = &record["value"];
501501+ for item in channel.items() {
502502+ let id = item.guid().map(|g| g.value()).unwrap_or(item.link().unwrap_or(""));
503503+ let url = item.link().unwrap_or(id);
504504+ let title = item.title();
505505+ let summary = item.description();
506506+ let author = item.author();
507507+ let content_html = item.content();
405508406406- let title = value["title"].as_str().unwrap_or("Untitled");
407407- let summary = value["summary"].as_str().or(value["content"].as_str()).unwrap_or("");
408408- let slug = value["slug"].as_str().unwrap_or("");
409409-410410- let url = if !slug.is_empty() {
411411- format!("{}/{}", config.base_url, slug)
412412- } else {
413413- format!("{}/post/{}", config.base_url, uri.split('/').next_back().unwrap_or(""))
414414- };
415415-416416- let published_at = value["publishedAt"]
417417- .as_str()
418418- .or(value["createdAt"].as_str())
419419- .unwrap_or("")
420420- .to_string();
509509+ let published_at = item
510510+ .pub_date()
511511+ .and_then(|s| chrono::DateTime::parse_from_rfc2822(s).ok())
512512+ .map(|dt| dt.to_rfc3339())
513513+ .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
421514422515 let created_at = chrono::Utc::now().to_rfc3339();
423516···427520 );
428521429522 stmt.bind(&[
430430- uri.into(),
523523+ id.into(),
431524 "leaflet".into(),
432525 config.id.clone().into(),
433433- JsValue::NULL,
434434- title.into(),
435435- summary.into(),
526526+ author.map(|s| s.into()).unwrap_or(JsValue::NULL),
527527+ title.map(|s| s.into()).unwrap_or(JsValue::NULL),
528528+ summary.map(|s| s.into()).unwrap_or(JsValue::NULL),
436529 url.into(),
437437- JsValue::NULL,
530530+ content_html.map(|s| s.into()).unwrap_or(JsValue::NULL),
438531 published_at.into(),
439532 created_at.into(),
440533 ])?
···448541}
449542450543async fn sync_bearblog(config: &BearBlogConfig, db: &D1Database) -> Result<usize> {
451451- let feed_url = format!("{}/feed/", config.base_url.trim_end_matches('/'));
544544+ let feed_url = format!("{}/feed/?type=rss", config.base_url.trim_end_matches('/'));
452545453546 let mut req = Request::new(&feed_url, Method::Get)?;
454547 req.headers_mut()?.set("User-Agent", "pai-worker/0.1.0")?;
···516609 use super::*;
517610518611 #[test]
612612+ fn test_api_docs_json_is_valid() {
613613+ let docs_str = include_str!("../api-docs.json");
614614+ let result = serde_json::from_str::<ApiDocumentation>(docs_str);
615615+ assert!(result.is_ok(), "API docs JSON should be valid: {:?}", result.err());
616616+617617+ let docs = result.unwrap();
618618+ assert_eq!(docs.name, "Personal Activity Index API");
619619+ assert!(!docs.description.is_empty());
620620+ assert!(!docs.endpoints.is_empty());
621621+ }
622622+623623+ #[test]
624624+ fn test_api_docs_has_all_endpoints() {
625625+ let docs_str = include_str!("../api-docs.json");
626626+ let docs: ApiDocumentation = serde_json::from_str(docs_str).unwrap();
627627+628628+ let paths: Vec<&str> = docs.endpoints.iter().map(|e| e.path.as_str()).collect();
629629+630630+ assert!(paths.contains(&"/"));
631631+ assert!(paths.contains(&"/status"));
632632+ assert!(paths.contains(&"/api/feed"));
633633+ assert!(paths.contains(&"/api/item/:id"));
634634+ assert!(paths.contains(&"/api/sync"));
635635+ }
636636+637637+ #[test]
638638+ fn test_api_docs_feed_endpoint_parameters() {
639639+ let docs_str = include_str!("../api-docs.json");
640640+ let docs: ApiDocumentation = serde_json::from_str(docs_str).unwrap();
641641+642642+ let feed_endpoint = docs.endpoints.iter().find(|e| e.path == "/api/feed").unwrap();
643643+ let params = feed_endpoint.parameters.as_ref().unwrap();
644644+ let param_names: Vec<&str> = params.iter().map(|p| p.name.as_str()).collect();
645645+646646+ assert!(param_names.contains(&"source_kind"));
647647+ assert!(param_names.contains(&"source_id"));
648648+ assert!(param_names.contains(&"limit"));
649649+ assert!(param_names.contains(&"since"));
650650+ assert!(param_names.contains(&"q"));
651651+ }
652652+653653+ #[test]
654654+ fn test_api_docs_has_source_descriptions() {
655655+ let docs_str = include_str!("../api-docs.json");
656656+ let docs: ApiDocumentation = serde_json::from_str(docs_str).unwrap();
657657+658658+ assert!(!docs.sources.substack.is_empty());
659659+ assert!(!docs.sources.bluesky.is_empty());
660660+ assert!(!docs.sources.leaflet.is_empty());
661661+ assert!(!docs.sources.bearblog.is_empty());
662662+ }
663663+664664+ #[test]
665665+ fn test_api_docs_url_generation() {
666666+ let docs_str = include_str!("../api-docs.json");
667667+ let mut docs: ApiDocumentation = serde_json::from_str(docs_str).unwrap();
668668+669669+ let base_url = "https://example.workers.dev";
670670+ for endpoint in &mut docs.endpoints {
671671+ endpoint.url = Some(format!("{}{}", base_url, endpoint.path));
672672+ }
673673+674674+ let root = docs.endpoints.iter().find(|e| e.path == "/").unwrap();
675675+ assert_eq!(root.url.as_ref().unwrap(), "https://example.workers.dev/");
676676+677677+ let feed = docs.endpoints.iter().find(|e| e.path == "/api/feed").unwrap();
678678+ assert_eq!(feed.url.as_ref().unwrap(), "https://example.workers.dev/api/feed");
679679+ }
680680+681681+ #[test]
519682 fn test_normalize_source_id_https() {
520683 assert_eq!(
521684 normalize_source_id("https://patternmatched.substack.com"),
···561724 }
562725563726 #[test]
564564- fn test_leaflet_url_with_slug() {
565565- let base_url = "https://test.leaflet.pub";
566566- let slug = "my-post";
567567- let url = if !slug.is_empty() {
568568- format!("{base_url}/{slug}")
569569- } else {
570570- format!("{}/post/{}", base_url, "fallback")
571571- };
572572- assert_eq!(url, "https://test.leaflet.pub/my-post");
573573- }
574574-575575- #[test]
576576- fn test_leaflet_url_without_slug() {
577577- let base_url = "https://test.leaflet.pub";
578578- let slug = "";
579579- let uri = "at://did:plc:abc123/pub.leaflet.post/xyz789";
580580- let post_id = uri.split('/').next_back().unwrap_or("");
581581- let url = if !slug.is_empty() { format!("{base_url}/{slug}") } else { format!("{base_url}/post/{post_id}") };
582582- assert_eq!(url, "https://test.leaflet.pub/post/xyz789");
583583- }
584584-585585- #[test]
586727 fn test_bluesky_post_id_extraction() {
587728 let uri = "at://did:plc:abc123/app.bsky.feed.post/3ld7xyqnvqk2a";
588729 let post_id = uri.split('/').next_back().unwrap_or("");
···653794 }
654795655796 #[test]
656656- fn test_leaflet_did_construction() {
657657- let subdomain = "desertthunder";
658658- let did = format!("{subdomain}.bsky.social");
659659- assert_eq!(did, "desertthunder.bsky.social");
660660- }
661661-662662- #[test]
663663- fn test_leaflet_api_url_construction() {
664664- let did = "desertthunder.bsky.social";
665665- let api_url = format!(
666666- "https://public.api.bsky.app/xrpc/com.atproto.repo.listRecords?repo={did}&collection=pub.leaflet.post&limit=50"
667667- );
668668- assert_eq!(
669669- api_url,
670670- "https://public.api.bsky.app/xrpc/com.atproto.repo.listRecords?repo=desertthunder.bsky.social&collection=pub.leaflet.post&limit=50"
671671- );
797797+ fn test_leaflet_feed_url_construction() {
798798+ let base_url = "https://desertthunder.leaflet.pub";
799799+ let feed_url = format!("{}/rss", base_url.trim_end_matches('/'));
800800+ assert_eq!(feed_url, "https://desertthunder.leaflet.pub/rss");
672801 }
673802674803 #[test]
+8-1
worker/wrangler.example.toml
···22# Copy this file to wrangler.toml and update with your values
3344name = "personal-activity-index"
55-main = "build/worker/index.js"
55+main = "build/index.js"
66compatibility_date = "2025-01-15"
7788# D1 Database Binding
···38383939# Optional: Logging level
4040# LOG_LEVEL = "info"
4141+4242+[observability]
4343+[observability.logs]
4444+enabled = true
4545+head_sampling_rate = 1
4646+invocation_logs = true
4747+persist = true