···
 ## Development Gotchas
 
 - **Do NOT run `npm run css:build` or `npm run js:build` manually** — Air handles these on file change
+- **Do NOT edit `icons.svg` directly** — SVG icon sprite sheets (`pkg/appview/public/icons.svg`, `pkg/hold/admin/public/icons.svg`) are auto-generated from template icon references during build. Just reference icons by name in templates and the build will include them.
 - **RoutingRepository is created fresh on EVERY request** (no caching). Previous caching caused stale OAuth sessions and "invalid refresh token" errors. The OAuth refresher caches efficiently already (in-memory + DB).
 - **Storage driver import**: `_ "github.com/distribution/distribution/v3/registry/storage/driver/s3-aws"` — blank import required
 - **Hold DID lookups use database** (`manifests` table), not in-memory cache — persistent across restarts
config-appview.example.yaml (+10 -4)
···
   check_interval: 15m0s
 # ATProto Jetstream event stream settings.
 jetstream:
-  # ATProto firehose endpoint for real-time manifest/tag events.
-  url: wss://jetstream2.us-west.bsky.network/subscribe
+  # Jetstream WebSocket endpoints, tried in order on failure.
+  urls:
+    - wss://jetstream2.us-west.bsky.network/subscribe
+    - wss://jetstream1.us-west.bsky.network/subscribe
+    - wss://jetstream2.us-east.bsky.network/subscribe
+    - wss://jetstream1.us-east.bsky.network/subscribe
   # Sync existing records from PDS on startup.
   backfill_enabled: true
-  # Relay to query for repository sync (backfill source).
-  relay_endpoint: https://relay1.us-east.bsky.network
+  # Relay endpoints for backfill, tried in order on failure.
+  relay_endpoints:
+    - https://relay1.us-east.bsky.network
+    - https://relay1.us-west.bsky.network
 # JWT authentication settings.
 auth:
   # RSA private key for signing registry JWTs issued to Docker clients.
config-hold.example.yaml (+1 -1)
···
 # Admin panel settings.
 admin:
   # Enable the web-based admin panel for crew and storage management.
-  enabled: false
+  enabled: true
 # Storage quota tiers. Empty disables quota enforcement.
 quota:
   # Quota tiers keyed by rank name. Each tier has a human-readable quota limit.
···
# Known ATProto Relays

Reference list of known public ATProto relays and their capabilities, relevant to ATCR hold discovery and appview backfill.

There is no relay discovery protocol in ATProto — this list is manually maintained.

Last verified: 2026-02-08

## Relay List

### Bluesky (Official)

| Relay | URL | requestCrawl | listReposByCollection | Notes |
|-------|-----|:---:|:---:|-------|
| Bluesky (load balancer) | `https://bsky.network` | Yes | No (400 — not proxied) | Load balancer, proxies to regional relays |
| Bluesky US-East | `https://relay1.us-east.bsky.network` | Yes | Yes | Regional relay with full collection directory |
| Bluesky US-West | `https://relay1.us-west.bsky.network` | Yes | Yes | Regional relay with full collection directory |

### Community

| Relay | URL | requestCrawl | listReposByCollection | Notes |
|-------|-----|:---:|:---:|-------|
| Firehose NA | `https://northamerica.firehose.network` | Yes | No (404) | 72h replay buffer |
| Firehose EU | `https://europe.firehose.network` | Yes | No (404) | 72h replay buffer |
| Firehose Asia | `https://asia.firehose.network` | Yes | No (404) | 72h replay buffer |
| Microcosm Montreal | `https://relay.fire.hose.cam` | Yes | No (404) | |
| Microcosm France | `https://relay3.fr.hose.cam` | Yes | No (404) | |
| Upcloud | `https://relay.upcloud.world` | Yes | No (404) | |
| Blacksky | `https://atproto.africa` | Down (502) | Down (502) | Offline as of 2026-02-08 |

## ATCR Usage

### Hold service (`requestCrawl`)

The hold announces its embedded PDS to relays on startup via `com.atproto.sync.requestCrawl`. Currently configured as a single relay in `server.relay_endpoint`. All healthy relays above accept `requestCrawl`.

### Appview backfill (`listReposByCollection`)

The appview uses `com.atproto.sync.listReposByCollection` to discover DIDs with `io.atcr.*` records during backfill. Only Bluesky's regional relays support this endpoint. The appview defaults to `relay1.us-east.bsky.network`, with `relay1.us-west.bsky.network` as a fallback.
## Why most relays lack `listReposByCollection`

The `listReposByCollection` endpoint is not part of the core relay implementation. It's served by a separate microservice called [collectiondir](https://github.com/bluesky-social/indigo/tree/main/cmd/collectiondir) that maintains an index of `(collection, timestamp, DID)` tuples.

Community relays running [Rainbow](https://github.com/bluesky-social/indigo/tree/main/cmd/rainbow) can optionally proxy to a collectiondir instance via `--collectiondir-host`, but most don't deploy one — likely because maintaining that index across the full network is expensive relative to just fan-out relaying.
## Other useful relay endpoints

These are standard XRPC endpoints that relays may implement:

- `com.atproto.sync.listRepos` — paginated list of all known repos (all tested relays support this)
- `com.atproto.sync.getRepo` — all tested relays 302 redirect to the source PDS
- `com.atproto.sync.getRepoStatus` — check if a relay knows about a specific DID
- `com.atproto.sync.subscribeRepos` — WebSocket firehose subscription

## Sources

- [Bluesky indigo relay (Rainbow)](https://github.com/bluesky-social/indigo/tree/main/cmd/rainbow)
- [Bluesky indigo collectiondir](https://github.com/bluesky-social/indigo/tree/main/cmd/collectiondir)
- [firehose.network](https://firehose.network/)
- [PDS debug tool relay list](https://tangled.org/microcosm.blue/pds-debug/raw/main/index.html)
- [Sri's relay writeup](https://sri.leaflet.pub/3mddrqk5ays27)
hold (binary file, not displayed)
lexicons/io/atcr/sailor/star.json (+4 -21)
···
   "defs": {
     "main": {
       "type": "record",
-      "description": "A star (like) on a container image repository. Stored in the starrer's PDS, similar to Bluesky likes.",
+      "description": "A star (like) on a container image repository. Stored in the starrer's PDS, similar to Bluesky likes. Subject is an AT URI pointing to the repo page record being starred.",
       "key": "any",
       "record": {
         "type": "object",
         "required": ["subject", "createdAt"],
         "properties": {
           "subject": {
-            "type": "ref",
-            "ref": "#subject",
-            "description": "The repository being starred"
+            "type": "string",
+            "format": "at-uri",
+            "description": "AT URI of the repository page being starred (e.g., at://did:plc:abc/io.atcr.repo.page/myapp)"
           },
           "createdAt": {
             "type": "string",
             "format": "datetime",
             "description": "Star creation timestamp"
           }
-        }
-      }
-    },
-    "subject": {
-      "type": "object",
-      "description": "Reference to a repository owned by a user",
-      "required": ["did", "repository"],
-      "properties": {
-        "did": {
-          "type": "string",
-          "format": "did",
-          "description": "DID of the repository owner"
-        },
-        "repository": {
-          "type": "string",
-          "description": "Repository name (e.g., 'myapp')",
-          "maxLength": 255
         }
       }
     }
pkg/appview/config.go (+14 -6)
···
 
 // JetstreamConfig defines ATProto Jetstream settings
 type JetstreamConfig struct {
-	// ATProto firehose endpoint for real-time events.
-	URL string `yaml:"url" comment:"ATProto firehose endpoint for real-time manifest/tag events."`
+	// Jetstream WebSocket endpoints, tried in order on failure.
+	URLs []string `yaml:"urls" comment:"Jetstream WebSocket endpoints, tried in order on failure."`
 
 	// Sync existing records from PDS on startup.
 	BackfillEnabled bool `yaml:"backfill_enabled" comment:"Sync existing records from PDS on startup."`
 
-	// Relay to query for repository sync.
-	RelayEndpoint string `yaml:"relay_endpoint" comment:"Relay to query for repository sync (backfill source)."`
+	// Relay endpoints for backfill, tried in order on failure.
+	RelayEndpoints []string `yaml:"relay_endpoints" comment:"Relay endpoints for backfill, tried in order on failure."`
 }
 
 // AuthConfig defines authentication settings
···
 	v.SetDefault("health.check_interval", "15m")
 
 	// Jetstream defaults
-	v.SetDefault("jetstream.url", "wss://jetstream2.us-west.bsky.network/subscribe")
+	v.SetDefault("jetstream.urls", []string{
+		"wss://jetstream2.us-west.bsky.network/subscribe",
+		"wss://jetstream1.us-west.bsky.network/subscribe",
+		"wss://jetstream2.us-east.bsky.network/subscribe",
+		"wss://jetstream1.us-east.bsky.network/subscribe",
+	})
 	v.SetDefault("jetstream.backfill_enabled", true)
-	v.SetDefault("jetstream.relay_endpoint", "https://relay1.us-east.bsky.network")
+	v.SetDefault("jetstream.relay_endpoints", []string{
+		"https://relay1.us-east.bsky.network",
+		"https://relay1.us-west.bsky.network",
+	})
 
 	// Auth defaults
 	v.SetDefault("auth.key_path", "/var/lib/atcr/auth/private-key.pem")
pkg/appview/config_test.go (+6 -2)
···
 		t.Errorf("health cache TTL = %v, want 15m", got.Health.CacheTTL)
 	}
 
-	if got.Jetstream.URL != "wss://jetstream2.us-west.bsky.network/subscribe" {
-		t.Errorf("jetstream URL = %v, want default", got.Jetstream.URL)
+	if len(got.Jetstream.URLs) != 4 || got.Jetstream.URLs[0] != "wss://jetstream2.us-west.bsky.network/subscribe" {
+		t.Errorf("jetstream URLs = %v, want 4 endpoints starting with jetstream2 us-west", got.Jetstream.URLs)
+	}
+
+	if len(got.Jetstream.RelayEndpoints) != 2 || got.Jetstream.RelayEndpoints[0] != "https://relay1.us-east.bsky.network" {
+		t.Errorf("jetstream RelayEndpoints = %v, want 2 endpoints starting with us-east", got.Jetstream.RelayEndpoints)
 	}
 
 	// Verify distribution config was built
pkg/appview/db/models.go (-33)
···
 	CreatedAt time.Time
 }
 
-// Push represents a combined tag and manifest for the recent pushes view
-type Push struct {
-	DID          string
-	Handle       string
-	Repository   string
-	Tag          string
-	Digest       string
-	Title        string
-	Description  string
-	IconURL      string
-	StarCount    int
-	PullCount    int
-	IsStarred    bool // Whether the current user has starred this repository
-	CreatedAt    time.Time
-	HoldEndpoint string // Hold endpoint for health checking
-	Reachable    bool   // Whether the hold endpoint is reachable
-	ArtifactType string // container-image, helm-chart, unknown
-}
-
 // Repository represents an aggregated view of a user's repository
 type Repository struct {
 	Name string
···
 	LastPull *time.Time `json:"last_pull,omitempty"`
 	PushCount int `json:"push_count"`
 	LastPush *time.Time `json:"last_push,omitempty"`
-}
-
-// FeaturedRepository represents a repository in the featured section
-type FeaturedRepository struct {
-	OwnerDID     string
-	OwnerHandle  string
-	Repository   string
-	Title        string
-	Description  string
-	IconURL      string
-	StarCount    int
-	PullCount    int
-	IsStarred    bool   // Whether the current user has starred this repository
-	ArtifactType string // container-image, helm-chart, unknown
 }
 
 // RepositoryWithStats combines repository data with statistics
pkg/appview/db/queries.go (+5 -168)
···
 	return strings.TrimSpace(s)
 }
 
-// GetRecentPushes fetches recent pushes with pagination
-func GetRecentPushes(db *sql.DB, limit, offset int, userFilter string, currentUserDID string) ([]Push, int, error) {
-	query := `
-		SELECT
-			u.did,
-			u.handle,
-			t.repository,
-			t.tag,
-			t.digest,
-			COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.title'), ''),
-			COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.description'), ''),
-			COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'io.atcr.icon'), ''),
-			COALESCE(rs.pull_count, 0),
-			COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = u.did AND repository = t.repository), 0),
-			COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = u.did AND repository = t.repository), 0),
-			t.created_at,
-			m.hold_endpoint,
-			COALESCE(rp.avatar_cid, ''),
-			COALESCE(m.artifact_type, 'container-image')
-		FROM tags t
-		JOIN users u ON t.did = u.did
-		JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
-		LEFT JOIN repository_stats rs ON t.did = rs.did AND t.repository = rs.repository
-		LEFT JOIN repo_pages rp ON t.did = rp.did AND t.repository = rp.repository
-	`
-
-	args := []any{currentUserDID}
-
-	if userFilter != "" {
-		query += " WHERE u.handle = ? OR u.did = ?"
-		args = append(args, userFilter, userFilter)
-	}
-
-	query += " ORDER BY t.created_at DESC LIMIT ? OFFSET ?"
-	args = append(args, limit, offset)
-
-	rows, err := db.Query(query, args...)
-	if err != nil {
-		return nil, 0, err
-	}
-	defer rows.Close()
-
-	var pushes []Push
-	for rows.Next() {
-		var p Push
-		var isStarredInt int
-		var avatarCID string
-		if err := rows.Scan(&p.DID, &p.Handle, &p.Repository, &p.Tag, &p.Digest, &p.Title, &p.Description, &p.IconURL, &p.PullCount, &p.StarCount, &isStarredInt, &p.CreatedAt, &p.HoldEndpoint, &avatarCID, &p.ArtifactType); err != nil {
-			return nil, 0, err
-		}
-		p.IsStarred = isStarredInt > 0
-		// Prefer repo page avatar over annotation icon
-		if avatarCID != "" {
-			p.IconURL = BlobCDNURL(p.DID, avatarCID)
-		}
-		pushes = append(pushes, p)
-	}
-
-	// Get total count
-	countQuery := "SELECT COUNT(*) FROM tags t JOIN users u ON t.did = u.did"
-	countArgs := []any{}
-
-	if userFilter != "" {
-		countQuery += " WHERE u.handle = ? OR u.did = ?"
-		countArgs = append(countArgs, userFilter, userFilter)
-	}
-
-	var total int
-	if err := db.QueryRow(countQuery, countArgs...).Scan(&total); err != nil {
-		return nil, 0, err
-	}
-
-	return pushes, total, nil
-}
-
 // SearchRepositories searches for repositories matching the query across handles, DIDs, repositories, and annotations
 // Returns RepoCardData (one per repository) instead of individual pushes/tags
 func SearchRepositories(db *sql.DB, query string, limit, offset int, currentUserDID string) ([]RepoCardData, int, error) {
···
 	return GetLatestHoldDIDForRepo(h.db, did, repository)
 }
 
-// GetFeaturedRepositories fetches top repositories sorted by stars and pulls
-func GetFeaturedRepositories(db *sql.DB, limit int, currentUserDID string) ([]FeaturedRepository, error) {
-	query := `
-		WITH latest_manifests AS (
-			SELECT did, repository, MAX(id) as latest_id
-			FROM manifests
-			GROUP BY did, repository
-		),
-		repo_stats AS (
-			SELECT
-				lm.did,
-				lm.repository,
-				COALESCE(rs.pull_count, 0) as pull_count,
-				COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) as star_count,
-				(COALESCE(rs.pull_count, 0) + COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) * 10) as score
-			FROM latest_manifests lm
-			LEFT JOIN repository_stats rs ON lm.did = rs.did AND lm.repository = rs.repository
-		)
-		SELECT
-			m.did,
-			u.handle,
-			m.repository,
-			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.title'), ''),
-			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.description'), ''),
-			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'io.atcr.icon'), ''),
-			rs.pull_count,
-			rs.star_count,
-			COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = m.did AND repository = m.repository), 0),
-			COALESCE(rp.avatar_cid, ''),
-			COALESCE(m.artifact_type, 'container-image')
-		FROM latest_manifests lm
-		JOIN manifests m ON lm.latest_id = m.id
-		JOIN users u ON m.did = u.did
-		JOIN repo_stats rs ON m.did = rs.did AND m.repository = rs.repository
-		LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
-		ORDER BY rs.score DESC, rs.star_count DESC, rs.pull_count DESC, m.created_at DESC
-		LIMIT ?
-	`
-
-	rows, err := db.Query(query, currentUserDID, limit)
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-
-	var featured []FeaturedRepository
-	for rows.Next() {
-		var f FeaturedRepository
-		var isStarredInt int
-		var avatarCID string
-
-		if err := rows.Scan(&f.OwnerDID, &f.OwnerHandle, &f.Repository,
-			&f.Title, &f.Description, &f.IconURL, &f.PullCount, &f.StarCount, &isStarredInt, &avatarCID, &f.ArtifactType); err != nil {
-			return nil, err
-		}
-		f.IsStarred = isStarredInt > 0
-		// Prefer repo page avatar over annotation icon
-		if avatarCID != "" {
-			f.IconURL = BlobCDNURL(f.OwnerDID, avatarCID)
-		}
-
-		featured = append(featured, f)
-	}
-
-	return featured, nil
-}
-
 // RepoCardSortOrder specifies how repo cards should be sorted
 type RepoCardSortOrder string
 
···
 	case SortByLastUpdate:
 		orderBy = "COALESCE(rs.last_push, m.created_at) DESC"
 	default: // SortByScore
-		orderBy = "repo_stats.score DESC, repo_stats.star_count DESC, repo_stats.pull_count DESC, m.created_at DESC"
+		orderBy = "(COALESCE(rs.pull_count, 0) + COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = m.did AND repository = m.repository), 0) * 10) DESC, m.created_at DESC"
 	}
 
 	query := `
···
 			SELECT did, repository, MAX(id) as latest_id
 			FROM manifests
 			GROUP BY did, repository
-		),
-		repo_stats AS (
-			SELECT
-				lm.did,
-				lm.repository,
-				COALESCE(rs.pull_count, 0) as pull_count,
-				COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) as star_count,
-				(COALESCE(rs.pull_count, 0) + COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) * 10) as score
-			FROM latest_manifests lm
-			LEFT JOIN repository_stats rs ON lm.did = rs.did AND lm.repository = rs.repository
 		)
 		SELECT
 			m.did,
···
 			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.title'), ''),
 			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.description'), ''),
 			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'io.atcr.icon'), ''),
-			repo_stats.star_count,
-			repo_stats.pull_count,
+			COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = m.did AND repository = m.repository), 0),
+			COALESCE(rs.pull_count, 0),
 			COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = m.did AND repository = m.repository), 0),
 			COALESCE(m.artifact_type, 'container-image'),
 			COALESCE((SELECT tag FROM tags WHERE did = m.did AND repository = m.repository ORDER BY created_at DESC LIMIT 1), ''),
···
 		FROM latest_manifests lm
 		JOIN manifests m ON lm.latest_id = m.id
 		JOIN users u ON m.did = u.did
-		JOIN repo_stats ON m.did = repo_stats.did AND m.repository = repo_stats.repository
 		LEFT JOIN repository_stats rs ON m.did = rs.did AND m.repository = rs.repository
 		LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
 		ORDER BY ` + orderBy + `
···
 		FROM manifests
 		WHERE did = ?
 		GROUP BY did, repository
-		),
-		repo_stats AS (
-			SELECT
-				lm.did,
-				lm.repository,
-				COALESCE(rs.pull_count, 0) as pull_count,
-				COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) as star_count
-			FROM latest_manifests lm
-			LEFT JOIN repository_stats rs ON lm.did = rs.did AND lm.repository = rs.repository
 		)
 		SELECT
 			m.did,
···
 			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.title'), ''),
 			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.description'), ''),
 			COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'io.atcr.icon'), ''),
-			repo_stats.star_count,
-			repo_stats.pull_count,
+			COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = m.did AND repository = m.repository), 0),
+			COALESCE(rs.pull_count, 0),
 			COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = m.did AND repository = m.repository), 0),
 			COALESCE(m.artifact_type, 'container-image'),
 			COALESCE((SELECT tag FROM tags WHERE did = m.did AND repository = m.repository ORDER BY created_at DESC LIMIT 1), ''),
···
 		FROM latest_manifests lm
 		JOIN manifests m ON lm.latest_id = m.id
 		JOIN users u ON m.did = u.did
-		JOIN repo_stats ON m.did = repo_stats.did AND m.repository = repo_stats.repository
 		LEFT JOIN repository_stats rs ON m.did = rs.did AND m.repository = rs.repository
 		LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
 		ORDER BY COALESCE(rs.last_push, m.created_at) DESC
···
 // BackfillWorker uses com.atproto.sync.listReposByCollection to backfill historical data
 type BackfillWorker struct {
 	db             *sql.DB
-	client         *atproto.Client
+	endpoints      *EndpointRotator
 	processor      *Processor // Shared processor for DB operations
 	defaultHoldDID string     // Default hold DID from AppView config (e.g., "did:web:hold01.atcr.io")
 	testMode       bool       // If true, suppress warnings for external holds
···
 // defaultHoldDID should be in format "did:web:hold01.atcr.io"
 // To find a hold's DID, visit: https://hold-url/.well-known/did.json
 // refresher is optional - if provided, backfill will try to update PDS records when fetching README content
-func NewBackfillWorker(database *sql.DB, relayEndpoint, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) (*BackfillWorker, error) {
-	// Create client for relay - used only for listReposByCollection
-	client := atproto.NewClient(relayEndpoint, "", "")
+func NewBackfillWorker(database *sql.DB, relayEndpoints []string, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) (*BackfillWorker, error) {
+	if len(relayEndpoints) == 0 {
+		relayEndpoints = []string{"https://relay1.us-east.bsky.network"}
+	}
 
 	return &BackfillWorker{
 		db:        database,
-		client:    client, // This points to the relay
+		endpoints: NewEndpointRotator(relayEndpoints),
 		processor: NewProcessor(database, false, NewStatsCache()), // Stats cache for aggregation
 		defaultHoldDID: defaultHoldDID,
 		testMode:       testMode,
···
 	return nil
 }
 
+// listReposByCollectionWithFailover tries all relay endpoints to list repos for a collection.
+// On failure it advances to the next endpoint. Returns error only if all endpoints fail.
+func (b *BackfillWorker) listReposByCollectionWithFailover(ctx context.Context, collection string, limit int, cursor string) (*atproto.ListReposByCollectionResult, error) {
+	var lastErr error
+	for i := 0; i < b.endpoints.Len(); i++ {
+		endpoint := b.endpoints.Current()
+		client := atproto.NewClient(endpoint, "", "")
+
+		result, err := client.ListReposByCollection(ctx, collection, limit, cursor)
+		if err == nil {
+			return result, nil
+		}
+
+		lastErr = err
+		nextEndpoint := b.endpoints.Next()
+		slog.Warn("Backfill relay failed, trying next",
+			"failed_endpoint", endpoint,
+			"next_endpoint", nextEndpoint,
+			"error", err)
+	}
+	return nil, fmt.Errorf("all relay endpoints failed: %w", lastErr)
+}
+
 // backfillCollection backfills a single collection
 func (b *BackfillWorker) backfillCollection(ctx context.Context, collection string) error {
 	var repoCursor string
···
 	// Paginate through all repos with this collection
 	for {
-		// List repos that have records in this collection
-		result, err := b.client.ListReposByCollection(ctx, collection, 1000, repoCursor)
+		// List repos that have records in this collection (with relay failover)
+		result, err := b.listReposByCollectionWithFailover(ctx, collection, 1000, repoCursor)
 		if err != nil {
 			return fmt.Errorf("failed to list repos: %w", err)
 		}
···
 	case atproto.StarCollection:
 		var starRecord atproto.StarRecord
 		if err := json.Unmarshal(record.Value, &starRecord); err == nil {
-			key := fmt.Sprintf("%s/%s", starRecord.Subject.DID, starRecord.Subject.Repository)
-			foundStars[key] = starRecord.CreatedAt
+			if ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository(); err == nil {
+				key := fmt.Sprintf("%s/%s", ownerDID, repository)
+				foundStars[key] = starRecord.CreatedAt
+			}
 		}
 	}
 
pkg/appview/jetstream/endpoint_rotator.go (new file, +49)

package jetstream

import "sync"

// EndpointRotator cycles through a list of endpoint URLs for failover.
// Thread-safe via sync.Mutex.
type EndpointRotator struct {
	mu        sync.Mutex
	endpoints []string
	index     int
}

// NewEndpointRotator creates a rotator from a list of endpoints.
// Panics if endpoints is empty.
func NewEndpointRotator(endpoints []string) *EndpointRotator {
	if len(endpoints) == 0 {
		panic("EndpointRotator requires at least one endpoint")
	}
	return &EndpointRotator{
		endpoints: endpoints,
	}
}

// Current returns the current endpoint without advancing.
func (r *EndpointRotator) Current() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.endpoints[r.index]
}

// Next advances to the next endpoint (wrapping around) and returns it.
func (r *EndpointRotator) Next() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.index = (r.index + 1) % len(r.endpoints)
	return r.endpoints[r.index]
}

// Reset returns to the first endpoint.
func (r *EndpointRotator) Reset() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.index = 0
}

// Len returns the number of endpoints.
func (r *EndpointRotator) Len() int {
	return len(r.endpoints)
}
pkg/appview/jetstream/endpoint_rotator_test.go (new file, +88)

package jetstream

import (
	"sync"
	"testing"
)

func TestEndpointRotator_Current(t *testing.T) {
	r := NewEndpointRotator([]string{"a", "b", "c"})
	if got := r.Current(); got != "a" {
		t.Errorf("Current() = %q, want %q", got, "a")
	}
	// Calling Current again should not advance
	if got := r.Current(); got != "a" {
		t.Errorf("Current() second call = %q, want %q", got, "a")
	}
}

func TestEndpointRotator_Next(t *testing.T) {
	r := NewEndpointRotator([]string{"a", "b", "c"})
	if got := r.Next(); got != "b" {
		t.Errorf("Next() = %q, want %q", got, "b")
	}
	if got := r.Next(); got != "c" {
		t.Errorf("Next() = %q, want %q", got, "c")
	}
}

func TestEndpointRotator_WrapAround(t *testing.T) {
	r := NewEndpointRotator([]string{"a", "b"})
	r.Next() // -> b
	got := r.Next()
	if got != "a" {
		t.Errorf("Next() after wrap = %q, want %q", got, "a")
	}
}

func TestEndpointRotator_Reset(t *testing.T) {
	r := NewEndpointRotator([]string{"a", "b", "c"})
	r.Next() // -> b
	r.Next() // -> c
	r.Reset()
	if got := r.Current(); got != "a" {
		t.Errorf("Current() after Reset() = %q, want %q", got, "a")
	}
}

func TestEndpointRotator_Len(t *testing.T) {
	r := NewEndpointRotator([]string{"a", "b", "c"})
	if got := r.Len(); got != 3 {
		t.Errorf("Len() = %d, want 3", got)
	}
}

func TestEndpointRotator_SingleEndpoint(t *testing.T) {
	r := NewEndpointRotator([]string{"only"})
	if got := r.Current(); got != "only" {
		t.Errorf("Current() = %q, want %q", got, "only")
	}
	// Next on single endpoint wraps back to itself
	if got := r.Next(); got != "only" {
		t.Errorf("Next() = %q, want %q", got, "only")
	}
}

func TestEndpointRotator_PanicsOnEmpty(t *testing.T) {
	defer func() {
		if r := recover(); r == nil {
			t.Error("NewEndpointRotator(nil) did not panic")
		}
	}()
	NewEndpointRotator(nil)
}

func TestEndpointRotator_Concurrent(t *testing.T) {
	r := NewEndpointRotator([]string{"a", "b", "c", "d"})
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.Current()
			_ = r.Next()
		}()
	}
	wg.Wait()
	// No race detector failures = success
}
pkg/appview/jetstream/processor.go (+9 -4)
···
 
 // ProcessStar processes a star record and stores it in the database
 func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
-	// Unmarshal star record
+	// Unmarshal star record (handles both old object and new AT URI subject formats)
	var starRecord atproto.StarRecord
 	if err := json.Unmarshal(recordData, &starRecord); err != nil {
 		return fmt.Errorf("failed to unmarshal star: %w", err)
 	}
 
+	// Extract owner DID and repository from subject AT URI
+	ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository()
+	if err != nil {
+		return fmt.Errorf("failed to parse star subject: %w", err)
+	}
+
 	// Ensure the starred repository's owner exists in the users table
 	// (the starrer is already ensured by ProcessRecord, but the owner
 	// may not have been processed yet during backfill or live events)
-	if err := p.EnsureUser(ctx, starRecord.Subject.DID); err != nil {
+	if err := p.EnsureUser(ctx, ownerDID); err != nil {
 		return fmt.Errorf("failed to ensure star subject user: %w", err)
 	}
 
 	// Upsert the star record (idempotent - won't duplicate)
 	// The DID here is the starrer (user who starred)
-	// The subject contains the owner DID and repository
 	// Star count will be calculated on demand from the stars table
-	return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt)
+	return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt)
 }
 
 // ProcessSailorProfile processes a sailor profile record
+39-8
pkg/appview/jetstream/processor_test.go
···438438 p := NewProcessor(database, false, nil)
439439 ctx := context.Background()
440440441441- // Create test star record
442442- starRecord := &atproto.StarRecord{
443443- Subject: atproto.StarSubject{
444444- DID: "did:plc:owner123",
445445- Repository: "test-app",
446446- },
447447- CreatedAt: time.Now(),
448448- }
441441+ // Create test star record (new AT URI format)
442442+ starRecord := atproto.NewStarRecord("did:plc:owner123", "test-app")
449443450444 // Marshal to bytes for ProcessStar
451445 recordBytes, err := json.Marshal(starRecord)
···488482 }
489483 if count != 1 {
490484 t.Errorf("Expected 1 star after upsert, got %d", count)
485485+ }
486486+}
487487+
488488+func TestProcessStar_OldFormat(t *testing.T) {
489489+ database := setupTestDB(t)
490490+ defer database.Close()
491491+492492+ // Insert test users
493493+ for _, did := range []string{"did:plc:starrer456", "did:plc:owner456"} {
494494+ _, err := database.Exec(
495495+ `INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
496496+ did, did+".test", "https://pds.example.com", time.Now())
497497+ if err != nil {
498498+ t.Fatalf("Failed to insert test user %s: %v", did, err)
499499+ }
500500+ }
501501+502502+ p := NewProcessor(database, false, nil)
503503+ ctx := context.Background()
504504+505505+ // Old format JSON (object subject with did + repository)
506506+ oldFormatJSON := `{"$type":"io.atcr.sailor.star","subject":{"did":"did:plc:owner456","repository":"legacy-app"},"createdAt":"2025-06-01T00:00:00Z"}`
507507+508508+ err := p.ProcessStar(ctx, "did:plc:starrer456", []byte(oldFormatJSON))
509509+ if err != nil {
510510+ t.Fatalf("ProcessStar (old format) failed: %v", err)
511511+ }
512512+513513+ // Verify star was inserted with correct owner/repo
514514+ var count int
515515+ err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
516516+ "did:plc:starrer456", "did:plc:owner456", "legacy-app").Scan(&count)
517517+ if err != nil {
518518+ t.Fatalf("Failed to query stars: %v", err)
519519+ }
520520+ if count != 1 {
521521+ t.Errorf("Expected 1 star from old format, got %d", count)
491522 }
492523}
493524
+88-4
pkg/appview/jetstream/worker.go
···3131type Worker struct {
3232 db *sql.DB
3333 jetstreamURL string
3434+ endpoints *EndpointRotator
3435 startCursor int64
3536 wantedCollections []string
3637 debugCollectionCount int
···5253
5354// NewWorker creates a new Jetstream worker
5455// startCursor: Unix microseconds timestamp to start from (0 = start from now)
5555-func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker {
5656- if jetstreamURL == "" {
5757- jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe"
5656+func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker {
5757+ if len(urls) == 0 {
5858+ urls = []string{"wss://jetstream2.us-west.bsky.network/subscribe"}
5859 }
59606161+ rotator := NewEndpointRotator(urls)
6262+6063 // Create shared stats cache for aggregating across holds
6164 statsCache := NewStatsCache()
62656366 return &Worker{
6467 db: database,
6565- jetstreamURL: jetstreamURL,
6868+ jetstreamURL: rotator.Current(),
6969+ endpoints: rotator,
6670 startCursor: startCursor,
6771 wantedCollections: []string{
6872 "io.atcr.*", // Subscribe to all ATCR collections
···260264 } else {
261265 eventCount++
262266 }
267267+ }
268268+ }
269269+}
270270+
271271+// StartWithFailover runs the Jetstream worker with automatic failover across endpoints.
272272+// On disconnect it retries the same endpoint with escalating delays (1s, 5s, 10s).
273273+// If all retries fail, it fails over to the next endpoint and rewinds the cursor by
274274+// 30 seconds to avoid gaps; replayed events are harmless because processing is an
275275+// idempotent database upsert. It cycles through all endpoints indefinitely.
276276+func (w *Worker) StartWithFailover(ctx context.Context) {
277277+ retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second}
278278+279279+ for {
280280+ currentURL := w.endpoints.Current()
281281+ w.jetstreamURL = currentURL
282282+283283+ slog.Info("Jetstream connecting", "url", currentURL)
284284+ err := w.Start(ctx)
285285+ if ctx.Err() != nil {
286286+ return // Context cancelled, clean shutdown
287287+ }
288288+289289+ // Capture cursor at disconnect time for rewind calculation
290290+ disconnectCursor := w.GetLastCursor()
291291+292292+ // Retry same endpoint with escalating delays
293293+ recovered := false
294294+ for i, delay := range retryDelays {
295295+ slog.Warn("Jetstream disconnected, retrying same endpoint",
296296+ "url", currentURL,
297297+ "attempt", i+1,
298298+ "delay", delay,
299299+ "error", err)
300300+ time.Sleep(delay)
301301+302302+ if ctx.Err() != nil {
303303+ return
304304+ }
305305+306306+ w.jetstreamURL = currentURL
307307+ err = w.Start(ctx)
308308+ if ctx.Err() != nil {
309309+ return
310310+ }
311311+ if err == nil {
312312+ recovered = true
313313+ break
314314+ }
315315+ // Update disconnect cursor if we got further
316316+ if latest := w.GetLastCursor(); latest > disconnectCursor {
317317+ disconnectCursor = latest
318318+ }
319319+ }
320320+321321+ if recovered {
322322+ continue
323323+ }
324324+325325+ // All retries failed — failover to next endpoint
326326+ failedURL := currentURL
327327+ nextURL := w.endpoints.Next()
328328+329329+ // Rewind cursor 30 seconds (30M microseconds) to avoid gaps
330330+ if disconnectCursor > 0 {
331331+ rewound := disconnectCursor - 30_000_000
332332+ if rewound < 0 {
333333+ rewound = 0
334334+ }
335335+ w.cursorMutex.Lock()
336336+ w.lastCursor = rewound
337337+ w.startCursor = rewound
338338+ w.cursorMutex.Unlock()
339339+ slog.Warn("Jetstream failing over to next endpoint",
340340+ "failed_url", failedURL,
341341+ "next_url", nextURL,
342342+ "cursor_rewound_by", "30s")
343343+ } else {
344344+ slog.Warn("Jetstream failing over to next endpoint",
345345+ "failed_url", failedURL,
346346+ "next_url", nextURL)
263347 }
264348 }
265349}
···33//go:generate go run generate.go
4455import (
66+ "context"
67 "crypto/sha256"
78 "encoding/base32"
89 "encoding/base64"
···388389 }
389390}
390391
391391-// StarSubject represents the subject of a star (the repository being starred)
392392-type StarSubject struct {
393393- // DID is the DID of the repository owner
394394- DID string `json:"did"`
395395-396396- // Repository is the name of the repository
397397- Repository string `json:"repository"`
398398-}
399399-
400392// StarRecord represents a user starring a repository
401393// Stored in the starrer's PDS (like Bluesky likes)
394394+// Subject is an AT URI pointing to the repo page record being starred
402395type StarRecord struct {
403396 // Type should be "io.atcr.sailor.star"
404397 Type string `json:"$type"`
405398406406- // Subject is the repository being starred
407407- Subject StarSubject `json:"subject"`
399399+ // Subject is the AT URI of the repo page being starred
400400+ // e.g., "at://did:plc:abc/io.atcr.repo.page/myapp"
401401+ Subject string `json:"subject"`
408402409403 // CreatedAt timestamp
410404 CreatedAt time.Time `json:"createdAt"`
411405}
412406
413413-// NewStarRecord creates a new star record
407407+// UnmarshalJSON handles both old format (object subject) and new format (AT URI string subject)
408408+func (s *StarRecord) UnmarshalJSON(data []byte) error {
409409+ // Use a raw type to inspect the subject field
410410+ type starRecordRaw struct {
411411+ Type string `json:"$type"`
412412+ Subject json.RawMessage `json:"subject"`
413413+ CreatedAt time.Time `json:"createdAt"`
414414+ }
415415+ var raw starRecordRaw
416416+ if err := json.Unmarshal(data, &raw); err != nil {
417417+ return fmt.Errorf("failed to unmarshal star record: %w", err)
418418+ }
419419+420420+ s.Type = raw.Type
421421+ s.CreatedAt = raw.CreatedAt
422422+423423+ // Try new format: subject is a string AT URI
424424+ var subjectStr string
425425+ if err := json.Unmarshal(raw.Subject, &subjectStr); err == nil && strings.HasPrefix(subjectStr, "at://") {
426426+ s.Subject = subjectStr
427427+ return nil
428428+ }
429429+430430+ // Fall back to old format: subject is an object with did + repository
431431+ var oldSubject struct {
432432+ DID string `json:"did"`
433433+ Repository string `json:"repository"`
434434+ }
435435+ if err := json.Unmarshal(raw.Subject, &oldSubject); err != nil {
436436+ return fmt.Errorf("failed to unmarshal star subject (neither AT URI string nor {did, repository} object): %w", err)
437437+ }
438438+439439+ s.Subject = BuildRepoPageURI(oldSubject.DID, oldSubject.Repository)
440440+ return nil
441441+}
442442+
443443+// GetSubjectDIDAndRepository extracts the owner DID and repository name
444444+// from the star record's subject AT URI. UnmarshalJSON normalizes old format
445445+// to AT URI, so this always works with ParseRepoPageURI.
446446+func (s *StarRecord) GetSubjectDIDAndRepository() (ownerDID, repository string, err error) {
447447+ return ParseRepoPageURI(s.Subject)
448448+}
449449+
450450+// NewStarRecord creates a new star record with an AT URI subject
414451func NewStarRecord(ownerDID, repository string) *StarRecord {
415452 return &StarRecord{
416416- Type: StarCollection,
417417- Subject: StarSubject{
418418- DID: ownerDID,
419419- Repository: repository,
420420- },
453453+ Type: StarCollection,
454454+ Subject: BuildRepoPageURI(ownerDID, repository),
421455 CreatedAt: time.Now(),
422456 }
423457}
424458
459459+// BuildRepoPageURI creates an AT URI for a repo page record
460460+// e.g., BuildRepoPageURI("did:plc:abc", "myapp") → "at://did:plc:abc/io.atcr.repo.page/myapp"
461461+func BuildRepoPageURI(ownerDID, repository string) string {
462462+ return fmt.Sprintf("at://%s/%s/%s", ownerDID, RepoPageCollection, repository)
463463+}
464464+
465465+// ParseRepoPageURI extracts the owner DID and repository from a repo page AT URI
466466+func ParseRepoPageURI(uri string) (ownerDID, repository string, err error) {
467467+ if !strings.HasPrefix(uri, "at://") {
468468+ return "", "", fmt.Errorf("invalid AT URI: must start with 'at://'")
469469+ }
470470+ remainder := strings.TrimPrefix(uri, "at://")
471471+ parts := strings.SplitN(remainder, "/", 3)
472472+ if len(parts) != 3 {
473473+ return "", "", fmt.Errorf("invalid AT URI: expected 3 parts (did/collection/rkey), got %d", len(parts))
474474+ }
475475+ if parts[1] != RepoPageCollection {
476476+ return "", "", fmt.Errorf("invalid AT URI: expected collection %s, got %s", RepoPageCollection, parts[1])
477477+ }
478478+ return parts[0], parts[2], nil
479479+}
480480+425481// StarRecordKey generates a record key for a star
426482// Uses a simple hash to ensure uniqueness and prevent duplicate stars
427483func StarRecordKey(ownerDID, repository string) string {
···444500 }
445501446502 return parts[0], parts[1], nil
503503+}
504504+
505505+// MigrateStarRecords lists the user's star records and rewrites any old-format
506506+// records (object subject with did/repository) to the new AT URI format.
507507+// Returns the number of records migrated.
508508+func MigrateStarRecords(ctx context.Context, client *Client) (int, error) {
509509+ migrated := 0
510510+ cursor := ""
511511+512512+ for {
513513+ records, nextCursor, err := client.ListRecordsWithCursor(ctx, StarCollection, 100, cursor)
514514+ if err != nil {
515515+ return migrated, fmt.Errorf("failed to list star records: %w", err)
516516+ }
517517+518518+ for _, rec := range records {
519519+ // Try to unmarshal as new format (string subject) — skip if already migrated
520520+ var subjectStr string
521521+ if err := json.Unmarshal(rec.Value, &struct {
522522+ Subject *string `json:"subject"`
523523+ }{Subject: &subjectStr}); err == nil && strings.HasPrefix(subjectStr, "at://") {
524524+ continue
525525+ }
526526+527527+ // Old format — unmarshal via StarRecord (which normalizes to AT URI)
528528+ var starRecord StarRecord
529529+ if err := json.Unmarshal(rec.Value, &starRecord); err != nil {
530530+ continue
531531+ }
532532+533533+ // Extract rkey from the record URI (at://did/collection/rkey)
534534+ uriParts := strings.Split(rec.URI, "/")
535535+ if len(uriParts) < 2 {
536536+ continue
537537+ }
538538+ rkey := uriParts[len(uriParts)-1]
539539+540540+ // Rewrite with new format at the same rkey
541541+ newRecord := &StarRecord{
542542+ Type: StarCollection,
543543+ Subject: starRecord.Subject, // Already normalized to AT URI by UnmarshalJSON
544544+ CreatedAt: starRecord.CreatedAt,
545545+ }
546546+ if _, err := client.PutRecord(ctx, StarCollection, rkey, newRecord); err != nil {
547547+ return migrated, fmt.Errorf("failed to migrate star record %s: %w", rec.URI, err)
548548+ }
549549+ migrated++
550550+ }
551551+552552+ if nextCursor == "" {
553553+ break
554554+ }
555555+ cursor = nextCursor
556556+ }
557557+558558+ return migrated, nil
447559}
448560
449561// ResolveHoldDIDFromURL converts a hold endpoint URL to a did:web DID
···66package hold
7788import (
99- "bytes"
109 "context"
1111- "encoding/json"
1210 "fmt"
1311 "log/slog"
1414- "net/http"
1515- "net/url"
1612 "path/filepath"
1713 "time"
1814
···139135
140136 // PDS signing key path.
141137 KeyPath string `yaml:"key_path" comment:"PDS signing key path. Defaults to {database.path}/signing.key."`
138138+139139+ // libSQL sync URL for embedded replica mode.
140140+ LibsqlSyncURL string `yaml:"libsql_sync_url" comment:"libSQL sync URL (libsql://...). Works with Turso cloud, Bunny DB, or self-hosted libsql-server. Leave empty for local-only SQLite."`
141141+142142+ // Auth token for libSQL sync.
143143+ LibsqlAuthToken string `yaml:"libsql_auth_token" comment:"Auth token for libSQL sync. Required if libsql_sync_url is set."`
144144+145145+ // How often to sync with remote libSQL server.
146146+ LibsqlSyncInterval time.Duration `yaml:"libsql_sync_interval" comment:"How often to sync with remote libSQL server. Default: 60s."`
142147}
143148
144149// setHoldDefaults registers all default values on the given Viper instance.
···164169 // Database defaults
165170 v.SetDefault("database.path", "/var/lib/atcr-hold")
166171 v.SetDefault("database.key_path", "")
172172+ v.SetDefault("database.libsql_sync_url", "")
173173+ v.SetDefault("database.libsql_auth_token", "")
174174+ v.SetDefault("database.libsql_sync_interval", "60s")
167175168176 // Admin defaults
169177 v.SetDefault("admin.enabled", false)
···285293 return storageCfg
286294}
287295
288288-// RequestCrawl sends a crawl request to the ATProto relay for the given hostname.
289289-// This makes the hold's PDS discoverable by the relay network.
290290-func RequestCrawl(relayEndpoint, publicURL string) error {
291291- if relayEndpoint == "" {
292292- return nil // No relay configured, skip
293293- }
294294-295295- // Extract hostname from public URL
296296- parsed, err := url.Parse(publicURL)
297297- if err != nil {
298298- return fmt.Errorf("failed to parse public URL: %w", err)
299299- }
300300- hostname := parsed.Host
301301-302302- // Build the request URL
303303- requestURL := relayEndpoint + "/xrpc/com.atproto.sync.requestCrawl"
304304-305305- // Create request body
306306- body := map[string]string{"hostname": hostname}
307307- bodyJSON, err := json.Marshal(body)
308308- if err != nil {
309309- return fmt.Errorf("failed to marshal request body: %w", err)
310310- }
311311-312312- // Make the request
313313- client := &http.Client{Timeout: 10 * time.Second}
314314- req, err := http.NewRequest("POST", requestURL, bytes.NewReader(bodyJSON))
315315- if err != nil {
316316- return fmt.Errorf("failed to create request: %w", err)
317317- }
318318- req.Header.Set("Content-Type", "application/json")
319319-320320- resp, err := client.Do(req)
321321- if err != nil {
322322- return fmt.Errorf("failed to send request: %w", err)
323323- }
324324- defer resp.Body.Close()
325325-326326- if resp.StatusCode < 200 || resp.StatusCode >= 300 {
327327- return fmt.Errorf("relay returned status %d", resp.StatusCode)
328328- }
329329-330330- return nil
331331-}
+101
pkg/hold/db/hold_db.go
···11+package db
22+33+import (
44+ "database/sql"
55+ "fmt"
66+ "io"
77+ "log/slog"
88+ "strings"
99+ "time"
1010+1111+ "github.com/tursodatabase/go-libsql"
1212+)
1313+1414+// LibsqlConfig holds optional libSQL sync settings for embedded replicas.
1515+// When SyncURL is empty, the database operates in local-only mode.
1616+type LibsqlConfig struct {
1717+ SyncURL string
1818+ AuthToken string
1919+ SyncInterval time.Duration
2020+}
2121+2222+// HoldDB wraps the shared *sql.DB and optional connector for lifecycle management.
2323+type HoldDB struct {
2424+ DB *sql.DB
2525+ connector io.Closer // non-nil only in embedded replica mode
2626+}
2727+2828+// Close closes the database connection and connector (if any).
2929+// The connector must be closed to release file locks.
3030+func (h *HoldDB) Close() error {
3131+ var dbErr, connErr error
3232+ if h.DB != nil {
3333+ dbErr = h.DB.Close()
3434+ }
3535+ if h.connector != nil {
3636+ connErr = h.connector.Close()
3737+ }
3838+ if dbErr != nil {
3939+ return dbErr
4040+ }
4141+ return connErr
4242+}
4343+4444+// OpenHoldDB initializes the hold's shared database connection.
4545+// Uses libSQL embedded replica when cfg.SyncURL is set, otherwise local-only.
4646+// The caller must call HoldDB.Close() on shutdown.
4747+func OpenHoldDB(path string, cfg LibsqlConfig) (*HoldDB, error) {
4848+ var db *sql.DB
4949+ var connector io.Closer
5050+5151+ if cfg.SyncURL != "" {
5252+ // Embedded replica mode: local file + sync to remote
5353+ opts := []libsql.Option{
5454+ libsql.WithAuthToken(cfg.AuthToken),
5555+ }
5656+ if cfg.SyncInterval > 0 {
5757+ opts = append(opts, libsql.WithSyncInterval(cfg.SyncInterval))
5858+ }
5959+ conn, err := libsql.NewEmbeddedReplicaConnector(path, cfg.SyncURL, opts...)
6060+ if err != nil {
6161+ return nil, fmt.Errorf("failed to create libsql embedded replica connector: %w", err)
6262+ }
6363+ db = sql.OpenDB(conn)
6464+ connector = conn
6565+ slog.Info("Hold database opened in embedded replica mode", "path", path, "sync_url", cfg.SyncURL)
6666+ } else {
6767+ // Local-only mode: plain file via libsql driver
6868+ dsn := path
6969+ if !strings.HasPrefix(path, "file:") && !strings.HasPrefix(path, ":memory:") {
7070+ dsn = "file:" + path
7171+ }
7272+ var err error
7373+ db, err = sql.Open("libsql", dsn)
7474+ if err != nil {
7575+ return nil, fmt.Errorf("failed to open hold database: %w", err)
7676+ }
7777+ slog.Info("Hold database opened in local-only mode", "path", path)
7878+ }
7979+8080+ // In local-only mode, configure WAL and busy_timeout locally.
8181+ // In embedded replica mode, the remote server manages these settings
8282+ // and PRAGMA assignments are rejected as "unsupported statement"
8383+ // (observed with Bunny Database).
8484+ if cfg.SyncURL == "" {
8585+ var journalMode string
8686+ if err := db.QueryRow("PRAGMA journal_mode = WAL").Scan(&journalMode); err != nil {
8787+ return nil, fmt.Errorf("failed to set journal mode: %w", err)
8888+ }
8989+ var busyTimeout int
9090+ if err := db.QueryRow("PRAGMA busy_timeout = 5000").Scan(&busyTimeout); err != nil {
9191+ return nil, fmt.Errorf("failed to set busy_timeout: %w", err)
9292+ }
9393+ }
9494+9595+ // Foreign keys work in both modes
9696+ if _, err := db.Exec("PRAGMA foreign_keys = ON"); err != nil {
9797+ return nil, fmt.Errorf("failed to enable foreign keys: %w", err)
9898+ }
9999+100100+ return &HoldDB{DB: db, connector: connector}, nil
101101+}
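In the local-only branch of `OpenHoldDB`, the DSN normalization is the only path-sensitive step: plain paths get a `file:` prefix while `file:` URIs and `:memory:` pass through untouched. A standalone sketch of that logic (the helper name is illustrative; `OpenHoldDB` inlines it):

```go
package main

import (
	"fmt"
	"strings"
)

// localDSN mirrors the local-only DSN handling in OpenHoldDB:
// "file:" URIs and ":memory:" are passed through as-is, and a
// bare filesystem path is prefixed with "file:" for the libsql driver.
func localDSN(path string) string {
	if strings.HasPrefix(path, "file:") || strings.HasPrefix(path, ":memory:") {
		return path
	}
	return "file:" + path
}

func main() {
	fmt.Println(localDSN("/var/lib/atcr-hold/db.sqlite3")) // file:/var/lib/atcr-hold/db.sqlite3
	fmt.Println(localDSN(":memory:"))                      // :memory:
}
```

The embedded-replica branch skips this entirely: the connector takes the raw local path plus the `libsql://` sync URL, and the local WAL/busy_timeout PRAGMAs are also skipped there because the remote server manages those settings.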
+6-1
pkg/hold/db/sqlite_store.go
···5050type SQLiteStore struct {
5151 dbPath string
5252 db *sql.DB
5353+ ownsDB bool // true when this store opened the connection itself
53545455 log *slog.Logger
5556···120121 }
121122 sqs.db = db
122123 sqs.dbPath = path
124124+ sqs.ownsDB = true
123125 err = sqs.createTables()
124126 if err != nil {
125127 return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
···459461}
460462461463func (sqs *SQLiteStore) Close() error {
462462- return sqs.db.Close()
464464+ if sqs.ownsDB {
465465+ return sqs.db.Close()
466466+ }
467467+ return nil
463468}
464469465470func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
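The `ownsDB` flag introduced here recurs in the event broadcaster, records index, scan broadcaster, and PDS below: a component closes its connection only if it opened it, while a shared connection is left for the central `HoldDB` to close. A minimal illustration of that ownership rule, with hypothetical names:

```go
package main

import (
	"fmt"
	"io"
)

// closeGuard sketches the ownsDB pattern used across the hold subsystems:
// Close is a no-op unless this component opened the resource itself.
type closeGuard struct {
	res  io.Closer
	owns bool
}

func (c *closeGuard) Close() error {
	if c.res != nil && c.owns {
		return c.res.Close()
	}
	return nil
}

// fakeConn stands in for a *sql.DB and records whether it was closed.
type fakeConn struct{ closed *bool }

func (f fakeConn) Close() error { *f.closed = true; return nil }

func main() {
	closed := false
	shared := fakeConn{closed: &closed}

	sub := &closeGuard{res: shared, owns: false} // borrowed, shared connection
	sub.Close()
	fmt.Println(closed) // false: shared DB stays open for other subsystems

	owner := &closeGuard{res: shared, owns: true} // component that opened it
	owner.Close()
	fmt.Println(closed) // true
}
```

Without this guard, the first subsystem to shut down would close the shared libsql connection out from under every other subsystem still using it.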
+43-5
pkg/hold/pds/events.go
···3232 holdDID string // DID of the hold for setting repo field
3333 db *sql.DB // Database for persistent event storage
3434 dbPath string // Path to database file
3535+ ownsDB bool // true when this broadcaster opened the connection itself
3536}
36373738// Subscriber represents a WebSocket client subscribed to the firehose
···8788 return broadcaster
8889}
8990
9191+// NewEventBroadcasterWithDB creates an event broadcaster using an existing *sql.DB connection.
9292+// The caller is responsible for the DB lifecycle.
9393+func NewEventBroadcasterWithDB(holdDID string, maxHistory int, db *sql.DB) *EventBroadcaster {
9494+ if maxHistory <= 0 {
9595+ maxHistory = 100
9696+ }
9797+9898+ broadcaster := &EventBroadcaster{
9999+ subscribers: make(map[*Subscriber]bool),
100100+ eventSeq: 0,
101101+ eventHistory: make([]HistoricalEvent, 0, maxHistory),
102102+ maxHistory: maxHistory,
103103+ holdDID: holdDID,
104104+ db: db,
105105+ ownsDB: false,
106106+ }
107107+108108+ if db != nil {
109109+ if err := broadcaster.initSchema(); err != nil {
110110+ slog.Warn("Failed to initialize event schema", "error", err)
111111+ slog.Warn("Events will not persist across restarts")
112112+ }
113113+ }
114114+115115+ return broadcaster
116116+}
117117+
90118// initDatabase opens database connection, creates table, and loads last sequence
91119func (b *EventBroadcaster) initDatabase() error {
92120 // Open database connection
···106134 }
107135108136 b.db = db
137137+ b.ownsDB = true
138138+139139+ if err := b.initSchema(); err != nil {
140140+ db.Close()
141141+ b.db = nil
142142+ return err
143143+ }
109144145145+ return nil
146146+}
147147+
148148+// initSchema creates the events table and loads last sequence number
149149+func (b *EventBroadcaster) initSchema() error {
110150 // Create events table if it doesn't exist
111151 // Execute statements individually for go-libsql compatibility
112152 stmts := []string{
···123163 }
124164125165 for _, stmt := range stmts {
126126- if _, err := db.Exec(stmt); err != nil {
127127- db.Close()
128128- b.db = nil
166166+ if _, err := b.db.Exec(stmt); err != nil {
129167 return err
130168 }
131169 }
132170133171 // Load last sequence number from database
134172 var lastSeq sql.NullInt64
135135- err = db.QueryRow("SELECT MAX(seq) FROM firehose_events").Scan(&lastSeq)
173173+ err := b.db.QueryRow("SELECT MAX(seq) FROM firehose_events").Scan(&lastSeq)
136174 if err != nil {
137175 slog.Warn("Failed to load last event sequence", "error", err)
138176 } else if lastSeq.Valid {
···328366329367// Close closes the database connection
330368func (b *EventBroadcaster) Close() error {
331331- if b.db != nil {
369369+ if b.db != nil && b.ownsDB {
332370 return b.db.Close()
333371 }
334372 return nil
+37-4
pkg/hold/pds/records.go
···1818// This follows the official ATProto PDS pattern of using SQL for queries
1919// while MST is used for sync operations.
2020type RecordsIndex struct {
2121- db *sql.DB
2121+ db *sql.DB
2222+ ownsDB bool // true when this index opened the connection itself
2223}
23242425// Record represents a record in the index
···8485 }
8586 }
86878787- return &RecordsIndex{db: db}, nil
8888+ return &RecordsIndex{db: db, ownsDB: true}, nil
8889}
8990
9090-// Close closes the database connection
9191+// NewRecordsIndexWithDB creates a records index using an existing *sql.DB connection.
9292+// The caller is responsible for the DB lifecycle.
9393+func NewRecordsIndexWithDB(db *sql.DB) (*RecordsIndex, error) {
9494+ // Check if table exists and has the did column
9595+ needsRebuild := false
9696+ var tableName string
9797+ err := db.QueryRow(`SELECT name FROM sqlite_master WHERE type='table' AND name='records'`).Scan(&tableName)
9898+ if err == nil {
9999+ var colCount int
100100+ err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('records') WHERE name='did'`).Scan(&colCount)
101101+ if err != nil || colCount == 0 {
102102+ needsRebuild = true
103103+ slog.Info("Records index schema outdated, rebuilding with did column")
104104+ }
105105+ }
106106+107107+ if needsRebuild {
108108+ _, err = db.Exec(`DROP TABLE IF EXISTS records`)
109109+ if err != nil {
110110+ return nil, fmt.Errorf("failed to drop old records table: %w", err)
111111+ }
112112+ }
113113+114114+ for _, stmt := range splitStatements(recordsSchema) {
115115+ if _, err = db.Exec(stmt); err != nil {
116116+ return nil, fmt.Errorf("failed to create records schema: %w", err)
117117+ }
118118+ }
119119+120120+ return &RecordsIndex{db: db, ownsDB: false}, nil
121121+}
122122+123123+// Close closes the database connection if this index owns it.
91124func (ri *RecordsIndex) Close() error {
9292- if ri.db != nil {
125125+ if ri.db != nil && ri.ownsDB {
93126 return ri.db.Close()
94127 }
95128 return nil
+27-1
pkg/hold/pds/scan_broadcaster.go
···3232 pds *HoldPDS
3333 ackTimeout time.Duration
3434 secret string // Shared secret for scanner authentication
3535+ ownsDB bool // true when this broadcaster opened the connection itself
3536}
36373738// ScanSubscriber represents a connected scanner WebSocket client
···102103 pds: holdPDS,
103104 ackTimeout: 5 * time.Minute,
104105 secret: secret,
106106+ ownsDB: true,
105107 }
106108107109 if err := sb.initSchema(); err != nil {
···110112 }
111113112114 // Start re-dispatch loop for timed-out jobs
115115+ go sb.reDispatchLoop()
116116+117117+ return sb, nil
118118+}
119119+
120120+// NewScanBroadcasterWithDB creates a scan job broadcaster using an existing *sql.DB connection.
121121+// The caller is responsible for the DB lifecycle.
122122+func NewScanBroadcasterWithDB(holdDID, holdEndpoint, secret string, db *sql.DB, driver storagedriver.StorageDriver, holdPDS *HoldPDS) (*ScanBroadcaster, error) {
123123+ sb := &ScanBroadcaster{
124124+ subscribers: make([]*ScanSubscriber, 0),
125125+ db: db,
126126+ holdDID: holdDID,
127127+ holdEndpoint: holdEndpoint,
128128+ driver: driver,
129129+ pds: holdPDS,
130130+ ackTimeout: 5 * time.Minute,
131131+ secret: secret,
132132+ ownsDB: false,
133133+ }
134134+135135+ if err := sb.initSchema(); err != nil {
136136+ return nil, fmt.Errorf("failed to initialize scan_jobs schema: %w", err)
137137+ }
138138+113139 go sb.reDispatchLoop()
114140115141 return sb, nil
···590616591617// Close closes the scan broadcaster's database connection
592618func (sb *ScanBroadcaster) Close() error {
593593- if sb.db != nil {
619619+ if sb.db != nil && sb.ownsDB {
594620 return sb.db.Close()
595621 }
596622 return nil
+46
pkg/hold/pds/server.go
···2233import (
44 "context"
55+ "database/sql"
56 "fmt"
67 "log/slog"
78 "os"
···108109 if err != nil {
109110 return nil, fmt.Errorf("failed to create records index: %w", err)
110111 }
112112+ }
113113+114114+ return &HoldPDS{
115115+ did: did,
116116+ PublicURL: publicURL,
117117+ carstore: cs,
118118+ repomgr: rm,
119119+ dbPath: dbPath,
120120+ uid: uid,
121121+ signingKey: signingKey,
122122+ enableBlueskyPosts: enableBlueskyPosts,
123123+ recordsIndex: recordsIndex,
124124+ }, nil
125125+}
126126+
127127+// NewHoldPDSWithDB creates or opens a hold PDS using an existing *sql.DB connection.
128128+// The caller is responsible for the DB lifecycle. Used when the database is
129129+// centrally managed (e.g., with libsql embedded replicas).
130130+func NewHoldPDSWithDB(ctx context.Context, did, publicURL, dbPath, keyPath string, enableBlueskyPosts bool, db *sql.DB) (*HoldPDS, error) {
131131+ signingKey, err := oauth.GenerateOrLoadPDSKey(keyPath)
132132+ if err != nil {
133133+ return nil, fmt.Errorf("failed to initialize signing key: %w", err)
134134+ }
135135+136136+ // Use shared DB for carstore
137137+ sqlStore, err := holddb.NewSQLiteStoreWithDB(dbPath, db)
138138+ if err != nil {
139139+ return nil, fmt.Errorf("failed to create sqlite store with shared DB: %w", err)
140140+ }
141141+142142+ cs := sqlStore
143143+ uid := models.Uid(1)
144144+ kmgr := NewHoldKeyManager(signingKey)
145145+ rm := NewRepoManager(cs, kmgr)
146146+147147+ head, err := cs.GetUserRepoHead(ctx, uid)
148148+ hasValidRepo := (err == nil && head.Defined())
149149+ if !hasValidRepo {
150150+ slog.Info("New hold repo - will be initialized in Bootstrap")
151151+ }
152152+153153+ // Use shared DB for records index
154154+ recordsIndex, err := NewRecordsIndexWithDB(db)
155155+ if err != nil {
156156+ return nil, fmt.Errorf("failed to create records index with shared DB: %w", err)
111157 }
112158113159 return &HoldPDS{
+52-17
pkg/hold/server.go
···1010 "syscall"
1111 "time"
12121313+ "atcr.io/pkg/atproto"
1314 "atcr.io/pkg/hold/admin"
1415 "atcr.io/pkg/hold/billing"
1616+ holddb "atcr.io/pkg/hold/db"
1517 "atcr.io/pkg/hold/gc"
1618 "atcr.io/pkg/hold/oci"
1719 "atcr.io/pkg/hold/pds"
···4749 scanBroadcaster *pds.ScanBroadcaster
4850 garbageCollector *gc.GarbageCollector
4951 adminUI *admin.AdminUI
5252+ holdDB *holddb.HoldDB // shared database connection (nil for :memory:)
5053}
51545255// NewHoldServer initializes PDS, storage, quota, XRPC handlers, and returns
···75787679 ctx := context.Background()
7780 var err error
7878- s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
7979- if err != nil {
8080- return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
8181+8282+ if cfg.Database.Path != ":memory:" {
8383+ // File mode: open centralized shared DB (supports embedded replica sync)
8484+ dbFilePath := cfg.Database.Path + "/db.sqlite3"
8585+ libsqlCfg := holddb.LibsqlConfig{
8686+ SyncURL: cfg.Database.LibsqlSyncURL,
8787+ AuthToken: cfg.Database.LibsqlAuthToken,
8888+ SyncInterval: cfg.Database.LibsqlSyncInterval,
8989+ }
9090+ s.holdDB, err = holddb.OpenHoldDB(dbFilePath, libsqlCfg)
9191+ if err != nil {
9292+ return nil, fmt.Errorf("failed to open hold database: %w", err)
9393+ }
9494+9595+ // Use shared DB for all subsystems
9696+ s.PDS, err = pds.NewHoldPDSWithDB(ctx, holdDID, cfg.Server.PublicURL, cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts, s.holdDB.DB)
9797+ if err != nil {
9898+ return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
9999+ }
100100+101101+ s.broadcaster = pds.NewEventBroadcasterWithDB(holdDID, 100, s.holdDB.DB)
102102+ } else {
103103+ // In-memory mode (tests): each subsystem opens its own connection
104104+ s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
105105+ if err != nil {
106106+ return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
107107+ }
108108+109109+ s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, ":memory:")
81110 }
8211183112 // Create storage driver from config (needed for bootstrap profile avatar)
···91120 return nil, fmt.Errorf("failed to bootstrap PDS: %w", err)
92121 }
931229494- // Create event broadcaster for subscribeRepos firehose
9595- var dbPath string
9696- if cfg.Database.Path != ":memory:" {
9797- dbPath = cfg.Database.Path + "/db.sqlite3"
9898- } else {
9999- dbPath = ":memory:"
100100- }
101101- s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, dbPath)
102102-
103123 // Bootstrap events from existing repo records (one-time migration)
104124 if err := s.broadcaster.BootstrapFromRepo(s.PDS); err != nil {
105125 slog.Warn("Failed to bootstrap events from repo", "error", err)
···151171 // Initialize scan broadcaster if scanner secret is configured
152172 if cfg.Scanner.Secret != "" {
153173 holdDID := pds.GenerateDIDFromURL(cfg.Server.PublicURL)
154154- scanDBPath := cfg.Database.Path + "/db.sqlite3"
155155- sb, err := pds.NewScanBroadcaster(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, scanDBPath, driver, s.PDS)
174174+ var sb *pds.ScanBroadcaster
175175+ if s.holdDB != nil {
176176+ sb, err = pds.NewScanBroadcasterWithDB(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, s.holdDB.DB, driver, s.PDS)
177177+ } else {
178178+ scanDBPath := cfg.Database.Path + "/db.sqlite3"
179179+ sb, err = pds.NewScanBroadcaster(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, scanDBPath, driver, s.PDS)
180180+ }
156181 if err != nil {
157182 return nil, fmt.Errorf("failed to initialize scan broadcaster: %w", err)
158183 }
···200225 // Initialize and register admin panel if enabled
201226 if cfg.Admin.Enabled && s.PDS != nil {
202227 adminCfg := admin.AdminConfig{
203203- Enabled: true,
204204- PublicURL: cfg.Server.PublicURL,
228228+ Enabled: true,
229229+ PublicURL: cfg.Server.PublicURL,
230230+ ConfigPath: cfg.ConfigPath(),
205231 }
206232
207233 s.adminUI, err = admin.NewAdminUI(context.Background(), s.PDS, s.QuotaManager, adminCfg)
···269295 // Request crawl from relay to make PDS discoverable
270296 if s.Config.Server.RelayEndpoint != "" {
271297 slog.Info("Requesting crawl from relay", "relay", s.Config.Server.RelayEndpoint)
272272- if err := RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil {
298298+ if err := atproto.RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil {
273299 slog.Warn("Failed to request crawl from relay", "error", err)
274300 } else {
275301 slog.Info("Crawl requested successfully")
···336362 slog.Warn("Failed to close admin panel", "error", err)
337363 } else {
338364 slog.Info("Admin panel closed")
365365+ }
366366+ }
367367+
368368+ // Close shared database connection and connector (after all subsystems)
369369+ if s.holdDB != nil {
370370+ if err := s.holdDB.Close(); err != nil {
371371+ slog.Warn("Failed to close hold database", "error", err)
372372+ } else {
373373+ slog.Info("Hold database closed")
339374 }
340375 }
341376