this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

basic federation test and implementation

+580 -197
+2 -2
api/atproto/servergetAccountsConfig.go
··· 13 13 14 14 type ServerGetAccountsConfig_Links struct { 15 15 LexiconTypeID string `json:"$type,omitempty"` 16 - PrivacyPolicy *string `json:"privacyPolicy" cborgen:"privacyPolicy"` 17 - TermsOfService *string `json:"termsOfService" cborgen:"termsOfService"` 16 + PrivacyPolicy *string `json:"privacyPolicy,omitempty" cborgen:"privacyPolicy"` 17 + TermsOfService *string `json:"termsOfService,omitempty" cborgen:"termsOfService"` 18 18 } 19 19 20 20 type ServerGetAccountsConfig_Output struct {
+7 -7
api/bsky/cbor_gen.go
··· 67 67 return err 68 68 } 69 69 70 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 70 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.post"))); err != nil { 71 71 return err 72 72 } 73 73 if _, err := io.WriteString(w, string("app.bsky.feed.post")); err != nil { ··· 330 330 return err 331 331 } 332 332 333 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 333 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.repost"))); err != nil { 334 334 return err 335 335 } 336 336 if _, err := io.WriteString(w, string("app.bsky.feed.repost")); err != nil { ··· 491 491 return err 492 492 } 493 493 494 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 494 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.trend"))); err != nil { 495 495 return err 496 496 } 497 497 if _, err := io.WriteString(w, string("app.bsky.feed.trend")); err != nil { ··· 652 652 return err 653 653 } 654 654 655 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 655 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.vote"))); err != nil { 656 656 return err 657 657 } 658 658 if _, err := io.WriteString(w, string("app.bsky.feed.vote")); err != nil { ··· 2280 2280 return err 2281 2281 } 2282 2282 2283 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 2283 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.graph.follow"))); err != nil { 2284 2284 return err 2285 2285 } 2286 2286 if _, err := io.WriteString(w, string("app.bsky.graph.follow")); err != nil { ··· 2604 2604 return err 2605 2605 } 2606 2606 2607 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 2607 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.actor.profile"))); err != nil { 2608 2608 return err 2609 2609 } 2610 2610 if _, err := io.WriteString(w, string("app.bsky.actor.profile")); err != nil { ··· 2851 2851 return err 2852 2852 } 2853 2853 2854 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.LexiconTypeID))); err != nil { 2854 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.system.declaration"))); err != nil { 2855 2855 return err 2856 2856 } 2857 2857 if _, err := io.WriteString(w, string("app.bsky.system.declaration")); err != nil {
+3 -3
api/bsky/feedfeedViewPost.go
··· 14 14 15 15 type FeedFeedViewPost struct { 16 16 LexiconTypeID string `json:"$type,omitempty"` 17 - Post *FeedPost_View `json:"post" cborgen:"post"` 18 - Reason *FeedFeedViewPost_Reason `json:"reason" cborgen:"reason"` 19 - Reply *FeedFeedViewPost_ReplyRef `json:"reply" cborgen:"reply"` 17 + Post *FeedPost_View `json:"post,omitempty" cborgen:"post"` 18 + Reason *FeedFeedViewPost_Reason `json:"reason,omitempty" cborgen:"reason"` 19 + Reply *FeedFeedViewPost_ReplyRef `json:"reply,omitempty" cborgen:"reply"` 20 20 } 21 21 22 22 type FeedFeedViewPost_Reason struct {
+1 -1
api/bsky/feedgetTimeline.go
··· 13 13 14 14 type FeedGetTimeline_Output struct { 15 15 LexiconTypeID string `json:"$type,omitempty"` 16 - Cursor *string `json:"cursor" cborgen:"cursor"` 16 + Cursor *string `json:"cursor,omitempty" cborgen:"cursor"` 17 17 Feed []*FeedFeedViewPost `json:"feed" cborgen:"feed"` 18 18 } 19 19
+3 -3
api/bsky/feedpost.go
··· 116 116 117 117 type FeedPost_View struct { 118 118 LexiconTypeID string `json:"$type,omitempty"` 119 - Author *ActorRef_WithInfo `json:"author" cborgen:"author"` 119 + Author *ActorRef_WithInfo `json:"author,omitempty" cborgen:"author"` 120 120 Cid string `json:"cid" cborgen:"cid"` 121 121 DownvoteCount int64 `json:"downvoteCount" cborgen:"downvoteCount"` 122 - Embed *FeedPost_View_Embed `json:"embed" cborgen:"embed"` 122 + Embed *FeedPost_View_Embed `json:"embed,omitempty" cborgen:"embed"` 123 123 IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 124 124 Record any `json:"record" cborgen:"record"` 125 125 ReplyCount int64 `json:"replyCount" cborgen:"replyCount"` 126 126 RepostCount int64 `json:"repostCount" cborgen:"repostCount"` 127 127 UpvoteCount int64 `json:"upvoteCount" cborgen:"upvoteCount"` 128 128 Uri string `json:"uri" cborgen:"uri"` 129 - Viewer *FeedPost_ViewerState `json:"viewer" cborgen:"viewer"` 129 + Viewer *FeedPost_ViewerState `json:"viewer,omitempty" cborgen:"viewer"` 130 130 } 131 131 132 132 type FeedPost_View_Embed struct {
+37 -3
carstore/bs.go
··· 260 260 return &lastShard, nil 261 261 } 262 262 263 - func (cs *CarStore) NewDeltaSession(ctx context.Context, user uint, prev cid.Cid) (*DeltaSession, error) { 263 + func (cs *CarStore) NewDeltaSession(ctx context.Context, user uint, prev *cid.Cid) (*DeltaSession, error) { 264 264 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") 265 265 defer span.End() 266 266 ··· 271 271 return nil, err 272 272 } 273 273 274 - if lastShard.Root != "" && lastShard.Root != prev.String() { 275 - return nil, fmt.Errorf("attempted a delta session on top of the wrong previous head") 274 + if prev != nil { 275 + if lastShard.Root != "" && lastShard.Root != prev.String() { 276 + return nil, fmt.Errorf("attempted a delta session on top of the wrong previous head") 277 + } 276 278 } 277 279 278 280 return &DeltaSession{ ··· 572 574 573 575 return int64(nw), nil 574 576 } 577 + 578 + func (cs *CarStore) ImportSlice(ctx context.Context, uid uint, carslice []byte) (cid.Cid, *DeltaSession, error) { 579 + carr, err := car.NewCarReader(bytes.NewReader(carslice)) 580 + if err != nil { 581 + return cid.Undef, nil, err 582 + } 583 + 584 + if len(carr.Header.Roots) != 1 { 585 + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) 586 + } 587 + 588 + ds, err := cs.NewDeltaSession(ctx, uid, nil) 589 + if err != nil { 590 + return cid.Undef, nil, err 591 + } 592 + 593 + for { 594 + blk, err := carr.Next() 595 + if err != nil { 596 + if err == io.EOF { 597 + break 598 + } 599 + return cid.Undef, nil, err 600 + } 601 + 602 + if err := ds.Put(ctx, blk); err != nil { 603 + return cid.Undef, nil, err 604 + } 605 + } 606 + 607 + return carr.Header.Roots[0], ds, nil 608 + }
+5 -5
carstore/repo_test.go
··· 78 78 } 79 79 defer cleanup() 80 80 81 - ds, err := cs.NewDeltaSession(ctx, 1, cid.Undef) 81 + ds, err := cs.NewDeltaSession(ctx, 1, &cid.Undef) 82 82 if err != nil { 83 83 t.Fatal(err) 84 84 } ··· 94 94 95 95 head := ncid 96 96 for i := 0; i < 10; i++ { 97 - ds, err := cs.NewDeltaSession(ctx, 1, head) 97 + ds, err := cs.NewDeltaSession(ctx, 1, &head) 98 98 if err != nil { 99 99 t.Fatal(err) 100 100 } ··· 105 105 } 106 106 107 107 if _, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &api.PostRecord{ 108 - Text: fmt.Sprintf("hey look its a tweet %s", time.Now()), 108 + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), 109 109 }); err != nil { 110 110 t.Fatal(err) 111 111 } ··· 157 157 } 158 158 defer cleanup() 159 159 160 - ds, err := cs.NewDeltaSession(ctx, 1, cid.Undef) 160 + ds, err := cs.NewDeltaSession(ctx, 1, &cid.Undef) 161 161 if err != nil { 162 162 b.Fatal(err) 163 163 } ··· 174 174 head := ncid 175 175 b.ResetTimer() 176 176 for i := 0; i < b.N; i++ { 177 - ds, err := cs.NewDeltaSession(ctx, 1, head) 177 + ds, err := cs.NewDeltaSession(ctx, 1, &head) 178 178 if err != nil { 179 179 b.Fatal(err) 180 180 }
+5 -1
cmd/gosky/util/util.go
··· 103 103 104 104 func loadAuthFromEnv(cctx *cli.Context, req bool) (*xrpc.AuthInfo, error) { 105 105 if a := cctx.String("auth"); a != "" { 106 - return ReadAuth(a) 106 + if ai, err := ReadAuth(a); err != nil && req { 107 + return nil, err 108 + } else { 109 + return ai, nil 110 + } 107 111 } 108 112 109 113 val := os.Getenv("BSKY_AUTH")
+7 -3
go.mod
··· 20 20 github.com/lestrrat-go/jwx v1.2.25 21 21 github.com/lestrrat-go/jwx/v2 v2.0.0 22 22 github.com/mitchellh/go-homedir v1.1.0 23 + github.com/multiformats/go-multibase v0.0.3 23 24 github.com/multiformats/go-multihash v0.1.0 25 + github.com/multiformats/go-varint v0.0.6 24 26 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f 27 + github.com/stretchr/testify v1.8.1 25 28 github.com/urfave/cli/v2 v2.23.0 26 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106195754-b5f4b36f47e2 29 + github.com/whyrusleeping/cbor-gen v0.0.0-20230109192608-0173f1e641ac 27 30 github.com/whyrusleeping/go-did v0.0.0-20221105001742-8d9e0ffb0d59 28 31 go.opentelemetry.io/otel v1.11.2 29 32 go.opentelemetry.io/otel/exporters/jaeger v1.11.2 ··· 38 41 require ( 39 42 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect 40 43 github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect 44 + github.com/davecgh/go-spew v1.1.1 // indirect 41 45 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect 42 46 github.com/go-logr/logr v1.2.3 // indirect 43 47 github.com/go-logr/stdr v1.2.2 // indirect ··· 85 89 github.com/mr-tron/base58 v1.2.0 // indirect 86 90 github.com/multiformats/go-base32 v0.0.3 // indirect 87 91 github.com/multiformats/go-base36 v0.1.0 // indirect 88 - github.com/multiformats/go-multibase v0.0.3 // indirect 89 92 github.com/multiformats/go-multicodec v0.5.0 // indirect 90 - github.com/multiformats/go-varint v0.0.6 // indirect 91 93 github.com/opentracing/opentracing-go v1.2.0 // indirect 92 94 github.com/pkg/errors v0.9.1 // indirect 95 + github.com/pmezard/go-difflib v1.0.0 // indirect 93 96 github.com/russross/blackfriday/v2 v2.1.0 // indirect 94 97 github.com/spaolacci/murmur3 v1.1.0 // indirect 95 98 github.com/valyala/bytebufferpool v1.0.0 // indirect ··· 107 110 golang.org/x/text v0.3.7 // indirect 108 111 golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect 109 112 google.golang.org/protobuf v1.28.0 // indirect 113 + gopkg.in/yaml.v3 v3.0.1 // indirect 110 114 lukechampine.com/blake3 v1.1.6 // indirect 111 115 )
+4 -14
go.sum
··· 1004 1004 github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= 1005 1005 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= 1006 1006 github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= 1007 + github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= 1007 1008 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= 1008 1009 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= 1009 1010 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= ··· 1013 1014 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 1014 1015 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= 1015 1016 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= 1017 + github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 1016 1018 github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= 1017 1019 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= 1018 1020 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= ··· 1037 1039 github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= 1038 1040 github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 h1:5HZfQkwe0mIfyDmc1Em5GqlNRzcdtlv4HTNmdpt7XH0= 1039 1041 github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= 1040 - github.com/whyrusleeping/cbor-gen v0.0.0-20221220214510-0333c149dec0 h1:obKzQ1ey5AJg5NKjgtTo/CKwLImVP4ETLRcsmzFJ4Qw= 1041 - github.com/whyrusleeping/cbor-gen v0.0.0-20221220214510-0333c149dec0/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1042 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106062414-06f5543407a5 h1:v/nk/e0r4vbIWUSW5u/RMzqYwom46U+SxZcnqlLyg4A= 1043 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106062414-06f5543407a5/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1044 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106062800-b5a3c160686a h1:5cW0qECEaqKdavMdmimnHs2wr+8iwtrSF/6qiaZ8sOA= 1045 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106062800-b5a3c160686a/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1046 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106064015-d6987df820df h1:OUQsz4KM/CgbNSf/WmwjQWnsmv4cB85iqJwDrxKSsJw= 1047 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106064015-d6987df820df/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1048 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106064349-8b41f422f08e h1:IeBUOl0m5mEyrURtUdCIjwYElCGl2ahy9vyPhAaKkAs= 1049 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106064349-8b41f422f08e/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1050 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106194948-6cd57c3ed4e5 h1:bHJlG820Fq8C79sBEOaYrnHFTYBgAOirQwid/AGHprs= 1051 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106194948-6cd57c3ed4e5/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1052 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106195754-b5f4b36f47e2 h1:8/xNM8pCl613kL80lDjTTA+PpwPDHtBJG/5DUtroAuE= 1053 - github.com/whyrusleeping/cbor-gen v0.0.0-20230106195754-b5f4b36f47e2/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1042 + github.com/whyrusleeping/cbor-gen v0.0.0-20230109192608-0173f1e641ac h1:vSeRURgERu0v7h+bKvlP0wuT+inofyu61R15qka/Xh0= 1043 + github.com/whyrusleeping/cbor-gen v0.0.0-20230109192608-0173f1e641ac/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 1054 1044 github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= 1055 1045 github.com/whyrusleeping/go-did v0.0.0-20221105001742-8d9e0ffb0d59 h1:dRYr/sfpZjX8evmbFrOG7ldkzdk5TLMGRVM40k1AZPQ= 1056 1046 github.com/whyrusleeping/go-did v0.0.0-20221105001742-8d9e0ffb0d59/go.mod h1:mX/AQ/SS9KrCwO8V+IWyIozytxw5gw75cMHymoJvMGo=
+9 -7
lex/gen.go
··· 204 204 205 205 // TODO: this method is necessary because in lexicon there is no way to know if 206 206 // a type needs to be marshaled with a "$type" field up front, you can only 207 - // know for sure by seeing where the type is used. 207 + // know for sure by seeing where the type is used. 208 208 func FixRecordReferences(schemas []*Schema, defmap map[string]*ExtDef, prefix string) { 209 209 for _, s := range schemas { 210 - if !strings.HasPrefix(s.ID, prefix) { 211 - continue 212 - } 210 + if !strings.HasPrefix(s.ID, prefix) { 211 + continue 212 + } 213 213 214 214 tps := s.AllTypes(prefix, defmap) 215 215 for _, t := range tps { ··· 334 334 func writeMethods(typename string, ts *TypeSchema, w io.Writer) error { 335 335 switch ts.Type { 336 336 case "token": 337 - n := ts.id 337 + n := ts.id 338 338 if ts.defName != "main" { 339 339 n += "#" + ts.defName 340 340 } ··· 999 999 if ts.record { 1000 1000 fmt.Fprintf(w, "\tLexiconTypeID string `json:\"$type\" cborgen:\"$type,const=%s\"`\n", ts.id) 1001 1001 } else { 1002 - fmt.Fprintf(w, "\tLexiconTypeID string `json:\"$type,omitempty\"`\n" ) 1002 + fmt.Fprintf(w, "\tLexiconTypeID string `json:\"$type,omitempty\"`\n") 1003 1003 } 1004 1004 1005 1005 required := make(map[string]bool) ··· 1016 1016 } 1017 1017 1018 1018 var ptr string 1019 + var omit string 1019 1020 if !required[k] { 1020 1021 if !strings.HasPrefix(tname, "*") && !strings.HasPrefix(tname, "[]") { 1021 1022 ptr = "*" 1023 + omit = ",omitempty" 1022 1024 } 1023 1025 } 1024 1026 1025 - fmt.Fprintf(w, "\t%s %s%s `json:\"%s\" cborgen:\"%s\"`\n", goname, ptr, tname, k, k) 1027 + fmt.Fprintf(w, "\t%s %s%s `json:\"%s%s\" cborgen:\"%s\"`\n", goname, ptr, tname, k, omit, k) 1026 1028 return nil 1027 1029 }); err != nil { 1028 1030 return err
+2
lex/util/util.go
··· 3 3 import ( 4 4 "bytes" 5 5 "encoding/json" 6 + "fmt" 6 7 "io" 7 8 ) 8 9 ··· 31 32 func CborTypeExtract(b []byte) (string, error) { 32 33 var tcheck CborChecker 33 34 if err := tcheck.UnmarshalCBOR(bytes.NewReader(b)); err != nil { 35 + fmt.Printf("bad bytes: %x\n", b) 34 36 return "", err 35 37 } 36 38
+1
repo/repo.go
··· 279 279 280 280 rec, err := util.CborDecodeValue(blk.RawData()) 281 281 if err != nil { 282 + fmt.Println("decoding blk: ", cc) 282 283 return cid.Undef, nil, err 283 284 } 284 285
+56 -3
repomgr/repomgr.go
··· 58 58 Record any 59 59 ActorInfo *ActorInfo 60 60 RepoSlice []byte 61 + PDS uint 61 62 } 62 63 63 64 type EventKind string ··· 147 148 return "", cid.Undef, err 148 149 } 149 150 150 - ds, err := rm.cs.NewDeltaSession(ctx, user, head) 151 + ds, err := rm.cs.NewDeltaSession(ctx, user, &head) 151 152 if err != nil { 152 153 return "", cid.Undef, err 153 154 } ··· 206 207 return cid.Undef, err 207 208 } 208 209 209 - ds, err := rm.cs.NewDeltaSession(ctx, user, head) 210 + ds, err := rm.cs.NewDeltaSession(ctx, user, &head) 210 211 if err != nil { 211 212 return cid.Undef, err 212 213 } ··· 266 267 return fmt.Errorf("must specify unique non-zero id for new actor") 267 268 } 268 269 269 - ds, err := rm.cs.NewDeltaSession(ctx, user, cid.Undef) 270 + ds, err := rm.cs.NewDeltaSession(ctx, user, &cid.Undef) 270 271 if err != nil { 271 272 return err 272 273 } ··· 399 400 400 401 return ap, nil 401 402 } 403 + 404 + func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, kind EventKind, uid uint, collection string, rkey string, carslice []byte) error { 405 + root, ds, err := rm.cs.ImportSlice(ctx, uid, carslice) 406 + if err != nil { 407 + return fmt.Errorf("importing external carslice: %w", err) 408 + } 409 + 410 + r, err := repo.OpenRepo(ctx, ds, root) 411 + if err != nil { 412 + return fmt.Errorf("opening external user repo: %w", err) 413 + } 414 + 415 + switch kind { 416 + case EvtKindCreateRecord: 417 + fmt.Println("path: ", collection, rkey) 418 + recid, rec, err := r.GetRecord(ctx, collection+"/"+rkey) 419 + if err != nil { 420 + return fmt.Errorf("reading changed record from car slice: %w", err) 421 + } 422 + 423 + rslice, err := ds.CloseWithRoot(ctx, root) 424 + if err != nil { 425 + return fmt.Errorf("close with root: %w", err) 426 + } 427 + 428 + // TODO: what happens if this update fails? 429 + if err := rm.updateUserRepoHead(ctx, uid, root); err != nil { 430 + return fmt.Errorf("updating user head: %w", err) 431 + } 432 + 433 + if rm.events != nil { 434 + fmt.Println("sending off external create record event") 435 + rm.events(ctx, &RepoEvent{ 436 + Kind: EvtKindCreateRecord, 437 + User: uid, 438 + //OldRoot: head, 439 + NewRoot: root, 440 + Collection: collection, 441 + Rkey: rkey, 442 + Record: rec, 443 + RecCid: recid, 444 + RepoSlice: rslice, 445 + PDS: pdsid, 446 + }) 447 + } 448 + return nil 449 + default: 450 + return fmt.Errorf("unrecognized external user event kind: %q", kind) 451 + } 452 + 453 + return nil 454 + }
+4 -1
server/auth.go
··· 22 22 tok.Set("exp", exp.Unix()) 23 23 24 24 return tok 25 - 26 25 } 27 26 28 27 func (s *Server) createAuthTokenForUser(ctx context.Context, handle, did string) (*xrpc.AuthInfo, error) { ··· 53 52 54 53 func (s *Server) createCrossServerAuthToken(ctx context.Context, otherpds string) (*xrpc.AuthInfo, error) { 55 54 accessTok := makeToken(otherpds, "com.atproto.federation", time.Now().Add(24*time.Hour)) 55 + 56 + // setting this is a little weird, 57 + // since the token isnt signed by this key, we dont have a way to validate... 58 + accessTok.Set("pds", s.signingKey.DID()) 56 59 57 60 rval := make([]byte, 10) 58 61 rand.Read(rval)
+60 -7
server/events.go
··· 1 1 package schemagen 2 2 3 - import "fmt" 3 + import ( 4 + "fmt" 5 + "log" 6 + 7 + "github.com/gorilla/websocket" 8 + "github.com/labstack/echo/v4" 9 + ) 4 10 5 11 type EventManager struct { 6 12 subs []*Subscriber ··· 45 51 } 46 52 case opSend: 47 53 for _, s := range em.subs { 48 - select { 49 - case s.outgoing <- op.evt: 50 - default: 51 - fmt.Println("event overflow") 54 + if s.filter(op.evt) { 55 + fmt.Println("outgoing event: ", op.evt) 56 + select { 57 + case s.outgoing <- op.evt: 58 + default: 59 + fmt.Println("event overflow") 60 + } 52 61 } 53 62 } 54 63 default: ··· 70 79 ) 71 80 72 81 type Event struct { 73 - Kind string 74 - User string 82 + Kind string 83 + 84 + // User is the DID of the user this event is about 85 + User string 86 + 75 87 Collection string 76 88 Rkey string 77 89 DID string 78 90 CarSlice []byte 91 + 92 + // some private fields for processing metadata 93 + uid uint 94 + pdsid uint 79 95 } 80 96 81 97 func (em *EventManager) AddEvent(ev *Event) error { ··· 88 104 case <-em.closed: 89 105 return fmt.Errorf("event manager shut down") 90 106 } 107 + } 108 + 109 + func (s *Server) EventsHandler(c echo.Context) error { 110 + did := c.Request().Header.Get("DID") 111 + conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 112 + if err != nil { 113 + return err 114 + } 115 + ctx := c.Request().Context() 116 + 117 + var peering Peering 118 + if err := s.db.First(&peering, "did = ?", did).Error; err != nil { 119 + return err 120 + } 121 + 122 + evts, cancel, err := s.events.Subscribe(func(evt *Event) bool { 123 + has, err := s.peerHasFollow(ctx, peering.ID, evt.uid) 124 + if err != nil { 125 + log.Println("error checking peer follow relationship: ", err) 126 + return false 127 + } 128 + 129 + fmt.Println("follow: ", has) 130 + return has 131 + }) 132 + if err != nil { 133 + return err 134 + } 135 + defer cancel() 136 + 137 + for evt := range evts { 138 + if err := conn.WriteJSON(evt); err != nil { 139 + return err 140 + } 141 + } 142 + 143 + return nil 91 144 } 92 145 93 146 func (em *EventManager) Subscribe(filter func(*Event) bool) (<-chan *Event, func(), error) {
+36 -6
server/fakedid.go
··· 12 12 13 13 type FakeDidMapping struct { 14 14 gorm.Model 15 - Handle string 16 - Did string `gorm:"index"` 15 + Handle string 16 + Did string `gorm:"index"` 17 + Service string 17 18 } 18 19 19 20 type FakeDid struct { ··· 25 26 return &FakeDid{db} 26 27 } 27 28 28 - func (fd *FakeDid) GetDocument(ctx context.Context, did string) (*did.Document, error) { 29 - panic("nyi") 29 + func (fd *FakeDid) GetDocument(ctx context.Context, udid string) (*did.Document, error) { 30 + var rec FakeDidMapping 31 + if err := fd.db.First(&rec, "did = ?", udid).Error; err != nil { 32 + return nil, err 33 + } 34 + 35 + d, err := did.ParseDID(rec.Did) 36 + if err != nil { 37 + panic(err) 38 + } 39 + 40 + return &did.Document{ 41 + Context: []string{}, 42 + 43 + ID: d, 44 + 45 + AlsoKnownAs: []string{"https://" + rec.Handle}, 46 + 47 + //Authentication []interface{} `json:"authentication"` 48 + 49 + //VerificationMethod []VerificationMethod `json:"verificationMethod"` 50 + 51 + Service: []did.Service{ 52 + did.Service{ 53 + //ID: "", 54 + Type: "pds", 55 + ServiceEndpoint: "http://" + rec.Service, 56 + }, 57 + }, 58 + }, nil 30 59 } 31 60 32 61 func (fd *FakeDid) CreateDID(ctx context.Context, sigkey *key.Key, recovery string, handle string, service string) (string, error) { ··· 35 64 d := "did:plc:" + hex.EncodeToString(buf) 36 65 37 66 if err := fd.db.Create(&FakeDidMapping{ 38 - Handle: handle, 39 - Did: d, 67 + Handle: handle, 68 + Did: d, 69 + Service: service, 40 70 }).Error; err != nil { 41 71 return "", err 42 72 }
+82 -11
server/federation_test.go
··· 14 14 "time" 15 15 16 16 "github.com/lestrrat-go/jwx/v2/jwk" 17 + "github.com/stretchr/testify/assert" 17 18 "github.com/whyrusleeping/gosky/api" 18 19 atproto "github.com/whyrusleeping/gosky/api/atproto" 19 20 bsky "github.com/whyrusleeping/gosky/api/bsky" ··· 70 71 } 71 72 } 72 73 73 - func setupPDS(t *testing.T, host, suffix string) *testPDS { 74 + func setupPDS(t *testing.T, host, suffix string, plc PLCClient) *testPDS { 74 75 dir, err := ioutil.TempDir("", "fedtest") 75 76 if err != nil { 76 77 t.Fatal(err) ··· 99 100 kfile := filepath.Join(dir, "server.key") 100 101 makeKey(t, kfile) 101 102 102 - plc := &api.PLCServer{ 103 - Host: "http://localhost:2582", 104 - } 105 - 106 103 srv, err := NewServer(maindb, cs, kfile, suffix, host, plc, []byte(host+suffix)) 107 104 if err != nil { 108 105 t.Fatal(err) ··· 138 135 } 139 136 140 137 func (tp *testPDS) PeerWith(t *testing.T, op *testPDS) { 141 - panic("no") 138 + if err := tp.server.HackAddPeering(op.host, op.server.signingKey.DID()); err != nil { 139 + t.Fatal(err) 140 + } 141 + 142 + if err := op.server.HackAddPeering(tp.host, tp.server.signingKey.DID()); err != nil { 143 + t.Fatal(err) 144 + } 142 145 } 143 146 144 147 func (tp *testPDS) NewUser(t *testing.T, handle string) *testUser { ··· 192 195 return resp.Uri 193 196 } 194 197 198 + func (u *testUser) Follow(t *testing.T, did string) string { 199 + t.Helper() 200 + 201 + ctx := context.TODO() 202 + resp, err := atproto.RepoCreateRecord(ctx, u.client, &atproto.RepoCreateRecord_Input{ 203 + Collection: "app.bsky.graph.follow", 204 + Did: u.did, 205 + Record: &bsky.GraphFollow{ 206 + CreatedAt: time.Now().Format(time.RFC3339), 207 + Subject: &bsky.ActorRef{ 208 + DeclarationCid: "bafyreid27zk7lbis4zw5fz4podbvbs4fc5ivwji3dmrwa6zggnj4bnd57u", 209 + Did: did, 210 + }, 211 + }, 212 + }) 213 + 214 + if err != nil { 215 + t.Fatal(err) 216 + } 217 + 218 + return resp.Uri 219 + } 220 + 221 + func (u *testUser) GetFeed(t *testing.T) []*bsky.FeedFeedViewPost { 222 + t.Helper() 223 + 224 + ctx := context.TODO() 225 + resp, err := bsky.FeedGetTimeline(ctx, u.client, "reverse-chronlogical", "", 100) 226 + if err != nil { 227 + t.Fatal(err) 228 + } 229 + 230 + return resp.Feed 231 + } 232 + 233 + func testPLC(t *testing.T) *FakeDid { 234 + // TODO: just do in memory... 235 + tdir, err := ioutil.TempDir("", "plcserv") 236 + if err != nil { 237 + t.Fatal(err) 238 + } 239 + 240 + db, err := gorm.Open(sqlite.Open(filepath.Join(tdir, "plc.db"))) 241 + if err != nil { 242 + t.Fatal(err) 243 + } 244 + return NewFakeDid(db) 245 + 246 + } 247 + 195 248 func TestBasicFederation(t *testing.T) { 196 - p1 := setupPDS(t, "localhost:8812", ".pdsone") 197 - p2 := setupPDS(t, "localhost:8813", ".pdstwo") 249 + assert := assert.New(t) 250 + plc := testPLC(t) 251 + p1 := setupPDS(t, "0.0.0.0:8812", ".pdsone", plc) 252 + p2 := setupPDS(t, "0.0.0.0:8813", ".pdstwo", plc) 198 253 199 254 defer p1.Cleanup() 200 255 defer p2.Cleanup() ··· 205 260 bob := p1.NewUser(t, "bob.pdsone") 206 261 laura := p2.NewUser(t, "laura.pdstwo") 207 262 208 - //p1.PeerWith(p2) 209 - bob.Post(t, "hello world") 210 - laura.Post(t, "hello bob") 263 + p1.PeerWith(t, p2) 264 + bob.Follow(t, laura.did) 265 + 266 + bp1 := bob.Post(t, "hello world") 267 + lp1 := laura.Post(t, "hello bob") 268 + time.Sleep(time.Millisecond * 50) 269 + 270 + f := bob.GetFeed(t) 271 + assert.Equal(f[0].Post.Uri, bp1) 272 + assert.Equal(f[1].Post.Uri, lp1) 273 + 274 + lp2 := laura.Post(t, "im posting again!") 275 + time.Sleep(time.Millisecond * 50) 276 + 277 + f = bob.GetFeed(t) 278 + assert.Equal(f[0].Post.Uri, bp1) 279 + assert.Equal(f[1].Post.Uri, lp1) 280 + assert.Equal(f[2].Post.Uri, lp2) 211 281 282 + select {} 212 283 }
+19 -18
server/fedmgr.go
··· 6 6 "fmt" 7 7 "log" 8 8 "math/rand" 9 + "net/http" 9 10 "time" 10 11 11 12 "github.com/gorilla/websocket" 12 13 ) 13 - 14 - type FederationManager struct { 15 - indexCallback IndexCallback 16 - } 17 14 18 15 type IndexCallback func(context.Context, string, *Event) error 19 16 20 - func NewFederationManager(cb IndexCallback) *FederationManager { 21 - return &FederationManager{ 22 - indexCallback: cb, 17 + func (s *Server) SubscribeToPds(ctx context.Context, host string) error { 18 + var peering Peering 19 + if err := s.db.First(&peering, "host = ?", host).Error; err != nil { 20 + return err 23 21 } 24 - } 25 22 26 - func (fm *FederationManager) SubscribeToPds(ctx context.Context, host string) error { 27 - go fm.subscribeWithRedialer(host) 23 + go s.subscribeWithRedialer(&peering) 28 24 29 25 return nil 30 26 } 31 27 32 - func (fm *FederationManager) subscribeWithRedialer(host string) { 28 + func (s *Server) subscribeWithRedialer(host *Peering) { 33 29 d := websocket.Dialer{} 34 30 35 31 var backoff int 36 32 for { 37 - con, res, err := d.Dial(host+"/events", nil) 33 + h := http.Header{ 34 + "DID": []string{s.signingKey.DID()}, 35 + } 36 + 37 + con, res, err := d.Dial("ws://"+host.Host+"/events", h) 38 38 if err != nil { 39 - fmt.Printf("dialing %q failed: %s", host, err) 39 + fmt.Printf("dialing %q failed: %s", host.Host, err) 40 40 time.Sleep(sleepForBackoff(backoff)) 41 41 backoff++ 42 + continue 42 43 } 43 44 44 45 fmt.Println("event subscription response code: ", res.StatusCode) 45 46 46 - if err := fm.handleConnection(host, con); err != nil { 47 - log.Printf("connection to %q failed: %s", host, err) 47 + if err := s.handleConnection(host, con); err != nil { 48 + log.Printf("connection to %q failed: %s", host.Host, err) 48 49 } 49 50 } 50 51 } ··· 61 62 return time.Second * 30 62 63 } 63 64 64 - func (fm *FederationManager) handleConnection(host string, con *websocket.Conn) error { 65 + func (s *Server) handleConnection(host *Peering, con *websocket.Conn) error { 65 66 for { 66 67 mt, data, err := con.ReadMessage() 67 68 if err != nil { ··· 75 76 return fmt.Errorf("failed to unmarshal event: %w", err) 76 77 } 77 78 78 - if err := fm.indexCallback(context.TODO(), host, &ev); err != nil { 79 - log.Printf("failed to index event from %q: %s", host, err) 79 + if err := s.handleFedEvent(context.TODO(), host, &ev); err != nil { 80 + log.Printf("failed to index event from %q: %s", host.Host, err) 80 81 } 81 82 } 82 83 }
+4 -25
server/feedgen.go
··· 119 119 ActorType: ai.Type, 120 120 }, 121 121 Handle: ai.Handle, 122 - DisplayName: &ai.Name, 122 + DisplayName: &ai.DisplayName, 123 123 } 124 124 } 125 125 ··· 148 148 149 149 out.Post.Author = author 150 150 151 - fmt.Println("need to finish reasons") 152 - /* 153 - if item.TrendedBy != 0 { 154 - tb, err := fg.getActorRefInfo(ctx, item.TrendedBy) 155 - if err != nil { 156 - return nil, err 157 - } 158 - 159 - out.TrendedBy = tb 160 - } 161 - 162 - if item.RepostedBy != 0 { 163 - rp, err := fg.getActorRefInfo(ctx, item.RepostedBy) 164 - if err != nil { 165 - return nil, err 166 - } 167 - 168 - out.RepostedBy = rp 169 - } 170 - */ 171 - 172 151 reccid, err := cid.Decode(item.Cid) 173 152 if err != nil { 174 153 return nil, err ··· 221 200 222 201 // TODO: this query is just a temporary hack... 223 202 var feed []*FeedPost 224 - if err := fg.db.Find(&feed, "author = (?)", 203 + if err := fg.db.Debug().Find(&feed, "author in (?)", 225 204 fg.db.Model(FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 226 205 ).Error; err != nil { 227 206 return nil, err 228 207 } 229 208 230 209 var rps []*RepostRecord 231 - if err := fg.db.Find(&rps, "reposter = (?)", 210 + if err := fg.db.Debug().Find(&rps, "reposter in (?)", 232 211 fg.db.Model(FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 233 212 ).Error; err != nil { 234 213 return nil, err ··· 263 242 return nil, err 264 243 } 265 244 266 - vs, err := fg.getPostViewerState(ctx, item.ID, viewer.ID, viewer.DID) 245 + vs, err := fg.getPostViewerState(ctx, item.ID, viewer.ID, viewer.Did) 267 246 if err != nil { 268 247 return nil, fmt.Errorf("getting viewer state: %w", err) 269 248 }
+43 -15
server/handlers.go
··· 5 5 "context" 6 6 "fmt" 7 7 "io" 8 + "strings" 8 9 "time" 9 10 10 11 "github.com/ipfs/go-cid" ··· 110 111 111 112 return &appbskytypes.ActorUpdateProfile_Output{ 112 113 Cid: ncid.String(), 113 - Uri: "at://" + u.DID + "/app.bsky.actor.profile/self", 114 + Uri: "at://" + u.Did + "/app.bsky.actor.profile/self", 114 115 Record: profile, 115 116 }, nil 116 117 } ··· 161 162 convertToOutputType = func(thr *ThreadPost) (*appbskytypes.FeedGetPostThread_ThreadViewPost, error) { 162 163 p := thr.Post 163 164 164 - vs, err := s.feedgen.getPostViewerState(ctx, thr.PostID, u.ID, u.DID) 165 + vs, err := s.feedgen.getPostViewerState(ctx, thr.PostID, u.ID, u.Did) 165 166 if err != nil { 166 167 return nil, err 167 168 } ··· 277 278 return nil, err 278 279 } 279 280 280 - uri := "at://" + u.DID + "/" + rpath 281 + uri := "at://" + u.Did + "/" + rpath 281 282 if input.Direction == "up" { 282 283 return &appbskytypes.FeedSetVote_Output{ 283 284 Upvote: &uri, ··· 424 425 return nil, fmt.Errorf("create did: %w", err) 425 426 } 426 427 427 - u.DID = d 428 + u.Did = d 428 429 if err := s.db.Save(&u).Error; err != nil { 429 430 return nil, err 430 431 } 431 432 432 - if err := s.repoman.InitNewActor(ctx, u.ID, u.Handle, u.DID, "", UserActorDeclCid, UserActorDeclType); err != nil { 433 + if err := s.repoman.InitNewActor(ctx, u.ID, u.Handle, u.Did, "", UserActorDeclCid, UserActorDeclType); err != nil { 433 434 return nil, err 434 435 } 435 436 ··· 473 474 return nil, err 474 475 } 475 476 476 - return &comatprototypes.HandleResolve_Output{Did: u.DID}, nil 477 + return &comatprototypes.HandleResolve_Output{Did: u.Did}, nil 477 478 } 478 479 479 480 func (s *Server) handleComAtprotoRepoBatchWrite(ctx context.Context, input *comatprototypes.RepoBatchWrite_Input) error { ··· 511 512 } 512 513 513 514 return &comatprototypes.RepoCreateRecord_Output{ 514 - Uri: "at://" + u.DID + "/" + rpath, 515 + Uri: "at://" + u.Did + "/" + rpath, 515 516 Cid: recid.String(), 516 517 }, nil 517 518 } ··· 530 531 return nil, err 531 532 } 532 533 533 - fmt.Println("USER: ", user, targetUser.Handle, targetUser.DID) 534 + fmt.Println("USER: ", user, targetUser.Handle, targetUser.Did) 534 535 535 536 var maybeCid cid.Cid 536 537 if c != "" { ··· 549 550 ccstr := reccid.String() 550 551 return &comatprototypes.RepoGetRecord_Output{ 551 552 Cid: &ccstr, 552 - Uri: "at://" + targetUser.DID + "/" + collection + "/" + rkey, 553 + Uri: "at://" + targetUser.Did + "/" + collection + "/" + rkey, 553 554 Value: rec, 554 555 }, nil 555 556 } ··· 569 570 AvailableUserDomains: []string{ 570 571 s.handleSuffix, 571 572 }, 573 + Links: &comatprototypes.ServerGetAccountsConfig_Links{}, 572 574 }, nil 573 575 } 574 576 ··· 582 584 return nil, fmt.Errorf("invalid username or password") 583 585 } 584 586 585 - tok, err := s.createAuthTokenForUser(ctx, input.Handle, u.DID) 587 + tok, err := s.createAuthTokenForUser(ctx, input.Handle, u.Did) 586 588 if err != nil { 587 589 return nil, err 588 590 } 589 591 590 592 return &comatprototypes.SessionCreate_Output{ 591 593 Handle: input.Handle, 592 - Did: u.DID, 594 + Did: u.Did, 593 595 AccessJwt: tok.AccessJwt, 594 596 RefreshJwt: tok.RefreshJwt, 595 597 }, nil ··· 607 609 608 610 return &comatprototypes.SessionGet_Output{ 609 611 Handle: u.Handle, 610 - Did: u.DID, 612 + Did: u.Did, 611 613 }, nil 612 614 } 613 615 ··· 635 637 return nil, err 636 638 } 637 639 638 - outTok, err := s.createAuthTokenForUser(ctx, u.Handle, u.DID) 640 + outTok, err := s.createAuthTokenForUser(ctx, u.Handle, u.Did) 639 641 if err != nil { 640 642 return nil, err 641 643 } 642 644 643 645 return &comatprototypes.SessionRefresh_Output{ 644 646 Handle: u.Handle, 645 - Did: u.DID, 647 + Did: u.Did, 646 648 AccessJwt: outTok.AccessJwt, 647 649 RefreshJwt: outTok.RefreshJwt, 648 650 }, nil ··· 722 724 } 723 725 724 726 func (s *Server) handleComAtprotoPeeringFollow(ctx context.Context, body *comatprototypes.PeeringFollow_Input) error { 725 - panic("not yet implemented") 727 + // TODO: cross server auth checks 728 + auth, ok := ctx.Value("auth").(string) 729 + if !ok { 730 + return fmt.Errorf("no auth present in peering.follow request header") 731 + } 732 + 733 + auth = strings.TrimPrefix(auth, "Bearer ") 734 + tok, err := jwt.ParseString(auth) 735 + if err != nil { 736 + return err 737 + } 738 + 739 + v, ok := tok.Get("pds") 740 + if !ok { 741 + panic("im a bad programmer") 742 + } 743 + 744 + opdsdid := v.(string) 745 + // 746 + 747 + for _, u := range body.Users { 748 + if err := s.AddRemoteFollow(ctx, opdsdid, u); err != nil { 749 + return fmt.Errorf("handle add remote follow: %w", err) 750 + } 751 + } 752 + 753 + return nil 726 754 }
+95 -22
server/indexer.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 5 6 "fmt" 6 7 "log" 8 + "net/url" 7 9 "time" 8 10 9 11 bsky "github.com/whyrusleeping/gosky/api/bsky" 10 12 "github.com/whyrusleeping/gosky/repomgr" 13 + "github.com/whyrusleeping/gosky/xrpc" 11 14 "go.opentelemetry.io/otel" 12 15 "gorm.io/gorm" 13 16 ) ··· 17 20 18 21 notifman *NotificationManager 19 22 events *EventManager 20 - fedmgr *FederationManager 23 + didr PLCClient 21 24 22 25 sendRemoteFollow func(context.Context, string, uint) error 23 26 } 24 27 25 - func NewIndexer(db *gorm.DB, notifman *NotificationManager, evtman *EventManager) (*Indexer, error) { 28 + func NewIndexer(db *gorm.DB, notifman *NotificationManager, evtman *EventManager, didr PLCClient) (*Indexer, error) { 26 29 db.AutoMigrate(&FeedPost{}) 27 30 db.AutoMigrate(&ActorInfo{}) 28 31 db.AutoMigrate(&FollowRecord{}) ··· 34 37 db: db, 35 38 notifman: notifman, 36 39 events: evtman, 40 + didr: didr, 37 41 }, nil 38 42 } 39 43 ··· 65 69 Handle string 66 70 DisplayName string 67 71 Did string 68 - Name string 69 - Following int64 70 - Followers int64 71 - Posts int64 72 - DeclRefCid string 73 - Type string 74 - PDS uint 72 + //Name string 73 + Following int64 74 + Followers int64 75 + Posts int64 76 + DeclRefCid string 77 + Type string 78 + PDS uint 75 79 } 76 80 77 81 type VoteDir int ··· 112 116 113 117 type ExternalFollow struct { 114 118 gorm.Model 115 - PDS uint 116 - User uint 119 + PDS uint 120 + Uid uint 117 121 } 118 122 119 123 func (ix *Indexer) catchup(ctx context.Context, evt *repomgr.RepoEvent) error { ··· 143 147 default: 144 148 log.Println("unrecognized repo event type: ", evt.Kind) 145 149 } 146 - } 147 - 148 - func (ix *Indexer) handleFedEvent(ctx context.Context, host string, evt *Event) error { 149 - panic("TODO") 150 150 } 151 151 152 152 func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, local bool) error { 153 + fmt.Println("record create event", evt.Collection) 153 154 switch rec := evt.Record.(type) { 154 155 case *bsky.FeedPost: 156 + fmt.Println("feed post") 155 157 var replyid uint 156 158 if rec.Reply != nil { 157 159 replyto, err := ix.GetPost(ctx, rec.Reply.Parent.Uri) ··· 255 257 case *bsky.GraphFollow: 256 258 subj, err := ix.lookupUserByDid(ctx, rec.Subject.Did) 257 259 if err != nil { 258 - return err 260 + if !errors.Is(err, gorm.ErrRecordNotFound) { 261 + return err 262 + } 263 + 264 + doc, err := ix.didr.GetDocument(ctx, rec.Subject.Did) 265 + if err != nil { 266 + return fmt.Errorf("could not locate DID document for followed user: %s", err) 267 + } 268 + 269 + if len(doc.Service) == 0 { 270 + return fmt.Errorf("external followed user %s had no services in did document", rec.Subject.Did) 271 + } 272 + 273 + fmt.Println("AKA: ", doc.AlsoKnownAs) 274 + 275 + svc := doc.Service[0] 276 + durl, err := url.Parse(svc.ServiceEndpoint) 277 + if err != nil { 278 + return err 279 + } 280 + 281 + // TODO: the PDS's DID should also be in the service, we could use that to look up? 282 + var peering Peering 283 + if err := ix.db.First(&peering, "host = ?", durl.Host).Error; err != nil { 284 + return err 285 + } 286 + 287 + var handle string 288 + if len(doc.AlsoKnownAs) > 0 { 289 + hurl, err := url.Parse(doc.AlsoKnownAs[0]) 290 + if err != nil { 291 + return err 292 + } 293 + 294 + handle = hurl.Host 295 + } 296 + 297 + c := &xrpc.Client{Host: svc.ServiceEndpoint} 298 + profile, err := bsky.ActorGetProfile(ctx, c, rec.Subject.Did) 299 + if err != nil { 300 + return err 301 + } 302 + 303 + if handle != profile.Handle { 304 + return fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle) 305 + } 306 + 307 + // TODO: request this users info from their server to fill out our data... 308 + u := User{ 309 + Handle: handle, 310 + Did: rec.Subject.Did, 311 + PDS: peering.ID, 312 + } 313 + 314 + if err := ix.db.Create(&u).Error; err != nil { 315 + return fmt.Errorf("failed to create other pds user: %w", err) 316 + } 317 + 318 + // okay cool, its a user on a server we are peered with 319 + // lets make a local record of that user for the future 320 + subj = &ActorInfo{ 321 + Uid: u.ID, 322 + Handle: handle, 323 + DisplayName: *profile.DisplayName, 324 + Did: rec.Subject.Did, 325 + DeclRefCid: rec.Subject.DeclarationCid, // TODO: should verify this? 326 + Type: "", 327 + PDS: peering.ID, 328 + } 329 + if err := ix.db.Create(subj).Error; err != nil { 330 + return err 331 + } 259 332 } 260 333 261 334 // 'follower' followed 'target' ··· 349 422 func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent) error { 350 423 ai := evt.ActorInfo 351 424 if err := ix.db.Create(&ActorInfo{ 352 - Uid: evt.User, 353 - Handle: ai.Handle, 354 - Did: ai.Did, 355 - Name: ai.DisplayName, 356 - DeclRefCid: ai.DeclRefCid, 357 - Type: ai.Type, 425 + Uid: evt.User, 426 + Handle: ai.Handle, 427 + Did: ai.Did, 428 + DisplayName: ai.DisplayName, 429 + DeclRefCid: ai.DeclRefCid, 430 + Type: ai.Type, 358 431 }).Error; err != nil { 359 432 return err 360 433 }
+89 -38
server/server.go
··· 37 37 notifman *NotificationManager 38 38 indexer *Indexer 39 39 events *EventManager 40 - fedmgr *FederationManager 41 40 signingKey *key.Key 42 41 echo *echo.Echo 43 42 jwtSigningKey []byte ··· 56 55 CreateDID(ctx context.Context, sigkey *key.Key, recovery string, handle string, service string) (string, error) 57 56 } 58 57 59 - func NewServer(db *gorm.DB, cs *carstore.CarStore, kfile string, handleSuffix, serviceUrl string, plc PLCClient, jwtkey []byte) (*Server, error) { 58 + func NewServer(db *gorm.DB, cs *carstore.CarStore, kfile string, handleSuffix, serviceUrl string, didr PLCClient, jwtkey []byte) (*Server, error) { 60 59 db.AutoMigrate(&User{}) 60 + db.AutoMigrate(&Peering{}) 61 61 62 62 serkey, err := loadKey(kfile) 63 63 if err != nil { ··· 69 69 repoman := repomgr.NewRepoManager(db, cs) 70 70 notifman := NewNotificationManager(db, repoman) 71 71 72 - ix, err := NewIndexer(db, notifman, evtman) 72 + ix, err := NewIndexer(db, notifman, evtman, didr) 73 73 if err != nil { 74 74 return nil, err 75 75 } ··· 80 80 cs: cs, 81 81 notifman: notifman, 82 82 indexer: ix, 83 - plc: NewFakeDid(db), 83 + plc: didr, 84 84 events: evtman, 85 85 repoman: repoman, 86 86 handleSuffix: handleSuffix, ··· 88 88 jwtSigningKey: jwtkey, 89 89 } 90 90 91 - s.fedmgr = NewFederationManager(s.handleFedEvent) 92 - 93 91 repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 94 92 ix.HandleRepoEvent(ctx, evt) 93 + 95 94 fe, err := s.repoEventToFedEvent(context.TODO(), evt) 96 95 if err != nil { 97 96 log.Println("event conversion error: ", err) ··· 118 117 return s, nil 119 118 } 120 119 121 - func (s *Server) handleFedEvent(ctx context.Context, host string, evt *Event) error { 120 + func (s *Server) handleFedEvent(ctx context.Context, host *Peering, evt *Event) error { 121 + fmt.Printf("[%s] got fed event from %q: %s\n", s.serviceUrl, host.Host, evt.Kind) 122 122 switch evt.Kind { 123 123 case EvtKindCreateRecord: 124 - case EvtKindUpdateRecord: 124 + u, err := s.lookupUserByDid(ctx, evt.User) 125 + if err != nil { 126 + return err 127 + } 125 128 129 + return s.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.Collection, evt.Rkey, evt.CarSlice) 130 + case EvtKindUpdateRecord: 131 + default: 132 + return fmt.Errorf("unrecognized fed event kind: %q", evt.Kind) 126 133 } 127 - panic("nyi") 134 + return nil 128 135 } 129 136 130 137 func (s *Server) repoEventToFedEvent(ctx context.Context, evt *repomgr.RepoEvent) (*Event, error) { ··· 134 141 135 142 switch evt.Kind { 136 143 case repomgr.EvtKindCreateRecord: 137 - out.Kind = EvtKindUpdateRecord 144 + out.Kind = EvtKindCreateRecord 138 145 case repomgr.EvtKindUpdateRecord: 139 146 out.Kind = EvtKindUpdateRecord 140 147 case repomgr.EvtKindInitActor: ··· 148 155 return nil, err 149 156 } 150 157 158 + out.uid = evt.User 151 159 out.User = did 152 160 out.Collection = evt.Collection 153 161 out.Rkey = evt.Rkey ··· 211 219 return true 212 220 case "/xrpc/com.atproto.server.getAccountsConfig": 213 221 return true 222 + case "/xrpc/app.bsky.actor.getProfile": 223 + fmt.Println("TODO: currently not requiring auth on get profile endpoint") 224 + return true 225 + case "/xrpc/com.atproto.peering.follow", "/events": 226 + auth := c.Request().Header.Get("Authorization") 227 + 228 + did := c.Request().Header.Get("DID") 229 + ctx := c.Request().Context() 230 + ctx = context.WithValue(ctx, "did", did) 231 + ctx = context.WithValue(ctx, "auth", auth) 232 + c.SetRequest(c.Request().WithContext(ctx)) 233 + return true 214 234 default: 215 235 return false 216 236 } 217 237 }, 218 - //KeyFunc: s.getKey, 219 238 SigningKey: s.jwtSigningKey, 220 239 } 221 240 ··· 226 245 e.Use(middleware.JWTWithConfig(cfg), s.userCheckMiddleware) 227 246 s.RegisterHandlersComAtproto(e) 228 247 s.RegisterHandlersAppBsky(e) 248 + e.GET("/events", s.EventsHandler) 229 249 230 250 return e.Start(listen) 231 251 } ··· 236 256 Password string 237 257 RecoveryKey string 238 258 Email string 239 - DID string `gorm:"uniqueIndex"` 259 + Did string `gorm:"uniqueIndex"` 260 + PDS uint 240 261 } 241 262 242 263 type RefreshToken struct { ··· 319 340 } 320 341 321 342 func (s *Server) lookupUserByDid(ctx context.Context, did string) (*User, error) { 322 - var didEntry FakeDidMapping 323 - if err := s.db.First(&didEntry, "did = ?", did).Error; err != nil { 324 - return nil, err 325 - } 326 - 327 343 var u User 328 - if err := s.db.First(&u, "handle = ?", didEntry.Handle).Error; err != nil { 344 + if err := s.db.First(&u, "did = ?", did).Error; err != nil { 329 345 return nil, err 330 346 } 331 347 ··· 335 351 var ErrNoSuchUser = fmt.Errorf("no such user") 336 352 337 353 func (s *Server) lookupUserByHandle(ctx context.Context, handle string) (*User, error) { 338 - var didEntry FakeDidMapping 339 - if err := s.db.Find(&didEntry, "handle = ?", handle).Error; err != nil { 340 - return nil, err 341 - } 342 - if didEntry.ID == 0 { 343 - return nil, ErrNoSuchUser 344 - } 345 - 346 354 var u User 347 - if err := s.db.Find(&u, "handle = ?", didEntry.Handle).Error; err != nil { 355 + if err := s.db.Find(&u, "handle = ?", handle).Error; err != nil { 348 356 return nil, err 349 357 } 358 + fmt.Println("USER: ", handle) 350 359 if u.ID == 0 { 351 360 return nil, ErrNoSuchUser 352 361 } 353 362 354 - u.DID = didEntry.Did 355 - 356 363 return &u, nil 357 364 } 358 365 ··· 364 371 if !ok { 365 372 return next(c) 366 373 } 374 + ctx = context.WithValue(ctx, "token", user) 367 375 368 376 scope, did, err := s.checkTokenValidity(user) 369 377 if err != nil { ··· 378 386 ctx = context.WithValue(ctx, "authScope", scope) 379 387 ctx = context.WithValue(ctx, "user", u) 380 388 ctx = context.WithValue(ctx, "did", did) 381 - ctx = context.WithValue(ctx, "token", user) 382 389 383 390 c.SetRequest(c.Request().WithContext(ctx)) 384 391 return next(c) ··· 394 401 } 395 402 } 396 403 397 - func (s *Server) getKey(token *jwt.Token) (interface{}, error) { 398 - fmt.Println("token: ", token) 399 - 400 - return nil, nil 401 - } 402 - 403 404 func (s *Server) getUser(ctx context.Context) (*User, error) { 404 405 u, ok := ctx.Value("user").(*User) 405 406 if !ok { 406 407 return nil, fmt.Errorf("auth required") 407 408 } 408 409 409 - u.DID = ctx.Value("did").(string) 410 + //u.Did = ctx.Value("did").(string) 410 411 411 412 return u, nil 412 413 } ··· 464 465 type Peering struct { 465 466 gorm.Model 466 467 Host string 468 + Did string 467 469 Approved bool 468 470 } 469 471 472 + func (s *Server) HackAddPeering(host string, did string) error { 473 + // TODO: this method is just for proof of concept since i'm punting on 474 + // figuring out how the peering arrangements get set up. 475 + 476 + if err := s.db.Create(&Peering{ 477 + Host: host, 478 + Did: did, 479 + Approved: true, 480 + }).Error; err != nil { 481 + return err 482 + } 483 + 484 + if err := s.SubscribeToPds(context.TODO(), host); err != nil { 485 + return err 486 + } 487 + 488 + return nil 489 + } 490 + 470 491 func (s *Server) sendRemoteFollow(ctx context.Context, followed string, followedPDS uint) error { 471 492 var peering Peering 472 493 if err := s.db.First(&peering, "id = ?", followedPDS).Error; err != nil { ··· 479 500 } 480 501 481 502 c := &xrpc.Client{ 482 - Host: peering.Host, 503 + Host: "http://" + peering.Host, // TODO: maybe its correct to just put the protocol prefix in the database 483 504 Auth: auth, 484 505 } 485 506 ··· 491 512 492 513 return nil 493 514 } 515 + 516 + func (s *Server) AddRemoteFollow(ctx context.Context, opdsdid string, u string) error { 517 + var peering Peering 518 + if err := s.db.First(&peering, "did = ?", opdsdid).Error; err != nil { 519 + return err 520 + } 521 + 522 + uu, err := s.lookupUser(ctx, u) 523 + if err != nil { 524 + return err 525 + } 526 + 527 + if err := s.db.Create(&ExternalFollow{ 528 + PDS: peering.ID, 529 + Uid: uu.ID, 530 + }).Error; err != nil { 531 + return err 532 + } 533 + 534 + return nil 535 + } 536 + 537 + func (s *Server) peerHasFollow(ctx context.Context, peer uint, user uint) (bool, error) { 538 + var extfollow ExternalFollow 539 + if err := s.db.Debug().Find(&extfollow, "pds = ? AND uid = ?", peer, user).Error; err != nil { 540 + return false, err 541 + } 542 + 543 + return extfollow.ID != 0, nil 544 + }
+6 -2
testscripts/pdstest.sh
··· 11 11 ./gosky --pds="http://localhost:4989" --auth="test.auth" post "paul frazee needs to buy a sweater" 12 12 13 13 echo "3. View That Content" 14 - ./gosky --pds="http://localhost:4989" --auth="test.auth" feed --author=self 14 + ./gosky --pds="http://localhost:4989" --auth="test.auth" feed --raw --author=self 15 15 16 16 17 17 echo "4. Make a second account" ··· 21 21 ./gosky --pds="http://localhost:4989" --auth="test2.auth" post "Im a big fan of the snow" 22 22 23 23 echo "6. Upvote content" 24 - posturi=$(./gosky --pds=http://localhost:4989 --auth=test.auth feed --author=self | jq -r .post.uri | head -n1) 24 + posturi=$(./gosky --pds=http://localhost:4989 --auth=test.auth feed --raw --author=self | jq -r .post.uri | head -n1) 25 25 ./gosky --pds="http://localhost:4989" --auth="test2.auth" vote $posturi up 26 26 27 27 echo "7. Check notifications" ··· 32 32 33 33 echo "9. Check notifications" 34 34 ./gosky --pds="http://localhost:4989" --auth="test.auth" notifs 35 + 36 + 37 + 38 + echo "Success!"