this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

last seq from pebble

+19 -5
+8 -5
events/pebblepersist.go
··· 129 129 pp.broadcast = broadcast 130 130 } 131 131 132 - func (pp *PebblePersist) GetLast(ctx context.Context) (*XRPCStreamEvent, error) { 132 + func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error) { 133 133 iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) 134 134 if err != nil { 135 - return nil, err 135 + return 0, 0, nil, err 136 136 } 137 137 ok := iter.Last() 138 138 if !ok { 139 - return nil, nil 139 + return 0, 0, nil, nil 140 140 } 141 - evt, err := eventFromPebbleIter(iter) 142 - return evt, nil 141 + evt, err = eventFromPebbleIter(iter) 142 + keyblob := iter.Key() 143 + seq = int64(binary.BigEndian.Uint64(keyblob[:8])) 144 + millis = int64(binary.BigEndian.Uint64(keyblob[8:16])) 145 + return seq, millis, evt, nil 143 146 } 144 147 145 148 // example;
+11
splitter/splitter.go
··· 413 413 } 414 414 415 415 if seq%5000 == 0 { 416 + // TODO: don't need this after we move to getting seq from pebble 416 417 if err := s.writeCursor(seq); err != nil { 417 418 log.Errorf("write cursor failed: %s", err) 418 419 } ··· 426 427 } 427 428 428 429 func (s *Splitter) getLastCursor() (int64, error) { 430 + if s.pp != nil { 431 + seq, millis, _, err := s.pp.GetLast(context.Background()) 432 + if err == nil { 433 + log.Debugw("got last cursor from pebble", "seq", seq, "millis", millis) 434 + return seq, nil 435 + } else { 436 + log.Errorw("pebble seq fail", "err", err) 437 + } 438 + } 439 + 429 440 fi, err := os.Open(s.cursorFile) 430 441 if err != nil { 431 442 if os.IsNotExist(err) {