(READ ONLY) Margin is an open annotation layer for the internet. Powered by the AT Protocol. margin.at
extension web atproto comments
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor some things in backend and implement microcosm

scanash00 1257864d 3920646b

+1144 -78
+14
backend/go.mod
··· 15 15 16 16 require ( 17 17 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect 18 + github.com/fxamacker/cbor/v2 v2.9.0 // indirect 19 + github.com/ipfs/go-cid v0.6.0 // indirect 20 + github.com/klauspost/cpuid/v2 v2.0.9 // indirect 21 + github.com/minio/sha256-simd v1.0.0 // indirect 22 + github.com/mr-tron/base58 v1.2.0 // indirect 23 + github.com/multiformats/go-base32 v0.0.3 // indirect 24 + github.com/multiformats/go-base36 v0.1.0 // indirect 25 + github.com/multiformats/go-multibase v0.2.0 // indirect 26 + github.com/multiformats/go-multihash v0.2.3 // indirect 27 + github.com/multiformats/go-varint v0.1.0 // indirect 18 28 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 29 + github.com/spaolacci/murmur3 v1.1.0 // indirect 19 30 github.com/stretchr/testify v1.10.0 // indirect 31 + github.com/x448/float16 v0.8.4 // indirect 20 32 golang.org/x/crypto v0.35.0 // indirect 33 + golang.org/x/sys v0.30.0 // indirect 21 34 golang.org/x/text v0.32.0 // indirect 35 + lukechampine.com/blake3 v1.1.6 // indirect 22 36 )
+29
backend/go.sum
··· 1 1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= 2 2 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 + github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= 4 + github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= 3 5 github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= 4 6 github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= 5 7 github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= ··· 10 12 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 11 13 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 12 14 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 15 + github.com/ipfs/go-cid v0.6.0 h1:DlOReBV1xhHBhhfy/gBNNTSyfOM6rLiIx9J7A4DGf30= 16 + github.com/ipfs/go-cid v0.6.0/go.mod h1:NC4kS1LZjzfhK40UGmpXv5/qD2kcMzACYJNntCUiDhQ= 13 17 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 14 18 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 19 + github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= 20 + github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= 21 + github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= 15 22 github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= 16 23 github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= 17 24 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= 18 25 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 26 + github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= 27 + github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= 28 + github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= 29 + github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= 30 + github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= 31 + github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= 32 + github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= 33 + github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= 34 + github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= 35 + github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= 36 + github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= 37 + github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= 38 + github.com/multiformats/go-varint v0.1.0 h1:i2wqFp4sdl3IcIxfAonHQV9qU5OsZ4Ts9IOoETFs5dI= 39 + github.com/multiformats/go-varint v0.1.0/go.mod h1:5KVAVXegtfmNQQm/lCY+ATvDzvJJhSkUlGQV9wgObdI= 19 40 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= 20 41 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 42 + github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= 43 + github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= 21 44 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 22 45 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 46 + github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= 47 + github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= 23 48 golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= 24 49 golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= 25 50 golang.org/x/image v0.34.0 h1:33gCkyw9hmwbZJeZkct8XyR11yH889EQt/QH4VmXMn8= 26 51 golang.org/x/image v0.34.0/go.mod h1:2RNFBZRB+vnwwFil8GkMdRvrJOFd1AzdZI6vOY+eJVU= 52 + golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= 53 + golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 27 54 golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= 28 55 golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= 29 56 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 30 57 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 58 + lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c= 59 + lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
+106
backend/internal/api/handler.go
··· 68 68 r.Post("/sync", h.SyncAll) 69 69 70 70 r.Get("/targets", h.GetByTarget) 71 + r.Get("/discover", h.DiscoverForURL) 71 72 72 73 r.Get("/users/{did}/annotations", h.GetUserAnnotations) 73 74 r.Get("/users/{did}/highlights", h.GetUserHighlights) ··· 628 629 "annotations": enrichedAnnotations, 629 630 "highlights": enrichedHighlights, 630 631 "bookmarks": enrichedBookmarks, 632 + }) 633 + } 634 + 635 + func (h *Handler) DiscoverForURL(w http.ResponseWriter, r *http.Request) { 636 + source := r.URL.Query().Get("source") 637 + if source == "" { 638 + source = r.URL.Query().Get("url") 639 + } 640 + if source == "" { 641 + http.Error(w, "source or url parameter required", http.StatusBadRequest) 642 + return 643 + } 644 + 645 + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) 646 + defer cancel() 647 + 648 + annotations, highlights, bookmarks, err := ConstellationClient.GetAllItemsForURL(ctx, source) 649 + if err != nil { 650 + log.Printf("Constellation discover error, falling back to local: %v", err) 651 + h.GetByTarget(w, r) 652 + return 653 + } 654 + 655 + var annotationURIs, highlightURIs, bookmarkURIs []string 656 + seenURIs := make(map[string]bool) 657 + 658 + for _, link := range annotations { 659 + if !seenURIs[link.URI] { 660 + annotationURIs = append(annotationURIs, link.URI) 661 + seenURIs[link.URI] = true 662 + } 663 + } 664 + for _, link := range highlights { 665 + if !seenURIs[link.URI] { 666 + highlightURIs = append(highlightURIs, link.URI) 667 + seenURIs[link.URI] = true 668 + } 669 + } 670 + for _, link := range bookmarks { 671 + if !seenURIs[link.URI] { 672 + bookmarkURIs = append(bookmarkURIs, link.URI) 673 + seenURIs[link.URI] = true 674 + } 675 + } 676 + 677 + localAnnotations, _ := h.db.GetAnnotationsByURIs(annotationURIs) 678 + localHighlights, _ := h.db.GetHighlightsByURIs(highlightURIs) 679 + localBookmarks, _ := h.db.GetBookmarksByURIs(bookmarkURIs) 680 + 681 + urlHash := db.HashURL(source) 682 + dbAnnotations, _ := h.db.GetAnnotationsByTargetHash(urlHash, 100, 0) 683 + dbHighlights, _ := h.db.GetHighlightsByTargetHash(urlHash, 100, 0) 684 + dbBookmarks, _ := h.db.GetBookmarksByTargetHash(urlHash, 100, 0) 685 + 686 + annoMap := make(map[string]db.Annotation) 687 + for _, a := range localAnnotations { 688 + annoMap[a.URI] = a 689 + } 690 + for _, a := range dbAnnotations { 691 + annoMap[a.URI] = a 692 + } 693 + 694 + highMap := make(map[string]db.Highlight) 695 + for _, h := range localHighlights { 696 + highMap[h.URI] = h 697 + } 698 + for _, h := range dbHighlights { 699 + highMap[h.URI] = h 700 + } 701 + 702 + bookMap := make(map[string]db.Bookmark) 703 + for _, b := range localBookmarks { 704 + bookMap[b.URI] = b 705 + } 706 + for _, b := range dbBookmarks { 707 + bookMap[b.URI] = b 708 + } 709 + 710 + var mergedAnnotations []db.Annotation 711 + for _, a := range annoMap { 712 + mergedAnnotations = append(mergedAnnotations, a) 713 + } 714 + var mergedHighlights []db.Highlight 715 + for _, h := range highMap { 716 + mergedHighlights = append(mergedHighlights, h) 717 + } 718 + var mergedBookmarks []db.Bookmark 719 + for _, b := range bookMap { 720 + mergedBookmarks = append(mergedBookmarks, b) 721 + } 722 + 723 + viewerDID := h.getViewerDID(r) 724 + enrichedAnnotations, _ := hydrateAnnotations(h.db, mergedAnnotations, viewerDID) 725 + enrichedHighlights, _ := hydrateHighlights(h.db, mergedHighlights, viewerDID) 726 + enrichedBookmarks, _ := hydrateBookmarks(h.db, mergedBookmarks, viewerDID) 727 + 728 + w.Header().Set("Content-Type", "application/json") 729 + json.NewEncoder(w).Encode(map[string]interface{}{ 730 + "@context": "http://www.w3.org/ns/anno.jsonld", 731 + "source": source, 732 + "sourceHash": urlHash, 733 + "annotations": enrichedAnnotations, 734 + "highlights": enrichedHighlights, 735 + "bookmarks": enrichedBookmarks, 736 + "networkDiscovered": len(annotations) + len(highlights) + len(bookmarks), 631 737 }) 632 738 } 633 739
+77 -63
backend/internal/api/hydration.go
··· 1 1 package api 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "fmt" 6 7 "log" ··· 10 11 "sync" 11 12 "time" 12 13 14 + "margin.at/internal/constellation" 13 15 "margin.at/internal/db" 14 16 ) 15 17 16 18 var ( 17 - Cache ProfileCache = NewInMemoryCache(5 * time.Minute) 19 + Cache ProfileCache = NewInMemoryCache(5 * time.Minute) 20 + ConstellationClient *constellation.Client = constellation.NewClient() // Enabled by default 18 21 ) 22 + 23 + func init() { 24 + log.Printf("Constellation client initialized: %s", constellation.DefaultBaseURL) 25 + } 19 26 20 27 type Author struct { 21 28 DID string `json:"did"` ··· 145 152 ReadAt *time.Time `json:"readAt,omitempty"` 146 153 } 147 154 148 - func hydrateAnnotations(database *db.DB, annotations []db.Annotation, viewerDID string) ([]APIAnnotation, error) { 149 - if len(annotations) == 0 { 150 - return []APIAnnotation{}, nil 151 - } 155 + func fetchCounts(ctx context.Context, database *db.DB, uris []string, viewerDID string) (likeCounts, replyCounts map[string]int, viewerLikes map[string]bool) { 156 + likeCounts = make(map[string]int) 157 + replyCounts = make(map[string]int) 158 + viewerLikes = make(map[string]bool) 152 159 153 - profiles := fetchProfilesForDIDs(collectDIDs(annotations, func(a db.Annotation) string { return a.AuthorDID })) 154 - 155 - var likeCounts map[string]int 156 - var replyCounts map[string]int 157 - var viewerLikes map[string]bool 160 + if len(uris) == 0 { 161 + return 162 + } 158 163 159 164 if database != nil { 160 - uris := make([]string, len(annotations)) 161 - for i, a := range annotations { 162 - uris[i] = a.URI 163 - } 164 - 165 165 likeCounts, _ = database.GetLikeCounts(uris) 166 166 replyCounts, _ = database.GetReplyCounts(uris) 167 167 if viewerDID != "" { ··· 169 169 } 170 170 } 171 171 172 + if ConstellationClient != nil && len(uris) <= 5 { 173 + constellationCounts, err := ConstellationClient.GetCountsBatch(ctx, uris) 174 + if err != nil { 175 + log.Printf("Constellation fetch error (non-fatal): %v", err) 176 + return 177 + } 178 + 179 + for uri, counts := range constellationCounts { 180 + if counts.LikeCount > likeCounts[uri] { 181 + likeCounts[uri] = counts.LikeCount 182 + } 183 + if counts.ReplyCount > replyCounts[uri] { 184 + replyCounts[uri] = counts.ReplyCount 185 + } 186 + } 187 + } 188 + 189 + return 190 + } 191 + 192 + func hydrateAnnotations(database *db.DB, annotations []db.Annotation, viewerDID string) ([]APIAnnotation, error) { 193 + if len(annotations) == 0 { 194 + return []APIAnnotation{}, nil 195 + } 196 + 197 + profiles := fetchProfilesForDIDs(collectDIDs(annotations, func(a db.Annotation) string { return a.AuthorDID })) 198 + 199 + uris := make([]string, len(annotations)) 200 + for i, a := range annotations { 201 + uris[i] = a.URI 202 + } 203 + 204 + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) 205 + defer cancel() 206 + likeCounts, replyCounts, viewerLikes := fetchCounts(ctx, database, uris, viewerDID) 207 + 172 208 result := make([]APIAnnotation, len(annotations)) 173 209 for i, a := range annotations { 174 210 var body *APIBody ··· 228 264 IndexedAt: a.IndexedAt, 229 265 } 230 266 231 - if database != nil { 232 - result[i].LikeCount = likeCounts[a.URI] 233 - result[i].ReplyCount = replyCounts[a.URI] 234 - if viewerLikes != nil && viewerLikes[a.URI] { 235 - result[i].ViewerHasLiked = true 236 - } 267 + result[i].LikeCount = likeCounts[a.URI] 268 + result[i].ReplyCount = replyCounts[a.URI] 269 + if viewerLikes != nil && viewerLikes[a.URI] { 270 + result[i].ViewerHasLiked = true 237 271 } 238 272 } 239 273 ··· 247 281 248 282 profiles := fetchProfilesForDIDs(collectDIDs(highlights, func(h db.Highlight) string { return h.AuthorDID })) 249 283 250 - var likeCounts map[string]int 251 - var replyCounts map[string]int 252 - var viewerLikes map[string]bool 253 - 254 - if database != nil { 255 - uris := make([]string, len(highlights)) 256 - for i, h := range highlights { 257 - uris[i] = h.URI 258 - } 259 - 260 - likeCounts, _ = database.GetLikeCounts(uris) 261 - replyCounts, _ = database.GetReplyCounts(uris) 262 - if viewerDID != "" { 263 - viewerLikes, _ = database.GetViewerLikes(viewerDID, uris) 264 - } 284 + uris := make([]string, len(highlights)) 285 + for i, h := range highlights { 286 + uris[i] = h.URI 265 287 } 288 + 289 + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) 290 + defer cancel() 291 + likeCounts, replyCounts, viewerLikes := fetchCounts(ctx, database, uris, viewerDID) 266 292 267 293 result := make([]APIHighlight, len(highlights)) 268 294 for i, h := range highlights { ··· 307 333 CID: cid, 308 334 } 309 335 310 - if database != nil { 311 - result[i].LikeCount = likeCounts[h.URI] 312 - result[i].ReplyCount = replyCounts[h.URI] 313 - if viewerLikes != nil && viewerLikes[h.URI] { 314 - result[i].ViewerHasLiked = true 315 - } 336 + result[i].LikeCount = likeCounts[h.URI] 337 + result[i].ReplyCount = replyCounts[h.URI] 338 + if viewerLikes != nil && viewerLikes[h.URI] { 339 + result[i].ViewerHasLiked = true 316 340 } 317 341 } 318 342 ··· 326 350 327 351 profiles := fetchProfilesForDIDs(collectDIDs(bookmarks, func(b db.Bookmark) string { return b.AuthorDID })) 328 352 329 - var likeCounts map[string]int 330 - var replyCounts map[string]int 331 - var viewerLikes map[string]bool 353 + uris := make([]string, len(bookmarks)) 354 + for i, b := range bookmarks { 355 + uris[i] = b.URI 356 + } 332 357 333 - if database != nil { 334 - uris := make([]string, len(bookmarks)) 335 - for i, b := range bookmarks { 336 - uris[i] = b.URI 337 - } 338 - 339 - likeCounts, _ = database.GetLikeCounts(uris) 340 - replyCounts, _ = database.GetReplyCounts(uris) 341 - if viewerDID != "" { 342 - viewerLikes, _ = database.GetViewerLikes(viewerDID, uris) 343 - } 344 - } 358 + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) 359 + defer cancel() 360 + likeCounts, replyCounts, viewerLikes := fetchCounts(ctx, database, uris, viewerDID) 345 361 346 362 result := make([]APIBookmark, len(bookmarks)) 347 363 for i, b := range bookmarks { ··· 376 392 CreatedAt: b.CreatedAt, 377 393 CID: cid, 378 394 } 379 - if database != nil { 380 - result[i].LikeCount = likeCounts[b.URI] 381 - result[i].ReplyCount = replyCounts[b.URI] 382 - if viewerLikes != nil && viewerLikes[b.URI] { 383 - result[i].ViewerHasLiked = true 384 - } 395 + result[i].LikeCount = likeCounts[b.URI] 396 + result[i].ReplyCount = replyCounts[b.URI] 397 + if viewerLikes != nil && viewerLikes[b.URI] { 398 + result[i].ViewerHasLiked = true 385 399 } 386 400 } 387 401
+341
backend/internal/constellation/client.go
··· 1 + package constellation 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "sync" 10 + "time" 11 + ) 12 + 13 + const ( 14 + DefaultBaseURL = "https://constellation.microcosm.blue" 15 + DefaultTimeout = 5 * time.Second 16 + UserAgent = "Margin (margin.at)" 17 + ) 18 + 19 + type Client struct { 20 + baseURL string 21 + httpClient *http.Client 22 + } 23 + 24 + func NewClient() *Client { 25 + return &Client{ 26 + baseURL: DefaultBaseURL, 27 + httpClient: &http.Client{ 28 + Timeout: DefaultTimeout, 29 + }, 30 + } 31 + } 32 + 33 + func NewClientWithURL(baseURL string) *Client { 34 + return &Client{ 35 + baseURL: baseURL, 36 + httpClient: &http.Client{ 37 + Timeout: DefaultTimeout, 38 + }, 39 + } 40 + } 41 + 42 + type CountResponse struct { 43 + Total int `json:"total"` 44 + } 45 + 46 + type Link struct { 47 + URI string `json:"uri"` 48 + Collection string `json:"collection"` 49 + DID string `json:"did"` 50 + Path string `json:"path"` 51 + } 52 + 53 + type LinksResponse struct { 54 + Links []Link `json:"links"` 55 + Cursor string `json:"cursor,omitempty"` 56 + } 57 + 58 + func (c *Client) GetLikeCount(ctx context.Context, subjectURI string) (int, error) { 59 + params := url.Values{} 60 + params.Set("target", subjectURI) 61 + params.Set("collection", "at.margin.like") 62 + params.Set("path", ".subject.uri") 63 + 64 + endpoint := fmt.Sprintf("%s/links/count/distinct-dids?%s", c.baseURL, params.Encode()) 65 + 66 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 67 + if err != nil { 68 + return 0, fmt.Errorf("failed to create request: %w", err) 69 + } 70 + req.Header.Set("User-Agent", UserAgent) 71 + 72 + resp, err := c.httpClient.Do(req) 73 + if err != nil { 74 + return 0, fmt.Errorf("request failed: %w", err) 75 + } 76 + defer resp.Body.Close() 77 + 78 + if resp.StatusCode != http.StatusOK { 79 + return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 80 + } 81 + 82 + var countResp CountResponse 83 + if err := json.NewDecoder(resp.Body).Decode(&countResp); err != nil { 84 + return 0, fmt.Errorf("failed to decode response: %w", err) 85 + } 86 + 87 + return countResp.Total, nil 88 + } 89 + 90 + func (c *Client) GetReplyCount(ctx context.Context, rootURI string) (int, error) { 91 + params := url.Values{} 92 + params.Set("target", rootURI) 93 + params.Set("collection", "at.margin.reply") 94 + params.Set("path", ".root.uri") 95 + 96 + endpoint := fmt.Sprintf("%s/links/count?%s", c.baseURL, params.Encode()) 97 + 98 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 99 + if err != nil { 100 + return 0, fmt.Errorf("failed to create request: %w", err) 101 + } 102 + req.Header.Set("User-Agent", UserAgent) 103 + 104 + resp, err := c.httpClient.Do(req) 105 + if err != nil { 106 + return 0, fmt.Errorf("request failed: %w", err) 107 + } 108 + defer resp.Body.Close() 109 + 110 + if resp.StatusCode != http.StatusOK { 111 + return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 112 + } 113 + 114 + var countResp CountResponse 115 + if err := json.NewDecoder(resp.Body).Decode(&countResp); err != nil { 116 + return 0, fmt.Errorf("failed to decode response: %w", err) 117 + } 118 + 119 + return countResp.Total, nil 120 + } 121 + 122 + type CountsResult struct { 123 + LikeCount int 124 + ReplyCount int 125 + } 126 + 127 + func (c *Client) GetCountsBatch(ctx context.Context, uris []string) (map[string]CountsResult, error) { 128 + if len(uris) == 0 { 129 + return map[string]CountsResult{}, nil 130 + } 131 + 132 + results := make(map[string]CountsResult) 133 + var mu sync.Mutex 134 + var wg sync.WaitGroup 135 + 136 + semaphore := make(chan struct{}, 10) 137 + 138 + for _, uri := range uris { 139 + wg.Add(1) 140 + go func(u string) { 141 + defer wg.Done() 142 + semaphore <- struct{}{} 143 + defer func() { <-semaphore }() 144 + 145 + likeCount, _ := c.GetLikeCount(ctx, u) 146 + replyCount, _ := c.GetReplyCount(ctx, u) 147 + 148 + mu.Lock() 149 + results[u] = CountsResult{ 150 + LikeCount: likeCount, 151 + ReplyCount: replyCount, 152 + } 153 + mu.Unlock() 154 + }(uri) 155 + } 156 + 157 + wg.Wait() 158 + return results, nil 159 + } 160 + 161 + func (c *Client) GetAnnotationsForURL(ctx context.Context, targetURL string) ([]Link, error) { 162 + params := url.Values{} 163 + params.Set("target", targetURL) 164 + params.Set("collection", "at.margin.annotation") 165 + params.Set("path", ".target.source") 166 + 167 + endpoint := fmt.Sprintf("%s/links?%s", c.baseURL, params.Encode()) 168 + 169 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 170 + if err != nil { 171 + return nil, fmt.Errorf("failed to create request: %w", err) 172 + } 173 + req.Header.Set("User-Agent", UserAgent) 174 + 175 + resp, err := c.httpClient.Do(req) 176 + if err != nil { 177 + return nil, fmt.Errorf("request failed: %w", err) 178 + } 179 + defer resp.Body.Close() 180 + 181 + if resp.StatusCode != http.StatusOK { 182 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 183 + } 184 + 185 + var linksResp LinksResponse 186 + if err := json.NewDecoder(resp.Body).Decode(&linksResp); err != nil { 187 + return nil, fmt.Errorf("failed to decode response: %w", err) 188 + } 189 + 190 + return linksResp.Links, nil 191 + } 192 + 193 + func (c *Client) GetHighlightsForURL(ctx context.Context, targetURL string) ([]Link, error) { 194 + params := url.Values{} 195 + params.Set("target", targetURL) 196 + params.Set("collection", "at.margin.highlight") 197 + params.Set("path", ".target.source") 198 + 199 + endpoint := fmt.Sprintf("%s/links?%s", c.baseURL, params.Encode()) 200 + 201 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 202 + if err != nil { 203 + return nil, fmt.Errorf("failed to create request: %w", err) 204 + } 205 + req.Header.Set("User-Agent", UserAgent) 206 + 207 + resp, err := c.httpClient.Do(req) 208 + if err != nil { 209 + return nil, fmt.Errorf("request failed: %w", err) 210 + } 211 + defer resp.Body.Close() 212 + 213 + if resp.StatusCode != http.StatusOK { 214 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 215 + } 216 + 217 + var linksResp LinksResponse 218 + if err := json.NewDecoder(resp.Body).Decode(&linksResp); err != nil { 219 + return nil, fmt.Errorf("failed to decode response: %w", err) 220 + } 221 + 222 + return linksResp.Links, nil 223 + } 224 + 225 + func (c *Client) GetBookmarksForURL(ctx context.Context, targetURL string) ([]Link, error) { 226 + params := url.Values{} 227 + params.Set("target", targetURL) 228 + params.Set("collection", "at.margin.bookmark") 229 + params.Set("path", ".source") 230 + 231 + endpoint := fmt.Sprintf("%s/links?%s", c.baseURL, params.Encode()) 232 + 233 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 234 + if err != nil { 235 + return nil, fmt.Errorf("failed to create request: %w", err) 236 + } 237 + req.Header.Set("User-Agent", UserAgent) 238 + 239 + resp, err := c.httpClient.Do(req) 240 + if err != nil { 241 + return nil, fmt.Errorf("request failed: %w", err) 242 + } 243 + defer resp.Body.Close() 244 + 245 + if resp.StatusCode != http.StatusOK { 246 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 247 + } 248 + 249 + var linksResp LinksResponse 250 + if err := json.NewDecoder(resp.Body).Decode(&linksResp); err != nil { 251 + return nil, fmt.Errorf("failed to decode response: %w", err) 252 + } 253 + 254 + return linksResp.Links, nil 255 + } 256 + 257 + func (c *Client) GetAllItemsForURL(ctx context.Context, targetURL string) (annotations, highlights, bookmarks []Link, err error) { 258 + var wg sync.WaitGroup 259 + var mu sync.Mutex 260 + var errs []error 261 + 262 + wg.Add(3) 263 + 264 + go func() { 265 + defer wg.Done() 266 + links, e := c.GetAnnotationsForURL(ctx, targetURL) 267 + mu.Lock() 268 + defer mu.Unlock() 269 + if e != nil { 270 + errs = append(errs, e) 271 + } else { 272 + annotations = links 273 + } 274 + }() 275 + 276 + go func() { 277 + defer wg.Done() 278 + links, e := c.GetHighlightsForURL(ctx, targetURL) 279 + mu.Lock() 280 + defer mu.Unlock() 281 + if e != nil { 282 + errs = append(errs, e) 283 + } else { 284 + highlights = links 285 + } 286 + }() 287 + 288 + go func() { 289 + defer wg.Done() 290 + links, e := c.GetBookmarksForURL(ctx, targetURL) 291 + mu.Lock() 292 + defer mu.Unlock() 293 + if e != nil { 294 + errs = append(errs, e) 295 + } else { 296 + bookmarks = links 297 + } 298 + }() 299 + 300 + wg.Wait() 301 + 302 + if len(errs) > 0 { 303 + return annotations, highlights, bookmarks, errs[0] 304 + } 305 + 306 + return annotations, highlights, bookmarks, nil 307 + } 308 + 309 + func (c *Client) GetLikers(ctx context.Context, subjectURI string) ([]string, error) { 310 + params := url.Values{} 311 + params.Set("target", subjectURI) 312 + params.Set("collection", "at.margin.like") 313 + params.Set("path", ".subject.uri") 314 + 315 + endpoint := fmt.Sprintf("%s/links/distinct-dids?%s", c.baseURL, params.Encode()) 316 + 317 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 318 + if err != nil { 319 + return nil, fmt.Errorf("failed to create request: %w", err) 320 + } 321 + req.Header.Set("User-Agent", UserAgent) 322 + 323 + resp, err := c.httpClient.Do(req) 324 + if err != nil { 325 + return nil, fmt.Errorf("request failed: %w", err) 326 + } 327 + defer resp.Body.Close() 328 + 329 + if resp.StatusCode != http.StatusOK { 330 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 331 + } 332 + 333 + var result struct { 334 + DIDs []string `json:"dids"` 335 + } 336 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 337 + return nil, fmt.Errorf("failed to decode response: %w", err) 338 + } 339 + 340 + return result.DIDs, nil 341 + }
+228
backend/internal/crypto/cid.go
··· 1 + package crypto 2 + 3 + import ( 4 + "bytes" 5 + "encoding/json" 6 + "fmt" 7 + "sort" 8 + "strings" 9 + 10 + "github.com/fxamacker/cbor/v2" 11 + "github.com/ipfs/go-cid" 12 + "github.com/multiformats/go-multihash" 13 + ) 14 + 15 + const ( 16 + DagCBORCodec = 0x71 17 + SHA256Code = multihash.SHA2_256 18 + ) 19 + 20 + type CIDVerificationError struct { 21 + ExpectedCID string 22 + ComputedCID string 23 + RecordURI string 24 + } 25 + 26 + func (e *CIDVerificationError) Error() string { 27 + return fmt.Sprintf("CID verification failed for %s: expected %s, computed %s", 28 + e.RecordURI, e.ExpectedCID, e.ComputedCID) 29 + } 30 + 31 + func VerifyRecordCID(recordJSON json.RawMessage, expectedCID string, recordURI string) error { 32 + if expectedCID == "" { 33 + return nil 34 + } 35 + 36 + expectedC, err := cid.Decode(expectedCID) 37 + if err != nil { 38 + return fmt.Errorf("invalid CID format: %w", err) 39 + } 40 + 41 + cborBytes, err := jsonToDAGCBOR(recordJSON) 42 + if err != nil { 43 + return fmt.Errorf("failed to encode as DAG-CBOR: %w", err) 44 + } 45 + 46 + mh, err := multihash.Sum(cborBytes, SHA256Code, -1) 47 + if err != nil { 48 + return fmt.Errorf("failed to compute hash: %w", err) 49 + } 50 + 51 + computedC := cid.NewCidV1(DagCBORCodec, mh) 52 + 53 + if !expectedC.Equals(computedC) { 54 + return &CIDVerificationError{ 55 + ExpectedCID: expectedCID, 56 + ComputedCID: computedC.String(), 57 + RecordURI: recordURI, 58 + } 59 + } 60 + 61 + return nil 62 + } 63 + 64 + func jsonToDAGCBOR(jsonData json.RawMessage) ([]byte, error) { 65 + var data interface{} 66 + if err := json.Unmarshal(jsonData, &data); err != nil { 67 + return nil, err 68 + } 69 + 70 + processed := processValue(data) 71 + 72 + encMode, err := cbor.CanonicalEncOptions().EncMode() 73 + if err != nil { 74 + return nil, err 75 + } 76 + 77 + return encMode.Marshal(processed) 78 + } 79 + 80 + func processValue(v interface{}) interface{} { 81 + switch val := v.(type) { 82 + case map[string]interface{}: 83 + return processMap(val) 84 + case []interface{}: 85 + result := make([]interface{}, len(val)) 86 + for i, item := range val { 87 + result[i] = processValue(item) 88 + } 89 + return result 90 + case float64: 91 + if val == float64(int64(val)) { 92 + return int64(val) 93 + } 94 + return val 95 + case string: 96 + return val 97 + default: 98 + return val 99 + } 100 + } 101 + 102 + func processMap(m map[string]interface{}) interface{} { 103 + if link, ok := m["$link"].(string); ok && len(m) == 1 { 104 + c, err := cid.Decode(link) 105 + if err == nil { 106 + return cbor.Tag{ 107 + Number: 42, 108 + Content: append([]byte{0x00}, c.Bytes()...), 109 + } 110 + } 111 + } 112 + 113 + if bytesStr, ok := m["$bytes"].(string); ok && len(m) == 1 { 114 + bytesStr = strings.TrimRight(bytesStr, "=") 115 + decoded := decodeBase64(bytesStr) 116 + if decoded != nil { 117 + return decoded 118 + } 119 + } 120 + 121 + keys := make([]string, 0, len(m)) 122 + for k := range m { 123 + keys = append(keys, k) 124 + } 125 + sort.Strings(keys) 126 + 127 + result := make(map[string]interface{}, len(m)) 128 + for _, k := range keys { 129 + result[k] = processValue(m[k]) 130 + } 131 + 132 + return result 133 + } 134 + 135 + func decodeBase64(s string) []byte { 136 + switch len(s) % 4 { 137 + case 2: 138 + s += "==" 139 + case 3: 140 + s += "=" 141 + } 142 + 143 + decoded := make([]byte, len(s)) 144 + n := 0 145 + for i := 0; i < len(s); i += 4 { 146 + if i+4 > len(s) { 147 + break 148 + } 149 + chunk := s[i : i+4] 150 + val := uint32(0) 151 + for _, c := range chunk { 152 + var v byte 153 + switch { 154 + case c >= 'A' && c <= 'Z': 155 + v = byte(c - 'A') 156 + case c >= 'a' && c <= 'z': 157 + v = byte(c - 'a' + 26) 158 + case c >= '0' && c <= '9': 159 + v = byte(c - '0' + 52) 160 + case c == '+' || c == '-': 161 + v = 62 162 + case c == '/' || c == '_': 163 + v = 63 164 + case c == '=': 165 + v = 0 166 + default: 167 + return nil 168 + } 169 + val = val<<6 | uint32(v) 170 + } 171 + decoded[n] = byte(val >> 16) 172 + n++ 173 + if chunk[2] != '=' { 174 + decoded[n] = byte(val >> 8) 175 + n++ 176 + } 177 + if chunk[3] != '=' { 178 + decoded[n] = byte(val) 179 + n++ 180 + } 181 + } 182 + return decoded[:n] 183 + } 184 + 185 + func VerifyRecordCIDBatch(records []struct { 186 + JSON json.RawMessage 187 + CID string 188 + URI string 189 + }) []error { 190 + var errors []error 191 + for _, r := range records { 192 + if err := VerifyRecordCID(r.JSON, r.CID, r.URI); err != nil { 193 + errors = append(errors, err) 194 + } 195 + } 196 + return errors 197 + } 198 + 199 + func MustVerifyRecordCID(recordJSON json.RawMessage, expectedCID string, recordURI string) bool { 200 + return VerifyRecordCID(recordJSON, expectedCID, recordURI) == nil 201 + } 202 + 203 + func ComputeRecordCID(recordJSON json.RawMessage) (string, error) { 204 + cborBytes, err := jsonToDAGCBOR(recordJSON) 205 + if err != nil { 206 + return "", fmt.Errorf("failed to encode as DAG-CBOR: %w", err) 207 + } 208 + 209 + mh, err := multihash.Sum(cborBytes, SHA256Code, -1) 210 + if err != nil { 211 + return "", fmt.Errorf("failed to compute hash: %w", err) 212 + } 213 + 214 + c := cid.NewCidV1(DagCBORCodec, mh) 215 + return c.String(), nil 216 + } 217 + 218 + func CompareRecordBytes(a, b json.RawMessage) (bool, error) { 219 + cborA, err := jsonToDAGCBOR(a) 220 + if err != nil { 221 + return false, err 222 + } 223 + cborB, err := jsonToDAGCBOR(b) 224 + if err != nil { 225 + return false, err 226 + } 227 + return bytes.Equal(cborA, cborB), nil 228 + }
+40 -7
backend/internal/firehose/ingester.go
··· 10 10 "time" 11 11 12 12 "github.com/gorilla/websocket" 13 + "margin.at/internal/crypto" 13 14 "margin.at/internal/db" 14 15 internal_sync "margin.at/internal/sync" 15 16 "margin.at/internal/xrpc" 16 17 ) 18 + 19 + var CIDVerificationEnabled = true 17 20 18 21 const ( 19 22 CollectionAnnotation = "at.margin.annotation" ··· 28 31 CollectionSembleCollection = "network.cosmik.collection" 29 32 ) 30 33 31 - var RelayURL = "wss://jetstream2.us-east.bsky.network/subscribe" 34 + var RelayURLs = []string{ 35 + "wss://jetstream2.us-east.bsky.network/subscribe", 36 + "wss://jetstream2.fr.hose.cam/subscribe", 37 + "wss://jetstream.fire.hose.cam/subscribe", 38 + } 39 + 40 + var RelayURL = RelayURLs[0] 32 41 33 42 type Ingester struct { 34 - db *db.DB 35 - sync *internal_sync.Service 36 - cancel context.CancelFunc 37 - handlers map[string]RecordHandler 43 + db *db.DB 44 + sync *internal_sync.Service 45 + cancel context.CancelFunc 46 + handlers map[string]RecordHandler 47 + currentRelayIdx int 38 48 } 39 49 40 50 type RecordHandler func(event *FirehoseEvent) ··· 80 90 } 81 91 82 92 func (i *Ingester) run(ctx context.Context) { 93 + consecutiveFailures := 0 94 + maxFailuresBeforeSwitch := 3 95 + 83 96 for { 84 97 select { 85 98 case <-ctx.Done(): 86 99 return 87 100 default: 88 101 if err := i.subscribe(ctx); err != nil { 89 - log.Printf("Jetstream error: %v, reconnecting in 5s...", err) 102 + consecutiveFailures++ 103 + log.Printf("Jetstream error (relay %d): %v, reconnecting in 5s...", i.currentRelayIdx, err) 104 + 105 + if consecutiveFailures >= maxFailuresBeforeSwitch { 106 + i.currentRelayIdx = (i.currentRelayIdx + 1) % len(RelayURLs) 107 + log.Printf("Switching to relay %d: %s", i.currentRelayIdx, RelayURLs[i.currentRelayIdx]) 108 + consecutiveFailures = 0 109 + } 110 + 90 111 if ctx.Err() != nil { 91 112 return 92 113 } 93 114 time.Sleep(5 * time.Second) 115 + } else { 116 + consecutiveFailures = 0 94 117 } 95 118 } 96 119 } ··· 120 143 collections = append(collections, collection) 121 144 } 122 145 123 - url := fmt.Sprintf("%s?wantedCollections=%s", RelayURL, strings.Join(collections, "&wantedCollections=")) 146 + relayURL := RelayURLs[i.currentRelayIdx] 147 + url := fmt.Sprintf("%s?wantedCollections=%s", relayURL, strings.Join(collections, "&wantedCollections=")) 124 148 if cursor > 0 { 125 149 url = fmt.Sprintf("%s&cursor=%d", url, cursor) 126 150 } ··· 171 195 switch commit.Operation { 172 196 case "create", "update": 173 197 if len(commit.Record) > 0 { 198 + if CIDVerificationEnabled && commit.Cid != "" { 199 + if err := crypto.VerifyRecordCID(commit.Record, commit.Cid, uri); err != nil { 200 + log.Printf("CID verification failed for %s: %v (skipping)", uri, err) 201 + return 202 + } 203 + } 204 + 174 205 firehoseEvent := &FirehoseEvent{ 175 206 Repo: event.Did, 176 207 Collection: commit.Collection, ··· 178 209 Record: commit.Record, 179 210 Operation: commit.Operation, 180 211 Cursor: event.Time, 212 + CID: commit.Cid, 181 213 } 182 214 183 215 i.dispatchToHandler(firehoseEvent) ··· 267 299 Record json.RawMessage `json:"record"` 268 300 Operation string `json:"operation"` 269 301 Cursor int64 `json:"cursor"` 302 + CID string `json:"cid"` 270 303 } 271 304 272 305 func (i *Ingester) handleAnnotation(event *FirehoseEvent) {
+6
backend/internal/oauth/handler.go
··· 330 330 return 331 331 } 332 332 333 + if tokenResp.Sub != pending.DID { 334 + log.Printf("Security: OAuth sub mismatch, expected %s, got %s", pending.DID, tokenResp.Sub) 335 + http.Error(w, "Account identity mismatch, authorization returned different account", http.StatusBadRequest) 336 + return 337 + } 338 + 333 339 _ = newNonce 334 340 335 341 sessionID := generateSessionID()
+179
backend/internal/slingshot/client.go
··· 1 + package slingshot 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "time" 10 + ) 11 + 12 + const ( 13 + DefaultBaseURL = "https://slingshot.microcosm.blue" 14 + DefaultTimeout = 5 * time.Second 15 + UserAgent = "Margin (margin.at)" 16 + ) 17 + 18 + type Client struct { 19 + baseURL string 20 + httpClient *http.Client 21 + } 22 + 23 + func NewClient() *Client { 24 + return &Client{ 25 + baseURL: DefaultBaseURL, 26 + httpClient: &http.Client{ 27 + Timeout: DefaultTimeout, 28 + }, 29 + } 30 + } 31 + 32 + func NewClientWithURL(baseURL string) *Client { 33 + return &Client{ 34 + baseURL: baseURL, 35 + httpClient: &http.Client{ 36 + Timeout: DefaultTimeout, 37 + }, 38 + } 39 + } 40 + 41 + type Identity struct { 42 + DID string `json:"did"` 43 + Handle string `json:"handle"` 44 + PDS string `json:"pds"` 45 + } 46 + 47 + type Record struct { 48 + URI string `json:"uri"` 49 + CID string `json:"cid"` 50 + Value json.RawMessage `json:"value"` 51 + } 52 + 53 + func (c *Client) ResolveIdentity(ctx context.Context, identifier string) (*Identity, error) { 54 + endpoint := fmt.Sprintf("%s/identity/%s", c.baseURL, url.PathEscape(identifier)) 55 + 56 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 57 + if err != nil { 58 + return nil, fmt.Errorf("failed to create request: %w", err) 59 + } 60 + req.Header.Set("User-Agent", UserAgent) 61 + 62 + resp, err := c.httpClient.Do(req) 63 + if err != nil { 64 + return nil, fmt.Errorf("request failed: %w", err) 65 + } 66 + defer resp.Body.Close() 67 + 68 + if resp.StatusCode == http.StatusNotFound { 69 + return nil, fmt.Errorf("identity not found: %s", identifier) 70 + } 71 + 72 + if resp.StatusCode != http.StatusOK { 73 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 74 + } 75 + 76 + var identity Identity 77 + if err := json.NewDecoder(resp.Body).Decode(&identity); err != nil { 78 + return nil, fmt.Errorf("failed to decode response: %w", err) 79 + } 80 + 81 + return &identity, nil 82 + } 83 + 84 + func (c *Client) GetRecord(ctx context.Context, uri string) (*Record, error) { 85 + params := url.Values{} 86 + params.Set("uri", uri) 87 + 88 + endpoint := fmt.Sprintf("%s/record?%s", c.baseURL, params.Encode()) 89 + 90 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 91 + if err != nil { 92 + return nil, fmt.Errorf("failed to create request: %w", err) 93 + } 94 + req.Header.Set("User-Agent", UserAgent) 95 + 96 + resp, err := c.httpClient.Do(req) 97 + if err != nil { 98 + return nil, fmt.Errorf("request failed: %w", err) 99 + } 100 + defer resp.Body.Close() 101 + 102 + if resp.StatusCode == http.StatusNotFound { 103 + return nil, fmt.Errorf("record not found: %s", uri) 104 + } 105 + 106 + if resp.StatusCode != http.StatusOK { 107 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 108 + } 109 + 110 + var record Record 111 + if err := json.NewDecoder(resp.Body).Decode(&record); err != nil { 112 + return nil, fmt.Errorf("failed to decode response: %w", err) 113 + } 114 + 115 + return &record, nil 116 + } 117 + 118 + func (c *Client) GetRecordByParts(ctx context.Context, repo, collection, rkey string) (*Record, error) { 119 + uri := fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey) 120 + return c.GetRecord(ctx, uri) 121 + } 122 + 123 + type ListRecordsResponse struct { 124 + Records []Record `json:"records"` 125 + Cursor string `json:"cursor,omitempty"` 126 + } 127 + 128 + func (c *Client) ListRecords(ctx context.Context, repo, collection string, limit int, cursor string) (*ListRecordsResponse, error) { 129 + params := url.Values{} 130 + params.Set("repo", repo) 131 + params.Set("collection", collection) 132 + if limit > 0 { 133 + params.Set("limit", fmt.Sprintf("%d", limit)) 134 + } 135 + if cursor != "" { 136 + params.Set("cursor", cursor) 137 + } 138 + 139 + endpoint := fmt.Sprintf("%s/records?%s", c.baseURL, params.Encode()) 140 + 141 + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) 142 + if err != nil { 143 + return nil, fmt.Errorf("failed to create request: %w", err) 144 + } 145 + req.Header.Set("User-Agent", UserAgent) 146 + 147 + resp, err := c.httpClient.Do(req) 148 + if err != nil { 149 + return nil, fmt.Errorf("request failed: %w", err) 150 + } 151 + defer resp.Body.Close() 152 + 153 + if resp.StatusCode != http.StatusOK { 154 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 155 + } 156 + 157 + var listResp ListRecordsResponse 158 + if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { 159 + return nil, fmt.Errorf("failed to decode response: %w", err) 160 + } 161 + 162 + return &listResp, nil 163 + } 164 + 165 + func (c *Client) ResolveDID(ctx context.Context, did string) (string, error) { 166 + identity, err := c.ResolveIdentity(ctx, did) 167 + if err != nil { 168 + return "", err 169 + } 170 + return identity.PDS, nil 171 + } 172 + 173 + func (c *Client) ResolveHandle(ctx context.Context, handle string) (string, error) { 174 + identity, err := c.ResolveIdentity(ctx, handle) 175 + if err != nil { 176 + return "", err 177 + } 178 + return identity.DID, nil 179 + }
+11
backend/internal/sync/service.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "io" 8 + "log" 8 9 "net/http" 9 10 "strings" 10 11 "time" 11 12 13 + "margin.at/internal/crypto" 12 14 "margin.at/internal/db" 13 15 "margin.at/internal/xrpc" 14 16 ) 17 + 18 + var CIDVerificationEnabled = true 15 19 16 20 type Service struct { 17 21 db *db.DB ··· 82 86 } 83 87 84 88 for _, rec := range output.Records { 89 + if CIDVerificationEnabled && rec.CID != "" { 90 + if err := crypto.VerifyRecordCID(rec.Value, rec.CID, rec.URI); err != nil { 91 + log.Printf("CID verification failed for %s: %v (skipping)", rec.URI, err) 92 + continue 93 + } 94 + } 95 + 85 96 err := s.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value) 86 97 if err != nil { 87 98 fmt.Printf("Error upserting %s: %v\n", rec.URI, err)
+6 -8
backend/internal/xrpc/client.go
··· 11 11 "fmt" 12 12 "io" 13 13 "net/http" 14 - "strings" 15 14 "time" 16 15 17 16 "github.com/go-jose/go-jose/v4" ··· 193 192 } 194 193 195 194 func (c *Client) DeleteRecordByURI(ctx context.Context, uri string) error { 196 - 197 - if !strings.HasPrefix(uri, "at://") { 198 - return fmt.Errorf("invalid AT URI format") 195 + parsed, err := ParseATURI(uri) 196 + if err != nil { 197 + return err 199 198 } 200 199 201 - parts := strings.Split(strings.TrimPrefix(uri, "at://"), "/") 202 - if len(parts) != 3 { 203 - return fmt.Errorf("invalid AT URI format") 200 + if parsed.Collection == "" || parsed.RKey == "" { 201 + return fmt.Errorf("invalid AT-URI: must include collection and rkey") 204 202 } 205 203 206 - return c.DeleteRecord(ctx, parts[0], parts[1], parts[2]) 204 + return c.DeleteRecord(ctx, parsed.DID, parsed.Collection, parsed.RKey) 207 205 } 208 206 209 207 type PutRecordInput struct {
+107
backend/internal/xrpc/utils.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "fmt" 7 + "log" 6 8 "net/http" 9 + "regexp" 7 10 "strings" 8 11 "time" 12 + 13 + "margin.at/internal/slingshot" 9 14 ) 10 15 16 + var SlingshotClient = slingshot.NewClient() 17 + 18 + var ( 19 + didPattern = regexp.MustCompile(`^did:[a-z]+:[a-zA-Z0-9._:%-]+$`) 20 + nsidPattern = regexp.MustCompile(`^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*)+$`) 21 + rkeyPattern = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`) 22 + ) 23 + 24 + type ATURI struct { 25 + DID string 26 + Collection string 27 + RKey string 28 + } 29 + 30 + func ParseATURI(uri string) (*ATURI, error) { 31 + if !strings.HasPrefix(uri, "at://") { 32 + return nil, fmt.Errorf("invalid AT-URI: must start with at://") 33 + } 34 + 35 + path := strings.TrimPrefix(uri, "at://") 36 + parts := strings.Split(path, "/") 37 + 38 + if len(parts) < 1 || parts[0] == "" { 39 + return nil, fmt.Errorf("invalid AT-URI: missing DID authority") 40 + } 41 + 42 + did := parts[0] 43 + if !didPattern.MatchString(did) { 44 + return nil, fmt.Errorf("invalid AT-URI: malformed DID %q", did) 45 + } 46 + 47 + result := &ATURI{DID: did} 48 + 49 + if len(parts) >= 2 && parts[1] != "" { 50 + collection := parts[1] 51 + if !nsidPattern.MatchString(collection) { 52 + return nil, fmt.Errorf("invalid AT-URI: malformed collection NSID %q", collection) 53 + } 54 + result.Collection = collection 55 + } 56 + 57 + if len(parts) >= 3 && parts[2] != "" { 58 + rkey := parts[2] 59 + if !rkeyPattern.MatchString(rkey) || strings.HasPrefix(rkey, ".") || strings.HasSuffix(rkey, ".") { 60 + return nil, fmt.Errorf("invalid AT-URI: malformed record key %q", rkey) 61 + } 62 + if len(rkey) > 512 { 63 + return nil, fmt.Errorf("invalid AT-URI: record key too long (max 512)") 64 + } 65 + result.RKey = rkey 66 + } 67 + 68 + if len(parts) > 3 { 69 + return nil, fmt.Errorf("invalid AT-URI: too many path segments") 70 + } 71 + 72 + return result, nil 73 + } 74 + 75 + func (a *ATURI) String() string { 76 + if a.Collection == "" { 77 + return fmt.Sprintf("at://%s", a.DID) 78 + } 79 + if a.RKey == "" { 80 + return fmt.Sprintf("at://%s/%s", a.DID, a.Collection) 81 + } 82 + return fmt.Sprintf("at://%s/%s/%s", a.DID, a.Collection, a.RKey) 83 + } 84 + 85 + func init() { 86 + log.Printf("Slingshot client initialized: %s", slingshot.DefaultBaseURL) 87 + } 88 + 11 89 func ResolveDIDToPDS(did string) (string, error) { 90 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 91 + defer cancel() 92 + 93 + if pds, err := SlingshotClient.ResolveDID(ctx, did); err == nil && pds != "" { 94 + return pds, nil 95 + } 96 + 97 + return resolveDIDToPDSDirect(did) 98 + } 99 + 100 + func resolveDIDToPDSDirect(did string) (string, error) { 12 101 var docURL string 13 102 if strings.HasPrefix(did, "did:plc:") { 14 103 docURL = fmt.Sprintf("https://plc.directory/%s", did) ··· 34 123 35 124 var doc struct { 36 125 Service []struct { 126 + ID string `json:"id"` 37 127 Type string `json:"type"` 38 128 ServiceEndpoint string `json:"serviceEndpoint"` 39 129 } `json:"service"` ··· 43 133 } 44 134 45 135 for _, svc := range doc.Service { 136 + if svc.ID == "#atproto_pds" && svc.Type == "AtprotoPersonalDataServer" { 137 + return svc.ServiceEndpoint, nil 138 + } 139 + } 140 + for _, svc := range doc.Service { 46 141 if svc.Type == "AtprotoPersonalDataServer" { 47 142 return svc.ServiceEndpoint, nil 48 143 } 49 144 } 50 145 return "", nil 51 146 } 147 + 52 148 func ResolveHandle(handle string) (string, error) { 53 149 if strings.HasPrefix(handle, "did:") { 54 150 return handle, nil 55 151 } 56 152 153 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 154 + defer cancel() 155 + 156 + if did, err := SlingshotClient.ResolveHandle(ctx, handle); err == nil && did != "" { 157 + return did, nil 158 + } 159 + 160 + return resolveHandleDirect(handle) 161 + } 162 + 163 + func resolveHandleDirect(handle string) (string, error) { 57 164 url := fmt.Sprintf("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=%s", handle) 58 165 client := &http.Client{ 59 166 Timeout: 5 * time.Second,