this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

repo subscription refactors (lex updates)

+38 -86
+6 -11
bgs/bgs.go
··· 334 334 u.Did = evt.Repo 335 335 } 336 336 337 - var prevcid *cid.Cid 338 - if evt.Prev != nil { 339 - c, err := cid.Decode(*evt.Prev) 340 - if err != nil { 341 - return fmt.Errorf("invalid value for prev cid in event: %w", err) 342 - } 343 - prevcid = &c 344 - } 345 - 346 337 // TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event 347 338 348 - if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, prevcid, evt.Blocks); err != nil { 339 + if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Prev, evt.Blocks); err != nil { 349 340 log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq) 350 341 if !errors.Is(err, carstore.ErrRepoBaseMismatch) { 351 342 return fmt.Errorf("handle user event failed: %w", err) ··· 363 354 364 355 // sync blobs 365 356 if len(evt.Blobs) > 0 { 366 - if err := bgs.syncUserBlobs(ctx, host, u.ID, evt.Blobs); err != nil { 357 + var blobStrs []string 358 + for _, b := range evt.Blobs { 359 + blobStrs = append(blobStrs, b.String()) 360 + } 361 + if err := bgs.syncUserBlobs(ctx, host, u.ID, blobStrs); err != nil { 367 362 return err 368 363 } 369 364 }
+1 -1
cmd/gosky/debug.go
··· 122 122 if err != nil { 123 123 return fmt.Errorf("loading %q: %w", op.Path, err) 124 124 } 125 - if rcid.String() != *op.Cid { 125 + if rcid != *op.Cid { 126 126 return fmt.Errorf("mismatch in record cid %s != %s", rcid, *op.Cid) 127 127 } 128 128 fmt.Printf("%s (%s): %s\n", op.Action, op.Path, *op.Cid)
+2 -2
cmd/gosky/main.go
··· 908 908 909 909 } else { 910 910 pstr := "<nil>" 911 - if evt.Prev != nil { 912 - pstr = *evt.Prev 911 + if evt.Prev != nil && *evt.Prev != cid.Undef { 912 + pstr = evt.Prev.String() 913 913 } 914 914 fmt.Printf("(%d) RepoAppend: %s (%s -> %s)\n", evt.Seq, evt.Repo, pstr, evt.Commit) 915 915 }
+22 -31
events/dbpersist.go
··· 75 75 } 76 76 77 77 var prev *util.DbCID 78 - if evt.Prev != nil { 79 - c, err := cid.Decode(*evt.Prev) 80 - if err != nil { 81 - return fmt.Errorf("decoding prev cid (%q): %w", *evt.Prev, err) 82 - } 83 - 84 - prev = &util.DbCID{c} 85 - } 86 - 87 - com, err := cid.Decode(evt.Commit) 88 - if err != nil { 89 - return err 78 + if evt.Prev != nil && *evt.Prev != cid.Undef { 79 + prev = &util.DbCID{*evt.Prev} 90 80 } 91 81 92 82 var blobs []byte ··· 104 94 } 105 95 106 96 rer := RepoEventRecord{ 107 - Commit: util.DbCID{com}, 97 + Commit: util.DbCID{evt.Commit}, 108 98 Prev: prev, 109 99 Repo: uid, 110 100 Event: evt.Event, ··· 114 104 115 105 for _, op := range evt.Ops { 116 106 var rec *util.DbCID 117 - if op.Cid != nil { 118 - c, err := cid.Decode(*op.Cid) 119 - if err != nil { 120 - return err 121 - } 122 - 123 - rec = &util.DbCID{c} 107 + if op.Cid != nil && *op.Cid != cid.Undef { 108 + rec = &util.DbCID{*op.Cid} 124 109 } 125 110 rer.Ops = append(rer.Ops, RepoOpRecord{ 126 111 Path: op.Path, ··· 195 180 return nil, err 196 181 } 197 182 } 183 + var blobCIDs []cid.Cid 184 + for _, b := range blobs { 185 + c, err := cid.Decode(b) 186 + if err != nil { 187 + return nil, err 188 + } 189 + blobCIDs = append(blobCIDs, c) 190 + } 198 191 199 192 did, err := p.didForUid(ctx, rer.Repo) 200 193 if err != nil { 201 194 return nil, err 202 195 } 203 196 204 - var prev *string 205 - if rer.Prev != nil { 206 - s := rer.Prev.CID.String() 207 - prev = &s 197 + var prevCID *cid.Cid 198 + if rer != nil && rer.Prev != nil && rer.Prev.CID != cid.Undef { 199 + prevCID = &rer.Prev.CID 208 200 } 209 201 210 202 out := &RepoAppend{ 211 203 Seq: int64(rer.Seq), 212 204 Repo: did, 213 - Commit: rer.Commit.CID.String(), 214 - Prev: prev, 205 + Commit: rer.Commit.CID, 206 + Prev: prevCID, 215 207 Time: rer.Time.Format(util.ISO8601), 216 - Blobs: blobs, 208 + Blobs: blobCIDs, 217 209 Event: rer.Event, 218 210 } 219 211 220 212 for _, op := range rer.Ops { 221 - var rec *string 213 + var recCID *cid.Cid 222 214 if op.Rec != nil { 223 - s := op.Rec.CID.String() 224 - rec = &s 215 + recCID = &op.Rec.CID 225 216 } 226 217 227 218 out.Ops = append(out.Ops, &RepoOp{ 228 219 Path: op.Path, 229 220 Action: op.Action, 230 - Cid: rec, 221 + Cid: recCID, 231 222 }) 232 223 } 233 224
+1 -1
events/repostream.go
··· 36 36 return nil 37 37 } 38 38 39 - if rc.String() != *op.Cid { 39 + if rc != *op.Cid { 40 40 return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 41 41 } 42 42
+3 -15
indexer/indexer.go
··· 82 82 83 83 var outops []*events.RepoOp 84 84 for _, op := range evt.Ops { 85 - var cc *string 86 - if op.RecCid != nil { 87 - s := op.RecCid.String() 88 - cc = &s 89 - } 90 85 outops = append(outops, &events.RepoOp{ 91 86 Path: op.Collection + "/" + op.Rkey, 92 87 Action: string(op.Kind), 93 - Cid: cc, 88 + Cid: op.RecCid, 94 89 }) 95 90 96 91 switch op.Kind { ··· 131 126 return err 132 127 } 133 128 134 - // TODO: these should be cids on the wire 135 - var prevstr *string 136 - if evt.OldRoot != nil { 137 - s := evt.OldRoot.String() 138 - prevstr = &s 139 - } 140 - 141 129 toobig := false 142 130 slice := evt.RepoSlice 143 131 if len(slice) > carstore.MaxSliceLength { ··· 150 138 if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{ 151 139 RepoAppend: &events.RepoAppend{ 152 140 Repo: did, 153 - Prev: prevstr, 141 + Prev: evt.OldRoot, 154 142 Blocks: slice, 155 - Commit: evt.NewRoot.String(), 143 + Commit: evt.NewRoot, 156 144 Time: time.Now().Format(util.ISO8601), 157 145 Ops: outops, 158 146 TooBig: toobig,
+3 -25
pds/server.go
··· 142 142 u.ID = subj.Uid 143 143 } 144 144 145 - var pcid *cid.Cid 146 - if evt.Prev != nil { 147 - prev, err := cid.Decode(*evt.Prev) 148 - if err != nil { 149 - return err 150 - } 151 - 152 - pcid = &prev 153 - } 154 - 155 - return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, pcid, evt.Blocks) 145 + return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Prev, evt.Blocks) 156 146 default: 157 147 return fmt.Errorf("invalid fed event") 158 148 } ··· 251 241 return nil, err 252 242 } 253 243 254 - var prevcid *string 255 - if evt.OldRoot != nil { 256 - s := evt.OldRoot.String() 257 - prevcid = &s 258 - } 259 - 260 244 out := &events.RepoAppend{ 261 - Prev: prevcid, 245 + Prev: evt.OldRoot, 262 246 Blocks: evt.RepoSlice, 263 247 Repo: did, 264 248 Time: time.Now().Format(bsutil.ISO8601), ··· 266 250 } 267 251 268 252 for _, op := range evt.Ops { 269 - var s *string 270 - if op.RecCid != nil { 271 - cs := op.RecCid.String() 272 - s = &cs 273 - } 274 - 275 253 out.Ops = append(out.Ops, &events.RepoOp{ 276 254 Path: op.Collection + "/" + op.Rkey, 277 255 Action: string(op.Kind), 278 - Cid: s, 256 + Cid: op.RecCid, 279 257 }) 280 258 } 281 259