this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

PR feedback, use pebble DeleteRange

+35 -20
+35 -20
events/pebblepersist.go
··· 30 30 return pp, nil 31 31 } 32 32 33 + func setKeySeqMillis(key []byte, seq, millis int64) { 34 + binary.BigEndian.PutUint64(key[:8], uint64(seq)) 35 + binary.BigEndian.PutUint64(key[8:16], uint64(millis)) 36 + } 37 + 33 38 func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error { 34 39 err := e.Preserialize() 35 40 if err != nil { ··· 38 43 blob := e.Preserialized 39 44 40 45 seq := e.Sequence() 41 - nowMillis := uint64(time.Now().UnixMilli()) 46 + nowMillis := time.Now().UnixMilli() 42 47 43 48 if seq < 0 { 44 49 // persist with longer key {prev 8 byte key}{time}{int32 extra counter} 45 50 pp.prevSeqExtra++ 46 51 var key [20]byte 47 - binary.BigEndian.PutUint64(key[:8], uint64(pp.prevSeq)) 48 - binary.BigEndian.PutUint64(key[8:16], nowMillis) 52 + setKeySeqMillis(key[:], seq, nowMillis) 49 53 binary.BigEndian.PutUint32(key[16:], pp.prevSeqExtra) 50 54 51 55 err = pp.db.Set(key[:], blob, pebble.Sync) 52 - return nil 53 56 } else { 54 57 pp.prevSeq = seq 55 58 pp.prevSeqExtra = 0 56 59 var key [16]byte 57 - binary.BigEndian.PutUint64(key[:8], uint64(seq)) 58 - binary.BigEndian.PutUint64(key[8:16], nowMillis) 60 + setKeySeqMillis(key[:], seq, nowMillis) 59 61 60 62 err = pp.db.Set(key[:], blob, pebble.Sync) 61 63 } ··· 138 140 // example; 139 141 // ``` 140 142 // pp := NewPebblePersistance("/tmp/foo.pebble") 141 - // go pp.GCThread(context.TODO(), 48 * time.Hour, 5 * time.Minute) 143 + // go pp.GCThread(context.Background(), 48 * time.Hour, 5 * time.Minute) 142 144 // ``` 143 145 func (pp *PebblePersist) GCThread(ctx context.Context, retention, gcPeriod time.Duration) { 144 146 ctx, cancel := context.WithCancel(ctx) ··· 148 150 for { 149 151 select { 150 152 case <-ticker.C: 151 - pp.GarbageCollect(ctx, retention) 153 + err := pp.GarbageCollect(ctx, retention) 154 + log.Error("GC err", "err", err) 152 155 case <-ctx.Done(): 153 156 return 154 157 } 155 158 } 156 159 } 160 + 161 + var zeroKey [16]byte 162 + 163 + func init() { 164 + setKeySeqMillis(zeroKey[:], 0, 0) 165 + } 166 + 157 167 func (pp *PebblePersist) GarbageCollect(ctx context.Context, retention time.Duration) error { 158 168 nowMillis := time.Now().UnixMilli() 159 - expired := uint64(nowMillis - retention.Milliseconds()) 169 + expired := nowMillis - retention.Milliseconds() 160 170 iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) 161 171 if err != nil { 162 172 return err 163 173 } 164 174 defer iter.Close() 165 - todel := make(chan []byte, 100) 166 - go func() { 167 - for xkey := range todel { 168 - pp.db.Delete(xkey, nil) 169 - } 170 - }() 171 - defer close(todel) 175 + // scan keys to find last expired, then delete range 176 + var seq int64 = int64(-1) 177 + var lastKeyTime int64 172 178 for iter.First(); iter.Valid(); iter.Next() { 173 179 keyblob := iter.Key() 174 - keyTime := binary.BigEndian.Uint64(keyblob[8:16]) 175 - if keyTime < expired { 176 - todel <- bytes.Clone(keyblob) 180 + 181 + keyTime := int64(binary.BigEndian.Uint64(keyblob[8:16])) 182 + if keyTime <= expired { 183 + lastKeyTime = keyTime 184 + seq = int64(binary.BigEndian.Uint64(keyblob[:8])) 177 185 } else { 178 186 break 179 187 } 180 188 } 181 - return nil 189 + if seq == -1 { 190 + // nothing to delete 191 + return nil 192 + } 193 + var key [16]byte 194 + setKeySeqMillis(key[:], seq, lastKeyTime) 195 + err = pp.db.DeleteRange(zeroKey[:], key[:], pebble.Sync) 196 + return err 182 197 }