this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

initial working end to end flow for bgs events

+144 -49
+9
carstore/bs.go
··· 32 32 } 33 33 34 34 func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) { 35 + if _, err := os.Stat(root); err != nil { 36 + if !os.IsNotExist(err) { 37 + return nil, err 38 + } 39 + 40 + if err := os.Mkdir(root, 0775); err != nil { 41 + return nil, err 42 + } 43 + } 35 44 if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { 36 45 return nil, err 37 46 }
+3 -4
cmd/bgs/fedmgr.go
··· 5 5 "encoding/json" 6 6 "errors" 7 7 "fmt" 8 - "log" 9 8 "math/rand" 10 9 "net" 11 10 "sync" ··· 95 94 96 95 if err := s.handleConnection(host, con); err != nil { 97 96 if errors.Is(err, ErrTimeoutShutdown) { 98 - log.Printf("shutting down pds subscription to %s, no activity after %s", host.Host, EventsTimeout) 97 + log.Infof("shutting down pds subscription to %s, no activity after %s", host.Host, EventsTimeout) 99 98 return 100 99 } 101 - log.Printf("connection to %q failed: %s", host.Host, err) 100 + log.Warnf("connection to %q failed: %s", host.Host, err) 102 101 } 103 102 } 104 103 } ··· 143 142 144 143 fmt.Println("got event: ", host.Host, ev.Kind) 145 144 if err := s.cb(context.TODO(), host, &ev); err != nil { 146 - log.Printf("failed to index event from %q: %s", host.Host, err) 145 + log.Errorf("failed to index event from %q: %s", host.Host, err) 147 146 } 148 147 } 149 148 }
+41 -13
cmd/bgs/main.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 - "log" 8 7 "net/url" 8 + "strings" 9 9 10 10 "github.com/gorilla/websocket" 11 + logging "github.com/ipfs/go-log" 11 12 "github.com/labstack/echo/v4" 13 + "github.com/labstack/echo/v4/middleware" 12 14 "github.com/urfave/cli/v2" 13 15 "github.com/whyrusleeping/gosky/api" 14 16 bsky "github.com/whyrusleeping/gosky/api/bsky" ··· 30 32 "gorm.io/gorm" 31 33 "gorm.io/plugin/opentelemetry/tracing" 32 34 ) 35 + 36 + var log = logging.Logger("bgs") 37 + 38 + func init() { 39 + logging.SetAllLoggers(logging.LevelDebug) 40 + } 33 41 34 42 func main() { 35 43 app := cli.NewApp() ··· 114 122 115 123 evtman := events.NewEventManager() 116 124 125 + go evtman.Run() 126 + 117 127 // not necessary to generate notifications, should probably make the 118 128 // indexer just take optional callbacks for notification stuff 119 129 notifman := notifs.NewNotificationManager(db, repoman.GetRecord) ··· 125 135 return err 126 136 } 127 137 138 + repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 139 + if err := ix.HandleRepoEvent(ctx, evt); err != nil { 140 + log.Errorw("failed to handle repo event", "err", err) 141 + } 142 + }) 143 + 128 144 bgs := &BGS{ 129 145 index: ix, 130 146 db: db, ··· 153 169 154 170 func (bgs *BGS) Start(listen string) error { 155 171 e := echo.New() 172 + 173 + e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 174 + Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", 175 + })) 176 + 177 + e.HTTPErrorHandler = func(err error, ctx echo.Context) { 178 + fmt.Printf("HANDLER ERROR: (%s) %s\n", ctx.Path(), err) 179 + ctx.Response().WriteHeader(500) 180 + } 156 181 157 182 // TODO: this API is temporary until we formalize what we want here 158 183 e.POST("/add-target", bgs.handleAddTarget) ··· 190 215 } 191 216 192 217 func (bgs *BGS) EventsHandler(c echo.Context) error { 193 - did := c.Request().Header.Get("DID") 218 + // TODO: authhhh 194 219 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 195 220 if err != nil { 196 - return err 197 - } 198 - 199 - var peering PDS 200 - if err := bgs.db.First(&peering, "did = ?", did).Error; err != nil { 201 221 return err 202 222 } 203 223 ··· 218 238 219 239 func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) { 220 240 var u User 221 - if err := bgs.db.First(&u, "did = ?", did).Error; err != nil { 241 + if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil { 222 242 return nil, err 223 243 } 224 244 245 + if u.ID == 0 { 246 + return nil, gorm.ErrRecordNotFound 247 + } 248 + 225 249 return &u, nil 226 250 } 227 251 228 252 func (bgs *BGS) handleFedEvent(ctx context.Context, host *PDS, evt *events.Event) error { 229 - log.Printf("got fed event from %q: %s\n", host.Host, evt.Kind) 253 + log.Infof("got fed event from %q: %s\n", host.Host, evt.Kind) 230 254 switch evt.Kind { 231 255 case events.EvtKindRepoChange: 232 - u, err := bgs.lookupUserByDid(ctx, evt.User) 256 + u, err := bgs.lookupUserByDid(ctx, evt.Repo) 233 257 if err != nil { 234 258 if !errors.Is(err, gorm.ErrRecordNotFound) { 235 259 return fmt.Errorf("looking up event user: %w", err) 236 260 } 237 261 238 - subj, err := bgs.createExternalUser(ctx, evt.User) 262 + subj, err := bgs.createExternalUser(ctx, evt.Repo) 239 263 if err != nil { 240 264 return err 241 265 } ··· 244 268 u.ID = subj.Uid 245 269 } 246 270 247 - return bgs.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.RepoOps, evt.CarSlice) 271 + return bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, evt.RepoOps, evt.CarSlice) 248 272 default: 249 273 return fmt.Errorf("unrecognized fed event kind: %q", evt.Kind) 250 274 } ··· 267 291 return nil, err 268 292 } 269 293 294 + if strings.HasPrefix(durl.Host, "localhost:") { 295 + durl.Scheme = "http" 296 + } 297 + 270 298 // TODO: the PDS's DID should also be in the service, we could use that to look up? 271 299 var peering PDS 272 300 if err := s.db.First(&peering, "host = ?", durl.Host).Error; err != nil { ··· 283 311 handle = hurl.Host 284 312 } 285 313 286 - c := &xrpc.Client{Host: svc.ServiceEndpoint} 314 + c := &xrpc.Client{Host: durl.String()} 287 315 profile, err := bsky.ActorGetProfile(ctx, c, did) 288 316 if err != nil { 289 317 return nil, err
+13 -1
cmd/pds/main.go
··· 10 10 11 11 "github.com/lestrrat-go/jwx/v2/jwk" 12 12 "github.com/urfave/cli/v2" 13 + "github.com/whyrusleeping/gosky/api" 13 14 "github.com/whyrusleeping/gosky/carstore" 14 15 "github.com/whyrusleeping/gosky/plc" 15 16 server "github.com/whyrusleeping/gosky/server" ··· 44 45 Name: "pdshost", 45 46 Usage: "hostname of the pds", 46 47 Value: "localhost:4989", 48 + }, 49 + &cli.StringFlag{ 50 + Name: "plc", 51 + Usage: "hostname of the plc", 47 52 }, 48 53 } 49 54 ··· 118 123 return err 119 124 } 120 125 126 + var didr plc.PLCClient 127 + if plchost := cctx.String("plc"); plchost != "" { 128 + didr = &api.PLCServer{Host: plchost} 129 + } else { 130 + didr = plc.NewFakeDid(db) 131 + } 132 + 121 133 pdshost := cctx.String("pdshost") 122 - srv, err := server.NewServer(db, cs, "server.key", ".pdstest", pdshost, plc.NewFakeDid(db), []byte("jwtsecretplaceholder")) 134 + srv, err := server.NewServer(db, cs, "server.key", ".pdstest", pdshost, didr, []byte("jwtsecretplaceholder")) 123 135 if err != nil { 124 136 return err 125 137 }
+5 -5
events/events.go
··· 73 73 74 74 type Event struct { 75 75 76 - // User is the DID of the user this event is about 77 - User string 76 + // Repo is the DID of the repo this event is about 77 + Repo string 78 78 79 79 Kind string 80 80 81 - RepoOps []*RepoOp 82 - RepoReset bool 83 - CarSlice []byte 81 + RepoOps []*RepoOp 82 + RepoRebase bool 83 + CarSlice []byte 84 84 85 85 // some private fields for internal routing perf 86 86 PrivUid uint `json:"-"`
+3 -3
go.mod
··· 14 14 github.com/ipfs/go-ipfs-blockstore v1.2.1-0.20220823165003-9904c18f1d0a 15 15 github.com/ipfs/go-ipld-cbor v0.0.6 16 16 github.com/ipfs/go-ipld-format v0.3.0 17 + github.com/ipfs/go-log v1.0.5 17 18 github.com/ipld/go-car v0.5.0 18 19 github.com/ipld/go-car/v2 v2.5.1 19 20 github.com/labstack/echo/v4 v4.7.2 ··· 55 56 github.com/ipfs/go-ipfs-exchange-interface v0.1.0 // indirect 56 57 github.com/ipfs/go-ipfs-util v0.0.2 // indirect 57 58 github.com/ipfs/go-ipld-legacy v0.1.0 // indirect 58 - github.com/ipfs/go-log v1.0.5 // indirect 59 - github.com/ipfs/go-log/v2 v2.3.0 // indirect 59 + github.com/ipfs/go-log/v2 v2.5.1 // indirect 60 60 github.com/ipfs/go-merkledag v0.6.0 // indirect 61 61 github.com/ipfs/go-metrics-interface v0.0.1 // indirect 62 62 github.com/ipfs/go-verifcid v0.0.1 // indirect ··· 102 102 go.opentelemetry.io/otel/trace v1.11.2 // indirect 103 103 go.uber.org/atomic v1.7.0 // indirect 104 104 go.uber.org/multierr v1.8.0 // indirect 105 - go.uber.org/zap v1.16.0 // indirect 105 + go.uber.org/zap v1.19.1 // indirect 106 106 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect 107 107 golang.org/x/exp v0.0.0-20210615023648-acb5c1269671 // indirect 108 108 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
+13 -7
go.sum
··· 10 10 github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= 11 11 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= 12 12 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= 13 - github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= 14 13 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= 15 14 github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= 16 15 github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= ··· 368 367 github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= 369 368 github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= 370 369 github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= 371 - github.com/ipfs/go-log/v2 v2.3.0 h1:31Re/cPqFHpsRHgyVwjWADPoF0otB1WrjTy8ZFYwEZU= 372 370 github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g= 371 + github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= 372 + github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= 373 373 github.com/ipfs/go-merkledag v0.2.4/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= 374 374 github.com/ipfs/go-merkledag v0.6.0 h1:oV5WT2321tS4YQVOPgIrWHvJ0lJobRTerU+i9nmUCuA= 375 375 github.com/ipfs/go-merkledag v0.6.0/go.mod h1:9HSEwRd5sV+lbykiYP+2NC/3o6MZbKNaa4hfNcH5iH0= ··· 1059 1059 github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= 1060 1060 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 1061 1061 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= 1062 + github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= 1062 1063 github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= 1063 1064 go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= 1064 1065 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= ··· 1088 1089 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= 1089 1090 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= 1090 1091 go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= 1092 + go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= 1093 + go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= 1091 1094 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= 1092 1095 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= 1093 1096 go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= ··· 1100 1103 go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= 1101 1104 go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= 1102 1105 go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= 1103 - go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= 1104 1106 go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= 1107 + go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= 1108 + go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= 1105 1109 go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE= 1106 1110 golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw= 1107 1111 golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= ··· 1147 1151 golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= 1148 1152 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= 1149 1153 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= 1150 - golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= 1151 1154 golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= 1152 1155 golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= 1153 1156 golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= 1154 1157 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= 1155 1158 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= 1156 - golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= 1159 + golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= 1157 1160 golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= 1158 1161 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= 1159 1162 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= ··· 1187 1190 golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= 1188 1191 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= 1189 1192 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= 1193 + golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= 1190 1194 golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= 1191 1195 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= 1192 1196 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= ··· 1253 1257 golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 1254 1258 golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 1255 1259 golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 1260 + golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 1256 1261 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 1257 1262 golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 1263 + golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 1258 1264 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 1259 1265 golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 1260 1266 golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= ··· 1302 1308 golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= 1303 1309 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= 1304 1310 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= 1305 - golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= 1311 + golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= 1306 1312 golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 1307 1313 golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 1308 1314 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= ··· 1380 1386 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= 1381 1387 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= 1382 1388 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= 1389 + gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= 1383 1390 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= 1384 1391 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= 1385 1392 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= ··· 1403 1410 honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= 1404 1411 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= 1405 1412 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= 1406 - honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o= 1407 1413 lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c= 1408 1414 lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= 1409 1415 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
+15 -8
indexer/indexer.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 - "log" 8 7 "strings" 9 8 9 + logging "github.com/ipfs/go-log" 10 10 bsky "github.com/whyrusleeping/gosky/api/bsky" 11 11 "github.com/whyrusleeping/gosky/events" 12 12 "github.com/whyrusleeping/gosky/notifs" ··· 16 16 "go.opentelemetry.io/otel" 17 17 "gorm.io/gorm" 18 18 ) 19 + 20 + var log = logging.Logger("indexer") 19 21 20 22 type Indexer struct { 21 23 db *gorm.DB ··· 40 42 notifman: notifman, 41 43 events: evtman, 42 44 didr: didr, 45 + SendRemoteFollow: func(context.Context, string, uint) error { 46 + return nil 47 + }, 43 48 }, nil 44 49 } 45 50 ··· 56 61 return fmt.Errorf("failed to catch up on user repo changes, processing events off base: %w", err) 57 62 } 58 63 59 - fmt.Println("Handling Repo Event!") 64 + log.Infof("Handling Repo Event!") 60 65 var relpds []uint 61 66 var repoOps []*events.RepoOp 62 67 for _, op := range evt.Ops { 63 68 switch op.Kind { 64 69 case repomgr.EvtKindCreateRecord: 70 + log.Infof("create record: ", evt.User, op.Collection, op.Rkey) 65 71 rop, err := ix.handleRecordCreate(ctx, evt, &op, true) 66 72 if err != nil { 67 73 return fmt.Errorf("handle recordCreate: %w", err) ··· 71 77 case repomgr.EvtKindInitActor: 72 78 rop, err := ix.handleInitActor(ctx, evt, &op) 73 79 if err != nil { 74 - log.Println("handle initActor: ", err) 80 + log.Errorf("handle initActor: %s", err) 75 81 } 76 82 77 83 repoOps = append(repoOps, rop) ··· 85 91 return err 86 92 } 87 93 88 - fmt.Println("Sending event: ", relpds, len(repoOps)) 94 + log.Infof("Sending event: ", relpds, len(repoOps)) 89 95 if err := ix.events.AddEvent(&events.Event{ 90 96 Kind: events.EvtKindRepoChange, 91 97 CarSlice: evt.RepoSlice, 92 98 PrivUid: evt.User, 93 99 RepoOps: repoOps, 94 - User: did, 100 + Repo: did, 95 101 PrivRelevantPds: relpds, 96 102 }); err != nil { 97 103 return fmt.Errorf("failed to push event: %s", err) ··· 101 107 } 102 108 103 109 func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) (*events.RepoOp, error) { 104 - fmt.Println("record create event", op.Collection) 110 + log.Infow("record create event", "collection", op.Collection) 105 111 out := &events.RepoOp{ 106 112 Kind: string(repomgr.EvtKindCreateRecord), 107 113 Collection: op.Collection, ··· 250 256 251 257 if local && subj.PDS != 0 { 252 258 if err := ix.SendRemoteFollow(ctx, subj.Did, subj.PDS); err != nil { 253 - log.Println("failed to issue remote follow directive: ", err) 259 + log.Error("failed to issue remote follow directive: ", err) 254 260 } 255 261 } 256 262 ··· 301 307 if post.Reply != nil { 302 308 replyto, err := ix.GetPost(ctx, post.Reply.Parent.Uri) 303 309 if err != nil { 304 - fmt.Println("probably shouldnt error when processing a reply to a not-found post") 310 + log.Error("probably shouldnt error when processing a reply to a not-found post") 305 311 return err 306 312 } 307 313 ··· 332 338 333 339 func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) (*events.RepoOp, error) { 334 340 ai := op.ActorInfo 341 + 335 342 if err := ix.db.Create(&types.ActorInfo{ 336 343 Uid: evt.User, 337 344 Handle: ai.Handle,
+34 -4
repomgr/repomgr.go
··· 13 13 "github.com/whyrusleeping/gosky/carstore" 14 14 "github.com/whyrusleeping/gosky/events" 15 15 "github.com/whyrusleeping/gosky/repo" 16 + "github.com/whyrusleeping/gosky/types" 16 17 "go.opentelemetry.io/otel" 17 18 "gorm.io/gorm" 18 19 "gorm.io/gorm/clause" ··· 481 482 return ap, nil 482 483 } 483 484 484 - func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, kind EventKind, uid uint, ops []*events.RepoOp, carslice []byte) error { 485 + func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid uint, ops []*events.RepoOp, carslice []byte) error { 485 486 root, ds, err := rm.cs.ImportSlice(ctx, uid, carslice) 486 487 if err != nil { 487 488 return fmt.Errorf("importing external carslice: %w", err) ··· 496 497 497 498 var evtops []RepoOp 498 499 for _, op := range ops { 499 - switch op.Kind { 500 - case string(EvtKindCreateRecord): 500 + switch EventKind(op.Kind) { 501 + case EvtKindCreateRecord: 501 502 recid, rec, err := r.GetRecord(ctx, op.Collection+"/"+op.Rkey) 502 503 if err != nil { 503 504 return fmt.Errorf("reading changed record from car slice: %w", err) ··· 510 511 Record: rec, 511 512 RecCid: recid, 512 513 }) 514 + case EvtKindInitActor: 515 + var ai types.ActorInfo 516 + if err := rm.db.First(&ai, "id = ?", uid).Error; err != nil { 517 + return fmt.Errorf("expected initialized user: %w", err) 518 + } 519 + 520 + evtops = append(evtops, RepoOp{ 521 + Kind: EvtKindInitActor, 522 + ActorInfo: &ActorInfo{ 523 + Did: ai.Did, 524 + Handle: ai.Handle, 525 + DisplayName: ai.DisplayName, 526 + DeclRefCid: ai.DeclRefCid, 527 + Type: ai.Type, 528 + }, 529 + }) 530 + case EvtKindUpdateRecord: 531 + recid, rec, err := r.GetRecord(ctx, op.Collection+"/"+op.Rkey) 532 + if err != nil { 533 + return fmt.Errorf("reading changed record from car slice: %w", err) 534 + } 535 + 536 + evtops = append(evtops, RepoOp{ 537 + Kind: EvtKindUpdateRecord, 538 + Collection: op.Collection, 539 + Rkey: op.Rkey, 540 + Record: rec, 541 + RecCid: recid, 542 + }) 513 543 default: 514 - return fmt.Errorf("unrecognized external user event kind: %q", kind) 544 + return fmt.Errorf("unrecognized external user event kind: %q", op.Kind) 515 545 } 516 546 } 517 547
+4
server/handlers.go
··· 422 422 return nil, err 423 423 } 424 424 425 + if recoveryKey == "" { 426 + recoveryKey = s.signingKey.DID() 427 + } 428 + 425 429 d, err := s.plc.CreateDID(ctx, s.signingKey, recoveryKey, input.Handle, s.serviceUrl) 426 430 if err != nil { 427 431 return nil, fmt.Errorf("create did: %w", err)
+4 -4
server/server.go
··· 134 134 fmt.Printf("[%s] got fed event from %q: %s\n", s.serviceUrl, host.Host, evt.Kind) 135 135 switch evt.Kind { 136 136 case events.EvtKindRepoChange: 137 - u, err := s.lookupUserByDid(ctx, evt.User) 137 + u, err := s.lookupUserByDid(ctx, evt.Repo) 138 138 if err != nil { 139 139 if !errors.Is(err, gorm.ErrRecordNotFound) { 140 140 return fmt.Errorf("looking up event user: %w", err) 141 141 } 142 142 143 - subj, err := s.createExternalUser(ctx, evt.User) 143 + subj, err := s.createExternalUser(ctx, evt.Repo) 144 144 if err != nil { 145 145 return err 146 146 } ··· 149 149 u.ID = subj.Uid 150 150 } 151 151 152 - return s.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.RepoOps, evt.CarSlice) 152 + return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, evt.RepoOps, evt.CarSlice) 153 153 default: 154 154 return fmt.Errorf("unrecognized fed event kind: %q", evt.Kind) 155 155 } ··· 237 237 Kind: events.EvtKindRepoChange, 238 238 CarSlice: evt.RepoSlice, 239 239 PrivUid: evt.User, 240 - User: did, 240 + Repo: did, 241 241 } 242 242 243 243 for _, op := range evt.Ops {