(READ ONLY) Margin is an open annotation layer for the internet. Powered by the AT Protocol. margin.at
extension web atproto comments
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement user data synchronization from PDS and add DID resolution utility.

scanash00 ed69b9e6 44a2bde7

+636 -606
+7 -4
backend/cmd/server/main.go
··· 20 20 "margin.at/internal/db" 21 21 "margin.at/internal/firehose" 22 22 "margin.at/internal/oauth" 23 + "margin.at/internal/sync" 23 24 ) 24 25 25 26 func main() { ··· 35 36 log.Fatalf("Failed to run migrations: %v", err) 36 37 } 37 38 38 - oauthHandler, err := oauth.NewHandler(database) 39 + syncSvc := sync.NewService(database) 40 + 41 + oauthHandler, err := oauth.NewHandler(database, syncSvc) 39 42 if err != nil { 40 43 log.Fatalf("Failed to initialize OAuth: %v", err) 41 44 } 42 45 43 - ingester := firehose.NewIngester(database) 44 - firehose.RelayURL = getEnv("BLOCK_RELAY_URL", "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos") 46 + ingester := firehose.NewIngester(database, syncSvc) 47 + firehose.RelayURL = getEnv("BLOCK_RELAY_URL", "wss://jetstream2.us-east.bsky.network/subscribe") 45 48 log.Printf("Firehose URL: %s", firehose.RelayURL) 46 49 47 50 go func() { ··· 71 74 tokenRefresher := api.NewTokenRefresher(database, oauthHandler.GetPrivateKey()) 72 75 annotationSvc := api.NewAnnotationService(database, tokenRefresher) 73 76 74 - handler := api.NewHandler(database, annotationSvc, tokenRefresher) 77 + handler := api.NewHandler(database, annotationSvc, tokenRefresher, syncSvc) 75 78 handler.RegisterRoutes(r) 76 79 77 80 r.Post("/api/annotations", annotationSvc.CreateAnnotation)
-14
backend/go.mod
··· 3 3 go 1.24.0 4 4 5 5 require ( 6 - github.com/fxamacker/cbor/v2 v2.9.0 7 6 github.com/go-chi/chi/v5 v5.1.0 8 7 github.com/go-chi/cors v1.2.1 9 8 github.com/go-jose/go-jose/v4 v4.0.4 10 9 github.com/gorilla/websocket v1.5.3 11 - github.com/ipfs/go-cid v0.6.0 12 10 github.com/joho/godotenv v1.5.1 13 11 github.com/lib/pq v1.10.9 14 12 github.com/mattn/go-sqlite3 v1.14.22 ··· 17 15 18 16 require ( 19 17 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect 20 - github.com/klauspost/cpuid/v2 v2.0.9 // indirect 21 - github.com/minio/sha256-simd v1.0.0 // indirect 22 - github.com/mr-tron/base58 v1.2.0 // indirect 23 - github.com/multiformats/go-base32 v0.0.3 // indirect 24 - github.com/multiformats/go-base36 v0.1.0 // indirect 25 - github.com/multiformats/go-multibase v0.2.0 // indirect 26 - github.com/multiformats/go-multihash v0.2.3 // indirect 27 - github.com/multiformats/go-varint v0.1.0 // indirect 28 18 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 29 - github.com/spaolacci/murmur3 v1.1.0 // indirect 30 19 github.com/stretchr/testify v1.10.0 // indirect 31 - github.com/x448/float16 v0.8.4 // indirect 32 20 golang.org/x/crypto v0.35.0 // indirect 33 - golang.org/x/sys v0.30.0 // indirect 34 21 golang.org/x/text v0.32.0 // indirect 35 - lukechampine.com/blake3 v1.1.6 // indirect 36 22 )
-29
backend/go.sum
··· 1 1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= 2 2 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 - github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= 4 - github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= 5 3 github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= 6 4 github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= 7 5 github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= ··· 12 10 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 13 11 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 14 12 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 15 - github.com/ipfs/go-cid v0.6.0 h1:DlOReBV1xhHBhhfy/gBNNTSyfOM6rLiIx9J7A4DGf30= 16 - github.com/ipfs/go-cid v0.6.0/go.mod h1:NC4kS1LZjzfhK40UGmpXv5/qD2kcMzACYJNntCUiDhQ= 17 13 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 18 14 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 19 - github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= 20 - github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= 21 - github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= 22 15 github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= 23 16 github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= 24 17 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= 25 18 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 26 - github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= 27 - github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= 28 - github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= 29 - github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= 30 - github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= 31 - github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= 32 - github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= 33 - github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= 34 - github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= 35 - github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= 36 - github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= 37 - github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= 38 - github.com/multiformats/go-varint v0.1.0 h1:i2wqFp4sdl3IcIxfAonHQV9qU5OsZ4Ts9IOoETFs5dI= 39 - github.com/multiformats/go-varint v0.1.0/go.mod h1:5KVAVXegtfmNQQm/lCY+ATvDzvJJhSkUlGQV9wgObdI= 40 19 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= 41 20 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 42 - github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= 43 - github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= 44 21 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 45 22 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 46 - github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= 47 - github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= 48 23 golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= 49 24 golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= 50 25 golang.org/x/image v0.34.0 h1:33gCkyw9hmwbZJeZkct8XyR11yH889EQt/QH4VmXMn8= 51 26 golang.org/x/image v0.34.0/go.mod h1:2RNFBZRB+vnwwFil8GkMdRvrJOFd1AzdZI6vOY+eJVU= 52 - golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= 53 - golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 54 27 golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= 55 28 golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= 56 29 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 57 30 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 58 - lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c= 59 - lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
-30
backend/internal/api/annotations.go
··· 505 505 json.NewEncoder(w).Encode(map[string]bool{"success": true}) 506 506 } 507 507 508 - func resolveDIDToPDS(did string) (string, error) { 509 - if strings.HasPrefix(did, "did:plc:") { 510 - client := &http.Client{ 511 - Timeout: 10 * time.Second, 512 - } 513 - resp, err := client.Get("https://plc.directory/" + did) 514 - if err != nil { 515 - return "", err 516 - } 517 - defer resp.Body.Close() 518 - 519 - var doc struct { 520 - Service []struct { 521 - Type string `json:"type"` 522 - ServiceEndpoint string `json:"serviceEndpoint"` 523 - } `json:"service"` 524 - } 525 - if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil { 526 - return "", err 527 - } 528 - 529 - for _, svc := range doc.Service { 530 - if svc.Type == "AtprotoPersonalDataServer" { 531 - return svc.ServiceEndpoint, nil 532 - } 533 - } 534 - } 535 - return "", nil 536 - } 537 - 538 508 type CreateHighlightRequest struct { 539 509 URL string `json:"url"` 540 510 Title string `json:"title,omitempty"`
+1 -1
backend/internal/api/apikey.go
··· 480 480 return nil, fmt.Errorf("invalid session DPoP key: %w", err) 481 481 } 482 482 483 - pds, err := resolveDIDToPDS(sessDID) 483 + pds, err := xrpc.ResolveDIDToPDS(sessDID) 484 484 if err != nil { 485 485 return nil, fmt.Errorf("failed to resolve PDS: %w", err) 486 486 }
+4 -1
backend/internal/api/handler.go
··· 14 14 "github.com/go-chi/chi/v5" 15 15 16 16 "margin.at/internal/db" 17 + internal_sync "margin.at/internal/sync" 17 18 "margin.at/internal/xrpc" 18 19 ) 19 20 ··· 22 23 annotationService *AnnotationService 23 24 refresher *TokenRefresher 24 25 apiKeys *APIKeyHandler 26 + syncService *internal_sync.Service 25 27 } 26 28 27 - func NewHandler(database *db.DB, annotationService *AnnotationService, refresher *TokenRefresher) *Handler { 29 + func NewHandler(database *db.DB, annotationService *AnnotationService, refresher *TokenRefresher, syncService *internal_sync.Service) *Handler { 28 30 return &Handler{ 29 31 db: database, 30 32 annotationService: annotationService, 31 33 refresher: refresher, 32 34 apiKeys: NewAPIKeyHandler(database, refresher), 35 + syncService: syncService, 33 36 } 34 37 } 35 38
+8 -325
backend/internal/api/sync.go
··· 1 1 package api 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 - "fmt" 6 - "io" 7 6 "net/http" 8 - "time" 9 7 10 - "margin.at/internal/db" 11 8 "margin.at/internal/xrpc" 12 9 ) 13 10 ··· 18 15 return 19 16 } 20 17 21 - collections := []string{ 22 - xrpc.CollectionAnnotation, 23 - xrpc.CollectionHighlight, 24 - xrpc.CollectionBookmark, 25 - xrpc.CollectionReply, 26 - xrpc.CollectionLike, 27 - xrpc.CollectionCollection, 28 - xrpc.CollectionCollectionItem, 29 - } 30 - 31 - results := make(map[string]string) 32 - 33 - err = h.refresher.ExecuteWithAutoRefresh(r, session, func(client *xrpc.Client, did string) error { 34 - for _, collectionNSID := range collections { 35 - count := 0 36 - cursor := "" 37 - fetchedURIs := make(map[string]bool) 38 - 39 - for { 40 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", client.PDS, did, collectionNSID) 41 - if cursor != "" { 42 - url += "&cursor=" + cursor 43 - } 44 - 45 - req, _ := http.NewRequestWithContext(r.Context(), "GET", url, nil) 46 - req.Header.Set("Authorization", "Bearer "+client.AccessToken) 47 - 48 - resp, err := http.DefaultClient.Do(req) 49 - if err != nil { 50 - return fmt.Errorf("failed to fetch %s: %w", collectionNSID, err) 51 - } 52 - defer resp.Body.Close() 53 - 54 - if resp.StatusCode != 200 { 55 - body, _ := io.ReadAll(resp.Body) 56 - results[collectionNSID] = fmt.Sprintf("error: %s", string(body)) 57 - break 58 - } 59 - 60 - var output struct { 61 - Records []struct { 62 - URI string `json:"uri"` 63 - CID string `json:"cid"` 64 - Value json.RawMessage `json:"value"` 65 - } `json:"records"` 66 - Cursor string `json:"cursor"` 67 - } 68 - 69 - if err := json.NewDecoder(resp.Body).Decode(&output); err != nil { 70 - return err 71 - } 72 - 73 - for _, rec := range output.Records { 74 - err := h.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value) 75 - if err != nil { 76 - fmt.Printf("Error upserting %s: %v\n", rec.URI, err) 77 - } else { 78 - count++ 79 - fetchedURIs[rec.URI] = true 80 - } 81 - } 82 - 83 - if output.Cursor == "" { 84 - break 85 - } 86 - cursor = output.Cursor 87 - } 88 - 89 - deletedCount := 0 90 - if results[collectionNSID] == "" { 91 - var localURIs []string 92 - var err error 93 - 94 - switch collectionNSID { 95 - case xrpc.CollectionAnnotation: 96 - localURIs, err = h.db.GetAnnotationURIs(did) 97 - case xrpc.CollectionHighlight: 98 - localURIs, err = h.db.GetHighlightURIs(did) 99 - case xrpc.CollectionBookmark: 100 - localURIs, err = h.db.GetBookmarkURIs(did) 101 - } 102 - 103 - if err == nil { 104 - for _, uri := range localURIs { 105 - if !fetchedURIs[uri] { 106 - switch collectionNSID { 107 - case xrpc.CollectionAnnotation: 108 - _ = h.db.DeleteAnnotation(uri) 109 - case xrpc.CollectionHighlight: 110 - _ = h.db.DeleteHighlight(uri) 111 - case xrpc.CollectionBookmark: 112 - _ = h.db.DeleteBookmark(uri) 113 - } 114 - deletedCount++ 115 - } 116 - } 117 - } 118 - } 119 - 120 - if results[collectionNSID] == "" { 121 - results[collectionNSID] = fmt.Sprintf("synced %d records, deleted %d stale", count, deletedCount) 122 - } 123 - } 124 - return nil 18 + results, err := h.syncService.PerformSync(r.Context(), session.DID, func(ctx context.Context, did string) (*xrpc.Client, error) { 19 + var client *xrpc.Client 20 + err := h.refresher.ExecuteWithAutoRefresh(r, session, func(c *xrpc.Client, d string) error { 21 + client = c 22 + return nil 23 + }) 24 + return client, err 125 25 }) 126 26 127 27 if err != nil { ··· 132 32 w.WriteHeader(http.StatusOK) 133 33 json.NewEncoder(w).Encode(results) 134 34 } 135 - 136 - func strPtr(s string) *string { 137 - if s == "" { 138 - return nil 139 - } 140 - return &s 141 - } 142 - 143 - func (h *Handler) upsertRecord(did, collection, uri, cid string, value json.RawMessage) error { 144 - cidPtr := strPtr(cid) 145 - switch collection { 146 - case xrpc.CollectionAnnotation: 147 - var record xrpc.AnnotationRecord 148 - if err := json.Unmarshal(value, &record); err != nil { 149 - return err 150 - } 151 - 152 - createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 153 - 154 - targetSource := record.Target.Source 155 - if targetSource == "" { 156 - 157 - } 158 - 159 - targetHash := record.Target.SourceHash 160 - if targetHash == "" && targetSource != "" { 161 - targetHash = db.HashURL(targetSource) 162 - } 163 - 164 - motivation := record.Motivation 165 - if motivation == "" { 166 - motivation = "commenting" 167 - } 168 - 169 - var bodyValuePtr, bodyFormatPtr, bodyURIPtr, targetTitlePtr, selectorJSONPtr, tagsJSONPtr *string 170 - if record.Body != nil { 171 - if record.Body.Value != "" { 172 - val := record.Body.Value 173 - bodyValuePtr = &val 174 - } 175 - if record.Body.Format != "" { 176 - fmt := record.Body.Format 177 - bodyFormatPtr = &fmt 178 - } 179 - } 180 - if record.Target.Title != "" { 181 - t := record.Target.Title 182 - targetTitlePtr = &t 183 - } 184 - if len(record.Target.Selector) > 0 { 185 - selectorStr := string(record.Target.Selector) 186 - selectorJSONPtr = &selectorStr 187 - } 188 - if len(record.Tags) > 0 { 189 - tagsBytes, _ := json.Marshal(record.Tags) 190 - tagsStr := string(tagsBytes) 191 - tagsJSONPtr = &tagsStr 192 - } 193 - 194 - return h.db.CreateAnnotation(&db.Annotation{ 195 - URI: uri, 196 - AuthorDID: did, 197 - Motivation: motivation, 198 - BodyValue: bodyValuePtr, 199 - BodyFormat: bodyFormatPtr, 200 - BodyURI: bodyURIPtr, 201 - TargetSource: targetSource, 202 - TargetHash: targetHash, 203 - TargetTitle: targetTitlePtr, 204 - SelectorJSON: selectorJSONPtr, 205 - TagsJSON: tagsJSONPtr, 206 - CreatedAt: createdAt, 207 - IndexedAt: time.Now(), 208 - CID: cidPtr, 209 - }) 210 - 211 - case xrpc.CollectionHighlight: 212 - var record xrpc.HighlightRecord 213 - if err := json.Unmarshal(value, &record); err != nil { 214 - return err 215 - } 216 - 217 - createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 218 - if createdAt.IsZero() { 219 - createdAt = time.Now() 220 - } 221 - 222 - targetHash := record.Target.SourceHash 223 - if targetHash == "" && record.Target.Source != "" { 224 - targetHash = db.HashURL(record.Target.Source) 225 - } 226 - 227 - var titlePtr, selectorJSONPtr, colorPtr, tagsJSONPtr *string 228 - if record.Target.Title != "" { 229 - t := record.Target.Title 230 - titlePtr = &t 231 - } 232 - if len(record.Target.Selector) > 0 { 233 - selectorStr := string(record.Target.Selector) 234 - selectorJSONPtr = &selectorStr 235 - } 236 - if record.Color != "" { 237 - c := record.Color 238 - colorPtr = &c 239 - } 240 - if len(record.Tags) > 0 { 241 - tagsBytes, _ := json.Marshal(record.Tags) 242 - tagsStr := string(tagsBytes) 243 - tagsJSONPtr = &tagsStr 244 - } 245 - 246 - return h.db.CreateHighlight(&db.Highlight{ 247 - URI: uri, 248 - AuthorDID: did, 249 - TargetSource: record.Target.Source, 250 - TargetHash: targetHash, 251 - TargetTitle: titlePtr, 252 - SelectorJSON: selectorJSONPtr, 253 - Color: colorPtr, 254 - TagsJSON: tagsJSONPtr, 255 - CreatedAt: createdAt, 256 - IndexedAt: time.Now(), 257 - CID: cidPtr, 258 - }) 259 - 260 - case xrpc.CollectionBookmark: 261 - var record xrpc.BookmarkRecord 262 - if err := json.Unmarshal(value, &record); err != nil { 263 - return err 264 - } 265 - 266 - createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 267 - 268 - sourceHash := record.SourceHash 269 - if sourceHash == "" && record.Source != "" { 270 - sourceHash = db.HashURL(record.Source) 271 - } 272 - 273 - var titlePtr, descPtr, tagsJSONPtr *string 274 - if record.Title != "" { 275 - t := record.Title 276 - titlePtr = &t 277 - } 278 - if record.Description != "" { 279 - d := record.Description 280 - descPtr = &d 281 - } 282 - if len(record.Tags) > 0 { 283 - tagsBytes, _ := json.Marshal(record.Tags) 284 - tagsStr := string(tagsBytes) 285 - tagsJSONPtr = &tagsStr 286 - } 287 - 288 - return h.db.CreateBookmark(&db.Bookmark{ 289 - URI: uri, 290 - AuthorDID: did, 291 - Source: record.Source, 292 - SourceHash: sourceHash, 293 - Title: titlePtr, 294 - Description: descPtr, 295 - TagsJSON: tagsJSONPtr, 296 - CreatedAt: createdAt, 297 - IndexedAt: time.Now(), 298 - CID: cidPtr, 299 - }) 300 - 301 - case xrpc.CollectionCollection: 302 - var record xrpc.CollectionRecord 303 - if err := json.Unmarshal(value, &record); err != nil { 304 - return err 305 - } 306 - createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 307 - 308 - var descPtr, iconPtr *string 309 - if record.Description != "" { 310 - d := record.Description 311 - descPtr = &d 312 - } 313 - if record.Icon != "" { 314 - i := record.Icon 315 - iconPtr = &i 316 - } 317 - 318 - return h.db.CreateCollection(&db.Collection{ 319 - URI: uri, 320 - AuthorDID: did, 321 - Name: record.Name, 322 - Description: descPtr, 323 - Icon: iconPtr, 324 - CreatedAt: createdAt, 325 - IndexedAt: time.Now(), 326 - }) 327 - 328 - case xrpc.CollectionCollectionItem: 329 - var record xrpc.CollectionItemRecord 330 - if err := json.Unmarshal(value, &record); err != nil { 331 - return err 332 - } 333 - createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 334 - 335 - return h.db.AddToCollection(&db.CollectionItem{ 336 - URI: uri, 337 - AuthorDID: did, 338 - CollectionURI: record.Collection, 339 - AnnotationURI: record.Annotation, 340 - Position: record.Position, 341 - CreatedAt: createdAt, 342 - IndexedAt: time.Now(), 343 - }) 344 - 345 - case xrpc.CollectionReply: 346 - return nil 347 - case xrpc.CollectionLike: 348 - return nil 349 - } 350 - return nil 351 - }
+1 -1
backend/internal/api/token_refresh.go
··· 89 89 return nil, fmt.Errorf("invalid session DPoP key") 90 90 } 91 91 92 - pds, err := resolveDIDToPDS(did) 92 + pds, err := xrpc.ResolveDIDToPDS(did) 93 93 if err != nil { 94 94 return nil, fmt.Errorf("failed to resolve PDS") 95 95 }
+10 -1
backend/internal/db/tags.go
··· 1 1 package db 2 2 3 + import "fmt" 4 + 3 5 type TrendingTag struct { 4 6 Tag string `json:"tag"` 5 7 Count int `json:"count"` ··· 14 16 WHERE tags_json IS NOT NULL 15 17 AND tags_json != '' 16 18 AND tags_json != '[]' 19 + AND created_at > %s 17 20 GROUP BY tag 21 + HAVING count > 2 18 22 ORDER BY count DESC 19 23 LIMIT ? 20 24 ` 21 25 22 - rows, err := db.Query(db.Rebind(query), limit) 26 + dateFilter := "datetime('now', '-7 days')" 27 + if db.driver == "postgres" { 28 + dateFilter = "NOW() - INTERVAL '7 days'" 29 + } 30 + 31 + rows, err := db.Query(db.Rebind(fmt.Sprintf(query, dateFilter)), limit) 23 32 if err != nil { 24 33 return nil, err 25 34 }
+106 -196
backend/internal/firehose/ingester.go
··· 1 1 package firehose 2 2 3 3 import ( 4 - "bytes" 5 4 "context" 6 - "encoding/binary" 7 5 "encoding/json" 8 6 "fmt" 9 - "io" 10 7 "log" 8 + "strings" 9 + "sync" 11 10 "time" 12 11 13 - "github.com/fxamacker/cbor/v2" 14 12 "github.com/gorilla/websocket" 15 - "github.com/ipfs/go-cid" 16 - 17 13 "margin.at/internal/db" 14 + internal_sync "margin.at/internal/sync" 15 + "margin.at/internal/xrpc" 18 16 ) 19 17 20 18 const ( ··· 27 25 CollectionCollectionItem = "at.margin.collectionItem" 28 26 ) 29 27 30 - var RelayURL = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 28 + var RelayURL = "wss://jetstream2.us-east.bsky.network/subscribe" 31 29 32 30 type Ingester struct { 33 - db *db.DB 34 - cancel context.CancelFunc 31 + db *db.DB 32 + sync *internal_sync.Service 33 + cancel context.CancelFunc 34 + handlers map[string]RecordHandler 35 35 } 36 36 37 - func NewIngester(database *db.DB) *Ingester { 38 - return &Ingester{db: database} 37 + type RecordHandler func(event *FirehoseEvent) 38 + 39 + func NewIngester(database *db.DB, syncService *internal_sync.Service) *Ingester { 40 + i := &Ingester{ 41 + db: database, 42 + sync: syncService, 43 + handlers: make(map[string]RecordHandler), 44 + } 45 + 46 + i.RegisterHandler(CollectionAnnotation, i.handleAnnotation) 47 + i.RegisterHandler(CollectionHighlight, i.handleHighlight) 48 + i.RegisterHandler(CollectionBookmark, i.handleBookmark) 49 + i.RegisterHandler(CollectionReply, i.handleReply) 50 + i.RegisterHandler(CollectionLike, i.handleLike) 51 + i.RegisterHandler(CollectionCollection, i.handleCollection) 52 + i.RegisterHandler(CollectionCollectionItem, i.handleCollectionItem) 53 + 54 + return i 55 + } 56 + 57 + func (i *Ingester) RegisterHandler(collection string, handler RecordHandler) { 58 + i.handlers[collection] = handler 39 59 } 40 60 41 61 func (i *Ingester) Start(ctx context.Context) error { ··· 59 79 return 60 80 default: 61 81 if err := i.subscribe(ctx); err != nil { 62 - log.Printf("Firehose error: %v, reconnecting in 5s...", err) 82 + log.Printf("Jetstream error: %v, reconnecting in 5s...", err) 63 83 if ctx.Err() != nil { 64 84 return 65 85 } ··· 69 89 } 70 90 } 71 91 72 - type FrameHeader struct { 73 - Op int `cbor:"op"` 74 - T string `cbor:"t"` 75 - } 76 - type Commit struct { 77 - Repo string `cbor:"repo"` 78 - Rev string `cbor:"rev"` 79 - Seq int64 `cbor:"seq"` 80 - Prev *cid.Cid `cbor:"prev"` 81 - Time string `cbor:"time"` 82 - Blocks []byte `cbor:"blocks"` 83 - Ops []RepoOp `cbor:"ops"` 92 + type JetstreamEvent struct { 93 + Did string `json:"did"` 94 + Time int64 `json:"time_us"` 95 + Kind string `json:"kind"` 96 + Commit *JetstreamCommit `json:"commit,omitempty"` 84 97 } 85 98 86 - type RepoOp struct { 87 - Action string `cbor:"action"` 88 - Path string `cbor:"path"` 89 - Cid *cid.Cid `cbor:"cid"` 99 + type JetstreamCommit struct { 100 + Rev string `json:"rev"` 101 + Operation string `json:"operation"` 102 + Collection string `json:"collection"` 103 + Rkey string `json:"rkey"` 104 + Record json.RawMessage `json:"record,omitempty"` 105 + Cid string `json:"cid,omitempty"` 90 106 } 91 107 92 108 func (i *Ingester) subscribe(ctx context.Context) error { 93 109 cursor := i.getLastCursor() 94 110 95 - url := RelayURL 111 + var collections []string 112 + for collection := range i.handlers { 113 + collections = append(collections, collection) 114 + } 115 + 116 + url := fmt.Sprintf("%s?wantedCollections=%s", RelayURL, strings.Join(collections, "&wantedCollections=")) 96 117 if cursor > 0 { 97 - url = fmt.Sprintf("%s?cursor=%d", RelayURL, cursor) 118 + url = fmt.Sprintf("%s&cursor=%d", url, cursor) 98 119 } 99 120 100 - log.Printf("Connecting to firehose: %s", url) 121 + log.Printf("Connecting to Jetstream: %s", url) 101 122 102 123 conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil) 103 124 if err != nil { ··· 105 126 } 106 127 defer conn.Close() 107 128 108 - log.Printf("Connected to firehose") 129 + log.Printf("Connected to Jetstream") 109 130 110 131 for { 111 132 select { ··· 119 140 return fmt.Errorf("websocket read failed: %w", err) 120 141 } 121 142 122 - i.handleMessage(message) 123 - } 124 - } 125 - 126 - func (i *Ingester) handleMessage(data []byte) { 127 - reader := bytes.NewReader(data) 128 - 129 - var header FrameHeader 130 - decoder := cbor.NewDecoder(reader) 131 - if err := decoder.Decode(&header); err != nil { 132 - return 133 - } 134 - 135 - if header.Op != 1 { 136 - return 137 - } 138 - 139 - if header.T != "#commit" { 140 - return 141 - } 142 - 143 - var commit Commit 144 - if err := decoder.Decode(&commit); err != nil { 145 - return 146 - } 147 - 148 - for _, op := range commit.Ops { 149 - collection, rkey := parseOpPath(op.Path) 150 - if !isMarginCollection(collection) { 143 + var event JetstreamEvent 144 + if err := json.Unmarshal(message, &event); err != nil { 151 145 continue 152 146 } 153 147 154 - uri := fmt.Sprintf("at://%s/%s/%s", commit.Repo, collection, rkey) 148 + if event.Kind == "commit" && event.Commit != nil { 149 + i.handleCommit(event) 155 150 156 - switch op.Action { 157 - case "create", "update": 158 - if op.Cid != nil && len(commit.Blocks) > 0 { 159 - record := extractRecord(commit.Blocks, *op.Cid) 160 - if record != nil { 161 - i.handleRecord(commit.Repo, collection, rkey, record, commit.Seq) 151 + if event.Time > 0 { 152 + if err := i.db.SetCursor("firehose_cursor", event.Time); err != nil { 153 + log.Printf("Failed to save cursor: %v", err) 162 154 } 163 155 } 164 - case "delete": 165 - i.handleDelete(collection, uri) 166 156 } 167 157 } 158 + } 159 + 160 + func (i *Ingester) handleCommit(event JetstreamEvent) { 161 + commit := event.Commit 162 + uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.Rkey) 163 + 164 + switch commit.Operation { 165 + case "create", "update": 166 + if len(commit.Record) > 0 { 167 + firehoseEvent := &FirehoseEvent{ 168 + Repo: event.Did, 169 + Collection: commit.Collection, 170 + Rkey: commit.Rkey, 171 + Record: commit.Record, 172 + Operation: commit.Operation, 173 + Cursor: event.Time, 174 + } 168 175 169 - if commit.Seq > 0 { 170 - if err := i.db.SetCursor("firehose_cursor", commit.Seq); err != nil { 171 - log.Printf("Failed to save cursor: %v", err) 172 - } 173 - } 174 - } 176 + i.dispatchToHandler(firehoseEvent) 175 177 176 - func parseOpPath(path string) (collection, rkey string) { 177 - for i := len(path) - 1; i >= 0; i-- { 178 - if path[i] == '/' { 179 - return path[:i], path[i+1:] 178 + go i.triggerLazySync(event.Did) 180 179 } 180 + case "delete": 181 + i.handleDelete(commit.Collection, uri) 181 182 } 182 - return path, "" 183 183 } 184 184 185 - func isMarginCollection(collection string) bool { 186 - switch collection { 187 - case CollectionAnnotation, CollectionHighlight, CollectionBookmark, 188 - CollectionReply, CollectionLike, CollectionCollection, CollectionCollectionItem: 189 - return true 185 + func (i *Ingester) dispatchToHandler(event *FirehoseEvent) { 186 + if handler, ok := i.handlers[event.Collection]; ok { 187 + handler(event) 190 188 } 191 - return false 192 189 } 193 190 194 - func extractRecord(blocks []byte, targetCid cid.Cid) map[string]interface{} { 195 - reader := bytes.NewReader(blocks) 191 + var lastSyncAttempts sync.Map 196 192 197 - headerLen, err := binary.ReadUvarint(reader) 198 - if err != nil { 199 - return nil 200 - } 201 - reader.Seek(int64(headerLen), io.SeekCurrent) 202 - 203 - for reader.Len() > 0 { 204 - blockLen, err := binary.ReadUvarint(reader) 205 - if err != nil { 206 - break 207 - } 208 - 209 - blockData := make([]byte, blockLen) 210 - if _, err := io.ReadFull(reader, blockData); err != nil { 211 - break 212 - } 213 - 214 - blockCid, cidLen, err := parseCidFromBlock(blockData) 215 - if err != nil { 216 - continue 217 - } 218 - 219 - if blockCid.Equals(targetCid) { 220 - var record map[string]interface{} 221 - if err := cbor.Unmarshal(blockData[cidLen:], &record); err != nil { 222 - return nil 223 - } 224 - return record 193 + func (i *Ingester) triggerLazySync(did string) { 194 + lastSync, ok := lastSyncAttempts.Load(did) 195 + if ok { 196 + if time.Since(lastSync.(time.Time)) < 5*time.Minute { 197 + return 225 198 } 226 199 } 227 - 228 - return nil 229 - } 200 + lastSyncAttempts.Store(did, time.Now()) 230 201 231 - func parseCidFromBlock(data []byte) (cid.Cid, int, error) { 232 - if len(data) < 2 { 233 - return cid.Cid{}, 0, fmt.Errorf("data too short") 234 - } 235 - version, n1 := binary.Uvarint(data) 236 - if n1 <= 0 { 237 - return cid.Cid{}, 0, fmt.Errorf("invalid version varint") 202 + pds, err := xrpc.ResolveDIDToPDS(did) 203 + if err != nil || pds == "" { 204 + return 238 205 } 239 206 240 - if version == 1 { 241 - codec, n2 := binary.Uvarint(data[n1:]) 242 - if n2 <= 0 { 243 - return cid.Cid{}, 0, fmt.Errorf("invalid codec varint") 244 - } 207 + _, err = i.sync.PerformSync(context.Background(), did, func(ctx context.Context, _ string) (*xrpc.Client, error) { 208 + return &xrpc.Client{ 209 + PDS: pds, 210 + }, nil 211 + }) 245 212 246 - mhStart := n1 + n2 247 - hashType, n3 := binary.Uvarint(data[mhStart:]) 248 - if n3 <= 0 { 249 - return cid.Cid{}, 0, fmt.Errorf("invalid hash type varint") 250 - } 251 - 252 - hashLen, n4 := binary.Uvarint(data[mhStart+n3:]) 253 - if n4 <= 0 { 254 - return cid.Cid{}, 0, fmt.Errorf("invalid hash length varint") 255 - } 256 - 257 - totalCidLen := mhStart + n3 + n4 + int(hashLen) 258 - 259 - c, err := cid.Cast(data[:totalCidLen]) 260 - if err != nil { 261 - return cid.Cid{}, 0, err 262 - } 263 - 264 - _ = codec 265 - _ = hashType 266 - 267 - return c, totalCidLen, nil 213 + if err == nil { 214 + log.Printf("Auto-synced repo for active user: %s", did) 268 215 } 269 - 270 - return cid.Cid{}, 0, fmt.Errorf("unsupported CID version") 271 216 } 272 217 273 218 func (i *Ingester) handleDelete(collection, uri string) { ··· 289 234 } 290 235 } 291 236 292 - func (i *Ingester) handleRecord(repo, collection, rkey string, record map[string]interface{}, seq int64) { 293 - _ = fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey) 294 - 295 - recordJSON, err := json.Marshal(record) 237 + func (i *Ingester) getLastCursor() int64 { 238 + cursor, err := i.db.GetCursor("firehose_cursor") 296 239 if err != nil { 297 - return 298 - } 299 - 300 - event := &FirehoseEvent{ 301 - Repo: repo, 302 - Collection: collection, 303 - Rkey: rkey, 304 - Record: recordJSON, 305 - Operation: "create", 306 - Cursor: seq, 307 - } 308 - 309 - switch collection { 310 - case CollectionAnnotation: 311 - i.handleAnnotation(event) 312 - case CollectionHighlight: 313 - i.handleHighlight(event) 314 - case CollectionBookmark: 315 - i.handleBookmark(event) 316 - case CollectionReply: 317 - i.handleReply(event) 318 - case CollectionLike: 319 - i.handleLike(event) 320 - case CollectionCollection: 321 - i.handleCollection(event) 322 - case CollectionCollectionItem: 323 - i.handleCollectionItem(event) 240 + log.Printf("Failed to get last cursor from DB: %v", err) 241 + return 0 324 242 } 243 + return cursor 325 244 } 326 245 327 246 type FirehoseEvent struct { ··· 711 630 log.Printf("Indexed collection item from %s", event.Repo) 712 631 } 713 632 } 714 - 715 - func (i *Ingester) getLastCursor() int64 { 716 - cursor, err := i.db.GetCursor("firehose_cursor") 717 - if err != nil { 718 - log.Printf("Failed to get last cursor from DB: %v", err) 719 - return 0 720 - } 721 - return cursor 722 - }
+16 -1
backend/internal/oauth/handler.go
··· 17 17 "time" 18 18 19 19 "margin.at/internal/db" 20 + internal_sync "margin.at/internal/sync" 20 21 "margin.at/internal/xrpc" 21 22 ) 22 23 ··· 26 27 privateKey *ecdsa.PrivateKey 27 28 pending map[string]*PendingAuth 28 29 pendingMu sync.RWMutex 30 + syncService *internal_sync.Service 29 31 } 30 32 31 - func NewHandler(database *db.DB) (*Handler, error) { 33 + func NewHandler(database *db.DB, syncService *internal_sync.Service) (*Handler, error) { 32 34 33 35 configuredBaseURL := os.Getenv("BASE_URL") 34 36 ··· 42 44 configuredBaseURL: configuredBaseURL, 43 45 privateKey: privateKey, 44 46 pending: make(map[string]*PendingAuth), 47 + syncService: syncService, 45 48 }, nil 46 49 } 47 50 ··· 364 367 }) 365 368 366 369 go h.cleanupOrphanedReplies(tokenResp.Sub, tokenResp.AccessToken, string(dpopKeyPEM), pending.PDS) 370 + go func() { 371 + log.Printf("Starting background sync for %s...", tokenResp.Sub) 372 + _, err := h.syncService.PerformSync(context.Background(), tokenResp.Sub, func(ctx context.Context, did string) (*xrpc.Client, error) { 373 + return xrpc.NewClient(pending.PDS, tokenResp.AccessToken, pending.DPoPKey), nil 374 + }) 375 + 376 + if err != nil { 377 + log.Printf("Background sync failed for %s: %v", tokenResp.Sub, err) 378 + } else { 379 + log.Printf("Background sync completed for %s", tokenResp.Sub) 380 + } 381 + }() 367 382 368 383 http.Redirect(w, r, "/?logged_in=true", http.StatusFound) 369 384 }
+428
backend/internal/sync/service.go
··· 1 + package sync 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "time" 10 + 11 + "margin.at/internal/db" 12 + "margin.at/internal/xrpc" 13 + ) 14 + 15 + type Service struct { 16 + db *db.DB 17 + } 18 + 19 + func NewService(database *db.DB) *Service { 20 + return &Service{db: database} 21 + } 22 + 23 + func (s *Service) PerformSync(ctx context.Context, did string, getClient func(context.Context, string) (*xrpc.Client, error)) (map[string]string, error) { 24 + collections := []string{ 25 + xrpc.CollectionAnnotation, 26 + xrpc.CollectionHighlight, 27 + xrpc.CollectionBookmark, 28 + xrpc.CollectionReply, 29 + xrpc.CollectionLike, 30 + xrpc.CollectionCollection, 31 + xrpc.CollectionCollectionItem, 32 + } 33 + 34 + results := make(map[string]string) 35 + 36 + client, err := getClient(ctx, did) 37 + if err != nil { 38 + return nil, err 39 + } 40 + 41 + for _, collectionNSID := range collections { 42 + count := 0 43 + cursor := "" 44 + fetchedURIs := make(map[string]bool) 45 + 46 + for { 47 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", client.PDS, did, collectionNSID) 48 + if cursor != "" { 49 + url += "&cursor=" + cursor 50 + } 51 + 52 + req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) 53 + req.Header.Set("Authorization", "Bearer "+client.AccessToken) 54 + 55 + resp, err := http.DefaultClient.Do(req) 56 + if err != nil { 57 + return nil, fmt.Errorf("failed to fetch %s: %w", collectionNSID, err) 58 + } 59 + defer resp.Body.Close() 60 + 61 + if resp.StatusCode != 200 { 62 + body, _ := io.ReadAll(resp.Body) 63 + results[collectionNSID] = fmt.Sprintf("error: %s", string(body)) 64 + break 65 + } 66 + 67 + var output struct { 68 + Records []struct { 69 + URI string `json:"uri"` 70 + CID string `json:"cid"` 71 + Value json.RawMessage `json:"value"` 72 + } `json:"records"` 73 + Cursor string `json:"cursor"` 74 + } 75 + 76 + if err := json.NewDecoder(resp.Body).Decode(&output); err != nil { 77 + return nil, err 78 + } 79 + 80 + for _, rec := range output.Records { 81 + err := s.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value) 82 + if err != nil { 83 + fmt.Printf("Error upserting %s: %v\n", rec.URI, err) 84 + } else { 85 + count++ 86 + fetchedURIs[rec.URI] = true 87 + } 88 + } 89 + 90 + if output.Cursor == "" { 91 + break 92 + } 93 + cursor = output.Cursor 94 + } 95 + 96 + deletedCount := 0 97 + if results[collectionNSID] == "" { 98 + var localURIs []string 99 + var err error 100 + 101 + switch collectionNSID { 102 + case xrpc.CollectionAnnotation: 103 + localURIs, err = s.db.GetAnnotationURIs(did) 104 + case xrpc.CollectionHighlight: 105 + localURIs, err = s.db.GetHighlightURIs(did) 106 + case xrpc.CollectionBookmark: 107 + localURIs, err = s.db.GetBookmarkURIs(did) 108 + case xrpc.CollectionCollection: 109 + cols, e := s.db.GetCollectionsByAuthor(did) 110 + if e == nil { 111 + for _, c := range cols { 112 + localURIs = append(localURIs, c.URI) 113 + } 114 + } else { 115 + err = e 116 + } 117 + case xrpc.CollectionCollectionItem: 118 + items, e := s.db.GetCollectionItemsByAuthor(did) 119 + if e == nil { 120 + for _, item := range items { 121 + localURIs = append(localURIs, item.URI) 122 + } 123 + } else { 124 + err = e 125 + } 126 + case xrpc.CollectionReply: 127 + replies, e := s.db.GetRepliesByAuthor(did) 128 + if e == nil { 129 + for _, r := range replies { 130 + localURIs = append(localURIs, r.URI) 131 + } 132 + } else { 133 + err = e 134 + } 135 + case xrpc.CollectionLike: 136 + likes, e := s.db.GetLikesByAuthor(did) 137 + if e == nil { 138 + for _, l := range likes { 139 + localURIs = append(localURIs, l.URI) 140 + } 141 + } else { 142 + err = e 143 + } 144 + } 145 + 146 + if err == nil { 147 + for _, uri := range localURIs { 148 + if !fetchedURIs[uri] { 149 + switch collectionNSID { 150 + case xrpc.CollectionAnnotation: 151 + _ = s.db.DeleteAnnotation(uri) 152 + case xrpc.CollectionHighlight: 153 + _ = s.db.DeleteHighlight(uri) 154 + case xrpc.CollectionBookmark: 155 + _ = s.db.DeleteBookmark(uri) 156 + case xrpc.CollectionCollection: 157 + _ = s.db.DeleteCollection(uri) 158 + case xrpc.CollectionCollectionItem: 159 + _ = s.db.RemoveFromCollection(uri) 160 + case xrpc.CollectionReply: 161 + _ = s.db.DeleteReply(uri) 162 + case xrpc.CollectionLike: 163 + _ = s.db.DeleteLike(uri) 164 + } 165 + deletedCount++ 166 + } 167 + } 168 + } 169 + } 170 + 171 + if results[collectionNSID] == "" { 172 + results[collectionNSID] = fmt.Sprintf("synced %d records, deleted %d stale", count, deletedCount) 173 + } 174 + } 175 + return results, nil 176 + } 177 + 178 + func strPtr(s string) *string { 179 + if s == "" { 180 + return nil 181 + } 182 + return &s 183 + } 184 + 185 + func (s *Service) upsertRecord(did, collection, uri, cid string, value json.RawMessage) error { 186 + cidPtr := strPtr(cid) 187 + switch collection { 188 + case xrpc.CollectionAnnotation: 189 + var record xrpc.AnnotationRecord 190 + if err := json.Unmarshal(value, &record); err != nil { 191 + return err 192 + } 193 + 194 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 195 + 196 + targetSource := record.Target.Source 197 + if targetSource == "" { 198 + 199 + } 200 + 201 + targetHash := record.Target.SourceHash 202 + if targetHash == "" && targetSource != "" { 203 + targetHash = db.HashURL(targetSource) 204 + } 205 + 206 + motivation := record.Motivation 207 + if motivation == "" { 208 + motivation = "commenting" 209 + } 210 + 211 + var bodyValuePtr, bodyFormatPtr, bodyURIPtr, targetTitlePtr, selectorJSONPtr, tagsJSONPtr *string 212 + if record.Body != nil { 213 + if record.Body.Value != "" { 214 + val := record.Body.Value 215 + bodyValuePtr = &val 216 + } 217 + if record.Body.Format != "" { 218 + fmt := record.Body.Format 219 + bodyFormatPtr = &fmt 220 + } 221 + } 222 + if record.Target.Title != "" { 223 + t := record.Target.Title 224 + targetTitlePtr = &t 225 + } 226 + if len(record.Target.Selector) > 0 { 227 + selectorStr := string(record.Target.Selector) 228 + selectorJSONPtr = &selectorStr 229 + } 230 + if len(record.Tags) > 0 { 231 + tagsBytes, _ := json.Marshal(record.Tags) 232 + tagsStr := string(tagsBytes) 233 + tagsJSONPtr = &tagsStr 234 + } 235 + 236 + return s.db.CreateAnnotation(&db.Annotation{ 237 + URI: uri, 238 + AuthorDID: did, 239 + Motivation: motivation, 240 + BodyValue: bodyValuePtr, 241 + BodyFormat: bodyFormatPtr, 242 + BodyURI: bodyURIPtr, 243 + TargetSource: targetSource, 244 + TargetHash: targetHash, 245 + TargetTitle: targetTitlePtr, 246 + SelectorJSON: selectorJSONPtr, 247 + TagsJSON: tagsJSONPtr, 248 + CreatedAt: createdAt, 249 + IndexedAt: time.Now(), 250 + CID: cidPtr, 251 + }) 252 + 253 + case xrpc.CollectionHighlight: 254 + var record xrpc.HighlightRecord 255 + if err := json.Unmarshal(value, &record); err != nil { 256 + return err 257 + } 258 + 259 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 260 + if createdAt.IsZero() { 261 + createdAt = time.Now() 262 + } 263 + 264 + targetHash := record.Target.SourceHash 265 + if targetHash == "" && record.Target.Source != "" { 266 + targetHash = db.HashURL(record.Target.Source) 267 + } 268 + 269 + var titlePtr, selectorJSONPtr, colorPtr, tagsJSONPtr *string 270 + if record.Target.Title != "" { 271 + t := record.Target.Title 272 + titlePtr = &t 273 + } 274 + if len(record.Target.Selector) > 0 { 275 + selectorStr := string(record.Target.Selector) 276 + selectorJSONPtr = &selectorStr 277 + } 278 + if record.Color != "" { 279 + c := record.Color 280 + colorPtr = &c 281 + } 282 + if len(record.Tags) > 0 { 283 + tagsBytes, _ := json.Marshal(record.Tags) 284 + tagsStr := string(tagsBytes) 285 + tagsJSONPtr = &tagsStr 286 + } 287 + 288 + return s.db.CreateHighlight(&db.Highlight{ 289 + URI: uri, 290 + AuthorDID: did, 291 + TargetSource: record.Target.Source, 292 + TargetHash: targetHash, 293 + TargetTitle: titlePtr, 294 + SelectorJSON: selectorJSONPtr, 295 + Color: colorPtr, 296 + TagsJSON: tagsJSONPtr, 297 + CreatedAt: createdAt, 298 + IndexedAt: time.Now(), 299 + CID: cidPtr, 300 + }) 301 + 302 + case xrpc.CollectionBookmark: 303 + var record xrpc.BookmarkRecord 304 + if err := json.Unmarshal(value, &record); err != nil { 305 + return err 306 + } 307 + 308 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 309 + 310 + sourceHash := record.SourceHash 311 + if sourceHash == "" && record.Source != "" { 312 + sourceHash = db.HashURL(record.Source) 313 + } 314 + 315 + var titlePtr, descPtr, tagsJSONPtr *string 316 + if record.Title != "" { 317 + t := record.Title 318 + titlePtr = &t 319 + } 320 + if record.Description != "" { 321 + d := record.Description 322 + descPtr = &d 323 + } 324 + if len(record.Tags) > 0 { 325 + tagsBytes, _ := json.Marshal(record.Tags) 326 + tagsStr := string(tagsBytes) 327 + tagsJSONPtr = &tagsStr 328 + } 329 + 330 + return s.db.CreateBookmark(&db.Bookmark{ 331 + URI: uri, 332 + AuthorDID: did, 333 + Source: record.Source, 334 + SourceHash: sourceHash, 335 + Title: titlePtr, 336 + Description: descPtr, 337 + TagsJSON: tagsJSONPtr, 338 + CreatedAt: createdAt, 339 + IndexedAt: time.Now(), 340 + CID: cidPtr, 341 + }) 342 + 343 + case xrpc.CollectionCollection: 344 + var record xrpc.CollectionRecord 345 + if err := json.Unmarshal(value, &record); err != nil { 346 + return err 347 + } 348 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 349 + 350 + var descPtr, iconPtr *string 351 + if record.Description != "" { 352 + d := record.Description 353 + descPtr = &d 354 + } 355 + if record.Icon != "" { 356 + i := record.Icon 357 + iconPtr = &i 358 + } 359 + 360 + return s.db.CreateCollection(&db.Collection{ 361 + URI: uri, 362 + AuthorDID: did, 363 + Name: record.Name, 364 + Description: descPtr, 365 + Icon: iconPtr, 366 + CreatedAt: createdAt, 367 + IndexedAt: time.Now(), 368 + }) 369 + 370 + case xrpc.CollectionCollectionItem: 371 + var record xrpc.CollectionItemRecord 372 + if err := json.Unmarshal(value, &record); err != nil { 373 + return err 374 + } 375 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 376 + 377 + return s.db.AddToCollection(&db.CollectionItem{ 378 + URI: uri, 379 + AuthorDID: did, 380 + CollectionURI: record.Collection, 381 + AnnotationURI: record.Annotation, 382 + Position: record.Position, 383 + CreatedAt: createdAt, 384 + IndexedAt: time.Now(), 385 + }) 386 + 387 + case xrpc.CollectionReply: 388 + var record xrpc.ReplyRecord 389 + if err := json.Unmarshal(value, &record); err != nil { 390 + return err 391 + } 392 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 393 + 394 + var formatPtr *string 395 + if record.Format != "" { 396 + f := record.Format 397 + formatPtr = &f 398 + } 399 + 400 + return s.db.CreateReply(&db.Reply{ 401 + URI: uri, 402 + AuthorDID: did, 403 + ParentURI: record.Parent.URI, 404 + RootURI: record.Root.URI, 405 + Text: record.Text, 406 + Format: formatPtr, 407 + CreatedAt: createdAt, 408 + IndexedAt: time.Now(), 409 + CID: cidPtr, 410 + }) 411 + 412 + case xrpc.CollectionLike: 413 + var record xrpc.LikeRecord 414 + if err := json.Unmarshal(value, &record); err != nil { 415 + return err 416 + } 417 + createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 418 + 419 + return s.db.CreateLike(&db.Like{ 420 + URI: uri, 421 + AuthorDID: did, 422 + SubjectURI: record.Subject.URI, 423 + CreatedAt: createdAt, 424 + IndexedAt: time.Now(), 425 + }) 426 + } 427 + return nil 428 + }
+51
backend/internal/xrpc/utils.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + "time" 9 + ) 10 + 11 + func ResolveDIDToPDS(did string) (string, error) { 12 + var docURL string 13 + if strings.HasPrefix(did, "did:plc:") { 14 + docURL = fmt.Sprintf("https://plc.directory/%s", did) 15 + } else if strings.HasPrefix(did, "did:web:") { 16 + domain := strings.TrimPrefix(did, "did:web:") 17 + docURL = fmt.Sprintf("https://%s/.well-known/did.json", domain) 18 + } else { 19 + return "", nil 20 + } 21 + 22 + client := &http.Client{ 23 + Timeout: 10 * time.Second, 24 + } 25 + resp, err := client.Get(docURL) 26 + if err != nil { 27 + return "", err 28 + } 29 + defer resp.Body.Close() 30 + 31 + if resp.StatusCode != 200 { 32 + return "", fmt.Errorf("failed to fetch DID doc: %d", resp.StatusCode) 33 + } 34 + 35 + var doc struct { 36 + Service []struct { 37 + Type string `json:"type"` 38 + ServiceEndpoint string `json:"serviceEndpoint"` 39 + } `json:"service"` 40 + } 41 + if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil { 42 + return "", err 43 + } 44 + 45 + for _, svc := range doc.Service { 46 + if svc.Type == "AtprotoPersonalDataServer" { 47 + return svc.ServiceEndpoint, nil 48 + } 49 + } 50 + return "", nil 51 + }
+4 -3
web/src/components/SignUpModal.jsx
··· 122 122 domain = "." + domain; 123 123 } 124 124 125 - const fullHandle = formData.handle.endsWith(domain) 126 - ? formData.handle 127 - : `${formData.handle}${domain}`; 125 + const cleanHandle = formData.handle.trim().replace(/^@/, ""); 126 + const fullHandle = cleanHandle.endsWith(domain) 127 + ? cleanHandle 128 + : `${cleanHandle}${domain}`; 128 129 129 130 try { 130 131 await createAccount(serverInfo.service, {