this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Misc debugging improvements (#63)

authored by

Whyrusleeping and committed by
GitHub
aef25f3a e890db8a

+192 -14
+10 -7
bgs/bgs.go
··· 152 152 ctx := c.Request().Context() 153 153 154 154 // TODO: authhhh 155 - conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 155 + conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 1<<10, 1<<10) 156 156 if err != nil { 157 157 return fmt.Errorf("upgrading websocket: %w", err) 158 158 } ··· 274 274 // TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event 275 275 276 276 if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, prevcid, evt.Blocks); err != nil { 277 - log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq) 278 277 if !errors.Is(err, carstore.ErrRepoBaseMismatch) { 278 + log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq) 279 279 return fmt.Errorf("handle user event failed: %w", err) 280 280 } 281 281 ··· 291 291 292 292 // sync blobs 293 293 if len(evt.Blobs) > 0 { 294 - if err := bgs.syncUserBlobs(ctx, host.ID, u.ID, evt.Blobs); err != nil { 294 + if err := bgs.syncUserBlobs(ctx, host, u.ID, evt.Blobs); err != nil { 295 295 return err 296 296 } 297 297 } ··· 302 302 } 303 303 } 304 304 305 - func (s *BGS) syncUserBlobs(ctx context.Context, pds uint, user bsutil.Uid, blobs []string) error { 305 + func (s *BGS) syncUserBlobs(ctx context.Context, pds *models.PDS, user bsutil.Uid, blobs []string) error { 306 306 log.Warnf("not handling blob syncing yet") 307 307 return nil 308 308 } ··· 341 341 c := &xrpc.Client{Host: durl.String()} 342 342 343 343 if peering.ID == 0 { 344 - pdsdid, err := atproto.HandleResolve(ctx, c, "") 344 + 345 + cfg, err := atproto.ServerGetAccountsConfig(ctx, c) 345 346 if err != nil { 346 347 // TODO: failing this shouldnt halt our indexing 347 - return nil, fmt.Errorf("failed to get accounts config for unrecognized pds: %w", err) 348 + return nil, fmt.Errorf("failed to check unrecognized pds: %w", err) 348 349 } 349 350 351 + // since handles can be anything, checking against this list doesnt matter... 352 + _ = cfg 353 + 350 354 // TODO: could check other things, a valid response is good enough for now 351 355 peering.Host = durl.Host 352 - peering.Did = pdsdid.Did 353 356 peering.SSL = (durl.Scheme == "https") 354 357 355 358 if err := s.db.Create(&peering).Error; err != nil {
+19
bgs/handlers.go
··· 10 10 ) 11 11 12 12 func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, commit string, did string) (io.Reader, error) { 13 + /* 14 + u, err := s.Index.LookupUserByDid(ctx, did) 15 + if err != nil { 16 + return nil, err 17 + } 18 + 19 + c, err := cid.Decode(commit) 20 + if err != nil { 21 + return nil, err 22 + } 23 + 24 + // TODO: need to enable a 'write to' interface for codegenned things... 25 + buf := new(bytes.Buffer) 26 + if err := s.repoman.GetCheckout(ctx, u.Uid, c, buf); err != nil { 27 + return nil, err 28 + } 29 + 30 + return buf, nil 31 + */ 13 32 panic("nyi") 14 33 } 15 34
+14 -4
cmd/bigsky/main.go
··· 35 35 36 36 func init() { 37 37 //logging.SetAllLoggers(logging.LevelDebug) 38 - logging.SetAllLoggers(logging.LevelWarn) 38 + logging.SetAllLoggers(logging.LevelInfo) 39 39 } 40 40 41 41 func main() { ··· 97 97 Name: "aggregation", 98 98 Value: true, 99 99 }, 100 + &cli.StringFlag{ 101 + Name: "api-listen", 102 + Value: ":2470", 103 + }, 104 + &cli.StringFlag{ 105 + Name: "debug-listen", 106 + Value: "localhost:2471", 107 + }, 100 108 } 101 109 102 110 app.Action = func(cctx *cli.Context) error { ··· 125 133 // ensure data directory exists; won't error if it does 126 134 datadir := cctx.String("data-dir") 127 135 csdir := filepath.Join(datadir, "carstore") 128 - os.MkdirAll(datadir, os.ModePerm) 136 + if err := os.MkdirAll(datadir, os.ModePerm); err != nil { 137 + return err 138 + } 129 139 130 140 dburl := cctx.String("db-url") 131 141 db, err := cliutil.SetupDatabase(dburl) ··· 190 200 191 201 // set up pprof endpoint 192 202 go func() { 193 - if err := bgs.StartDebug("localhost:2471"); err != nil { 203 + if err := bgs.StartDebug(cctx.String("debug-listen")); err != nil { 194 204 panic(err) 195 205 } 196 206 }() 197 207 198 - return bgs.Start(":2470") 208 + return bgs.Start(cctx.String("api-listen")) 199 209 } 200 210 201 211 app.RunAndExitOnError()
+138
cmd/gosky/debug.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "strconv" 11 + 12 + "github.com/bluesky-social/indigo/events" 13 + "github.com/bluesky-social/indigo/repo" 14 + "github.com/gorilla/websocket" 15 + "github.com/ipfs/go-cid" 16 + "github.com/ipld/go-car/v2" 17 + cli "github.com/urfave/cli/v2" 18 + ) 19 + 20 + var debugCmd = &cli.Command{ 21 + Name: "debug", 22 + Description: "a set of debugging utilities for atproto", 23 + Subcommands: []*cli.Command{ 24 + inspectEventCmd, 25 + }, 26 + } 27 + 28 + var inspectEventCmd = &cli.Command{ 29 + Name: "inspect-event", 30 + Flags: []cli.Flag{ 31 + &cli.StringFlag{ 32 + Name: "host", 33 + Required: true, 34 + }, 35 + &cli.BoolFlag{ 36 + Name: "dump-raw-blocks", 37 + }, 38 + }, 39 + Action: func(cctx *cli.Context) error { 40 + n, err := strconv.Atoi(cctx.Args().First()) 41 + if err != nil { 42 + return err 43 + } 44 + 45 + h := cctx.String("host") 46 + 47 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeAllRepos?cursor=%d", h, n-1) 48 + d := websocket.DefaultDialer 49 + con, _, err := d.Dial(url, http.Header{}) 50 + if err != nil { 51 + return fmt.Errorf("dial failure: %w", err) 52 + } 53 + 54 + var errFoundIt = fmt.Errorf("gotem") 55 + 56 + var match *events.RepoAppend 57 + 58 + ctx := context.TODO() 59 + err = events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 60 + RepoAppend: func(evt *events.RepoAppend) error { 61 + n := int64(n) 62 + if evt.Seq == n { 63 + match = evt 64 + return errFoundIt 65 + } 66 + if evt.Seq > n { 67 + return fmt.Errorf("record not found in stream") 68 + } 69 + 70 + return nil 71 + }, 72 + Info: func(evt *events.InfoFrame) error { 73 + return nil 74 + }, 75 + Error: func(evt *events.ErrorFrame) error { 76 + return fmt.Errorf("%s: %s", evt.Error, evt.Message) 77 + }, 78 + }) 79 + 80 + if err != errFoundIt { 81 + return err 82 + } 83 + 84 + b, err := json.MarshalIndent(match, "", " ") 85 + if err != nil { 86 + return err 87 + } 88 + fmt.Println(string(b)) 89 + 90 + br, err := car.NewBlockReader(bytes.NewReader(match.Blocks)) 91 + if err != nil { 92 + return err 93 + } 94 + 95 + fmt.Println("\nSlice Dump:") 96 + fmt.Println("Root: ", br.Roots[0]) 97 + for { 98 + blk, err := br.Next() 99 + if err != nil { 100 + if err == io.EOF { 101 + break 102 + } 103 + return err 104 + } 105 + 106 + fmt.Println(blk.Cid()) 107 + if cctx.Bool("dump-raw-blocks") { 108 + fmt.Printf("%x\n", blk.RawData()) 109 + } 110 + } 111 + 112 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(match.Blocks)) 113 + if err != nil { 114 + return fmt.Errorf("opening repo from slice: %w", err) 115 + } 116 + 117 + var prev cid.Cid 118 + if match.Prev != nil { 119 + c, err := cid.Decode(*match.Prev) 120 + if err != nil { 121 + return err 122 + } 123 + prev = c 124 + } 125 + 126 + fmt.Println("\nDiff ops: ") 127 + diff, err := r.DiffSince(ctx, prev) 128 + if err != nil { 129 + return err 130 + } 131 + 132 + for _, d := range diff { 133 + fmt.Printf("%s (%s): %s -> %s\n", d.Op, d.Rpath, d.OldCid, d.NewCid) 134 + } 135 + 136 + return nil 137 + }, 138 + }
+1
cmd/gosky/main.go
··· 72 72 app.Commands = []*cli.Command{ 73 73 actorGetSuggestionsCmd, 74 74 createSessionCmd, 75 + debugCmd, 75 76 deletePostCmd, 76 77 didCmd, 77 78 feedGetCmd,
+5 -1
events/events.go
··· 6 6 7 7 "github.com/bluesky-social/indigo/util" 8 8 logging "github.com/ipfs/go-log" 9 + "go.opentelemetry.io/otel" 9 10 ) 10 11 11 12 var log = logging.Logger("events") ··· 147 148 Message string `cborgen:"message"` 148 149 } 149 150 150 - func (em *EventManager) AddEvent(ev *XRPCStreamEvent) error { 151 + func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error { 152 + ctx, span := otel.Tracer("events").Start(ctx, "AddEvent") 153 + defer span.End() 154 + 151 155 select { 152 156 case em.ops <- &Operation{ 153 157 op: opSend,
+1 -1
indexer/indexer.go
··· 138 138 } 139 139 140 140 log.Infow("Sending event", "did", did) 141 - if err := ix.events.AddEvent(&events.XRPCStreamEvent{ 141 + if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{ 142 142 RepoAppend: &events.RepoAppend{ 143 143 Repo: did, 144 144 Prev: prevstr,
+1 -1
labeling/service.go
··· 232 232 Labels: labels, 233 233 }, 234 234 } 235 - err = s.evtmgr.AddEvent(&lev) 235 + err = s.evtmgr.AddEvent(ctx, &lev) 236 236 if err != nil { 237 237 return fmt.Errorf("failed to publish XRPCStreamEvent: %w", err) 238 238 }
+3
util/dbcid.go
··· 34 34 } 35 35 36 36 func (dbc DbCID) Value() (driver.Value, error) { 37 + if !dbc.CID.Defined() { 38 + return nil, fmt.Errorf("cannot serialize undefined cid to database") 39 + } 37 40 return dbc.CID.Bytes(), nil 38 41 } 39 42