···11+package atp
22+33+import (
44+ "bytes"
55+ "context"
66+ "encoding/json"
77+ "fmt"
88+ "io"
99+ "net/http"
1010+1111+ "github.com/bluesky-social/indigo/api/atproto"
1212+ "github.com/bluesky-social/indigo/atproto/atclient"
1313+ "github.com/bluesky-social/indigo/atproto/syntax"
1414+)
1515+1616+// Maximum PDS blob size (1 MB).
1717+const MaxBlobSize = 1024 * 1024
1818+1919+// Client wraps an authenticated atclient.APIClient for PDS CRUD operations.
2020+// Create one per-user session via `ResumeSession` or [NewClient].
2121+type Client struct {
2222+ api *atclient.APIClient
2323+ did syntax.DID
2424+}
2525+2626+// NewClient wraps an existing indigo APIClient. Use this when you manage
2727+// OAuth sessions yourself (e.g. CLI apps that call ResumeSession directly).
2828+func NewClient(api *atclient.APIClient, did syntax.DID) *Client {
2929+ return &Client{api: api, did: did}
3030+}
3131+3232+// DID returns the authenticated user's DID.
3333+func (c *Client) DID() syntax.DID { return c.did }
3434+3535+// APIClient returns the underlying indigo APIClient for advanced usage
3636+// (custom XRPC calls, service proxying, etc.).
3737+func (c *Client) APIClient() *atclient.APIClient { return c.api }
3838+3939+// CreateRecord creates a new record with an auto-generated TID key.
4040+func (c *Client) CreateRecord(ctx context.Context, collection string, record any) (uri, cid string, err error) {
4141+ body := map[string]any{
4242+ "repo": c.did.String(),
4343+ "collection": collection,
4444+ "record": record,
4545+ }
4646+4747+ var result struct {
4848+ URI string `json:"uri"`
4949+ CID string `json:"cid"`
5050+ }
5151+ if err := c.api.Post(ctx, "com.atproto.repo.createRecord", body, &result); err != nil {
5252+ return "", "", WrapPDSError(fmt.Errorf("create record in %s: %w", collection, err))
5353+ }
5454+ return result.URI, result.CID, nil
5555+}
5656+5757+// CreateRecordWithRKey creates a new record with a specific record key.
5858+func (c *Client) CreateRecordWithRKey(ctx context.Context, collection, rkey string, record any) (uri, cid string, err error) {
5959+ body := map[string]any{
6060+ "repo": c.did.String(),
6161+ "collection": collection,
6262+ "rkey": rkey,
6363+ "record": record,
6464+ }
6565+6666+ var result struct {
6767+ URI string `json:"uri"`
6868+ CID string `json:"cid"`
6969+ }
7070+ if err := c.api.Post(ctx, "com.atproto.repo.createRecord", body, &result); err != nil {
7171+ return "", "", WrapPDSError(fmt.Errorf("create record %s/%s: %w", collection, rkey, err))
7272+ }
7373+ return result.URI, result.CID, nil
7474+}
7575+7676+// GetRecord retrieves a single record by collection and record key.
7777+func (c *Client) GetRecord(ctx context.Context, collection, rkey string) (*Record, error) {
7878+ params := map[string]any{
7979+ "repo": c.did.String(),
8080+ "collection": collection,
8181+ "rkey": rkey,
8282+ }
8383+8484+ var result struct {
8585+ URI string `json:"uri"`
8686+ CID string `json:"cid"`
8787+ Value map[string]any `json:"value"`
8888+ }
8989+ if err := c.api.Get(ctx, "com.atproto.repo.getRecord", params, &result); err != nil {
9090+ return nil, WrapPDSError(fmt.Errorf("get record %s/%s: %w", collection, rkey, err))
9191+ }
9292+ return &Record{URI: result.URI, CID: result.CID, Value: result.Value}, nil
9393+}
9494+9595+// ListRecords retrieves a single page of records from a collection.
9696+// Pass limit <= 0 for the server default (usually 50). Pass empty cursor for the first page.
9797+func (c *Client) ListRecords(ctx context.Context, collection string, limit int, cursor string) (*ListResult, error) {
9898+ params := map[string]any{
9999+ "repo": c.did.String(),
100100+ "collection": collection,
101101+ }
102102+ if limit > 0 {
103103+ params["limit"] = limit
104104+ }
105105+ if cursor != "" {
106106+ params["cursor"] = cursor
107107+ }
108108+109109+ var result struct {
110110+ Records []struct {
111111+ URI string `json:"uri"`
112112+ CID string `json:"cid"`
113113+ Value map[string]any `json:"value"`
114114+ } `json:"records"`
115115+ Cursor *string `json:"cursor,omitempty"`
116116+ }
117117+ if err := c.api.Get(ctx, "com.atproto.repo.listRecords", params, &result); err != nil {
118118+ return nil, WrapPDSError(fmt.Errorf("list records %s: %w", collection, err))
119119+ }
120120+121121+ records := make([]Record, len(result.Records))
122122+ for i, r := range result.Records {
123123+ records[i] = Record{URI: r.URI, CID: r.CID, Value: r.Value}
124124+ }
125125+126126+ out := &ListResult{Records: records}
127127+ if result.Cursor != nil && *result.Cursor != "" {
128128+ out.Cursor = *result.Cursor
129129+ }
130130+ return out, nil
131131+}
132132+133133+// ListAllRecords fetches every record in a collection, handling cursor
134134+// pagination automatically. Returns all records at once.
135135+func (c *Client) ListAllRecords(ctx context.Context, collection string) ([]Record, error) {
136136+ var all []Record
137137+ cursor := ""
138138+139139+ for {
140140+ select {
141141+ case <-ctx.Done():
142142+ return nil, ctx.Err()
143143+ default:
144144+ }
145145+146146+ page, err := c.ListRecords(ctx, collection, 100, cursor)
147147+ if err != nil {
148148+ return nil, err
149149+ }
150150+ all = append(all, page.Records...)
151151+152152+ if page.Cursor == "" {
153153+ break
154154+ }
155155+ cursor = page.Cursor
156156+ }
157157+ return all, nil
158158+}
159159+160160+// PutRecord creates or updates a record at a specific record key.
161161+func (c *Client) PutRecord(ctx context.Context, collection, rkey string, record any) (uri, cid string, err error) {
162162+ body := map[string]any{
163163+ "repo": c.did.String(),
164164+ "collection": collection,
165165+ "rkey": rkey,
166166+ "record": record,
167167+ }
168168+169169+ var result struct {
170170+ URI string `json:"uri"`
171171+ CID string `json:"cid"`
172172+ }
173173+ if err := c.api.Post(ctx, "com.atproto.repo.putRecord", body, &result); err != nil {
174174+ return "", "", WrapPDSError(fmt.Errorf("put record %s/%s: %w", collection, rkey, err))
175175+ }
176176+ return result.URI, result.CID, nil
177177+}
178178+179179+// DeleteRecord removes a record from the user's repository.
180180+func (c *Client) DeleteRecord(ctx context.Context, collection, rkey string) error {
181181+ body := map[string]any{
182182+ "repo": c.did.String(),
183183+ "collection": collection,
184184+ "rkey": rkey,
185185+ }
186186+187187+ var result struct{}
188188+ if err := c.api.Post(ctx, "com.atproto.repo.deleteRecord", body, &result); err != nil {
189189+ return WrapPDSError(fmt.Errorf("delete record %s/%s: %w", collection, rkey, err))
190190+ }
191191+ return nil
192192+}
193193+194194+// UploadBlob uploads a blob to the user's PDS.
195195+// Data must be at most 1 MB (MaxBlobSize). The mimeType should match the blob content.
196196+func (c *Client) UploadBlob(ctx context.Context, data []byte, mimeType string) (*BlobRef, error) {
197197+ if len(data) > MaxBlobSize {
198198+ return nil, fmt.Errorf("blob size %d exceeds maximum %d bytes", len(data), MaxBlobSize)
199199+ }
200200+201201+ endpoint, err := syntax.ParseNSID("com.atproto.repo.uploadBlob")
202202+ if err != nil {
203203+ return nil, err
204204+ }
205205+206206+ reader := bytes.NewReader(data)
207207+ req := atclient.NewAPIRequest(http.MethodPost, endpoint, reader)
208208+ req.Headers = http.Header{}
209209+ req.Headers.Set("Content-Type", mimeType)
210210+ req.GetBody = func() (io.ReadCloser, error) {
211211+ return io.NopCloser(bytes.NewReader(data)), nil
212212+ }
213213+214214+ resp, err := c.api.Do(ctx, req)
215215+ if err != nil {
216216+ return nil, WrapPDSError(fmt.Errorf("upload blob: %w", err))
217217+ }
218218+ defer resp.Body.Close()
219219+220220+ respBody, err := io.ReadAll(resp.Body)
221221+ if err != nil {
222222+ return nil, fmt.Errorf("read upload response: %w", err)
223223+ }
224224+225225+ var output atproto.RepoUploadBlob_Output
226226+ if err := json.Unmarshal(respBody, &output); err != nil {
227227+ return nil, fmt.Errorf("unmarshal upload response: %w", err)
228228+ }
229229+230230+ return &BlobRef{
231231+ Type: "blob",
232232+ MimeType: output.Blob.MimeType,
233233+ Size: int(output.Blob.Size),
234234+ Ref: CIDLink{Link: output.Blob.Ref.String()},
235235+ }, nil
236236+}
237237+238238+// GetBlob downloads a blob from the user's PDS by its CID.
239239+func (c *Client) GetBlob(ctx context.Context, cid string) ([]byte, error) {
240240+ data, err := atproto.SyncGetBlob(ctx, c.api, cid, c.did.String())
241241+ if err != nil {
242242+ return nil, WrapPDSError(fmt.Errorf("get blob %s: %w", cid, err))
243243+ }
244244+ return data, nil
245245+}
+27
client_test.go
···11+package atp
22+33+import (
44+ "testing"
55+66+ "github.com/bluesky-social/indigo/atproto/syntax"
77+)
88+99+func TestNewClient(t *testing.T) {
1010+ did, err := syntax.ParseDID("did:plc:testuser123")
1111+ if err != nil {
1212+ t.Fatal(err)
1313+ }
1414+1515+ c := NewClient(nil, did)
1616+ if c.DID() != did {
1717+ t.Fatalf("got DID %v, want %v", c.DID(), did)
1818+ }
1919+}
2020+2121+func TestClient_APIClient(t *testing.T) {
2222+ did, _ := syntax.ParseDID("did:plc:testuser123")
2323+ c := NewClient(nil, did)
2424+ if c.APIClient() != nil {
2525+ t.Fatal("expected nil APIClient")
2626+ }
2727+}
+26
errors.go
···11+package atp
22+33+import (
44+ "errors"
55+ "fmt"
66+ "strings"
77+)
88+99+// ErrSessionExpired indicates the OAuth session can no longer be resumed.
1010+// Callers should prompt the user to re-authenticate.
1111+var ErrSessionExpired = errors.New("oauth session expired")
1212+1313+// WrapPDSError inspects an XRPC error for signals that the OAuth grant is no
1414+// longer valid and, if so, wraps it with ErrSessionExpired.
1515+func WrapPDSError(err error) error {
1616+ if err == nil {
1717+ return nil
1818+ }
1919+ msg := err.Error()
2020+ if strings.Contains(msg, "invalid_grant") ||
2121+ strings.Contains(msg, "failed to refresh OAuth tokens") ||
2222+ strings.Contains(msg, "token is expired") {
2323+ return fmt.Errorf("%w: %w", ErrSessionExpired, err)
2424+ }
2525+ return err
2626+}
···11+package atp
22+33+// Record represents a single record returned from a PDS.
44+type Record struct {
55+ URI string
66+ CID string
77+ Value map[string]any
88+}
99+1010+// ListResult holds a page of records and an optional cursor for the next page.
1111+type ListResult struct {
1212+ Records []Record
1313+ Cursor string // empty when there are no more pages
1414+}
1515+1616+// BlobRef represents a reference to an uploaded blob.
1717+type BlobRef struct {
1818+ Type string `json:"$type"`
1919+ Ref CIDLink `json:"ref"`
2020+ MimeType string `json:"mimeType"`
2121+ Size int `json:"size"`
2222+}
2323+2424+// CIDLink is the ref field inside a BlobRef.
2525+type CIDLink struct {
2626+ Link string `json:"$link"`
2727+}
+16
scopes.go
···11+package atp
22+33+// ScopesForCollections builds an OAuth scope list from collection NSIDs.
44+// It always includes "atproto" as the first scope, then adds "repo:<collection>"
55+// for each collection provided.
66+//
77+// ScopesForCollections("x.y.bean", "x.y.brew")
88+// // => ["atproto", "repo:x.y.bean", "repo:x.y.brew"]
99+func ScopesForCollections(collections ...string) []string {
1010+ scopes := make([]string, 0, 1+len(collections))
1111+ scopes = append(scopes, "atproto")
1212+ for _, c := range collections {
1313+ scopes = append(scopes, "repo:"+c)
1414+ }
1515+ return scopes
1616+}