this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

GC thread with retention and period

+60 -6
+60 -6
events/pebblepersist.go
··· 7 7 "fmt" 8 8 "github.com/bluesky-social/indigo/models" 9 9 "github.com/cockroachdb/pebble" 10 + "time" 10 11 ) 11 12 12 13 type PebblePersist struct { ··· 15 16 16 17 prevSeq int64 17 18 prevSeqExtra uint32 19 + 20 + cancel func() 18 21 } 19 22 20 23 func NewPebblePersistance(path string) (*PebblePersist, error) { ··· 35 38 blob := e.Preserialized 36 39 37 40 seq := e.Sequence() 38 - log.Infof("persist %d", seq) 41 + nowMillis := uint64(time.Now().UnixMilli()) 39 42 40 43 if seq < 0 { 41 - // persist with longer key {prev 8 byte key}{int32 extra counter} 44 + // persist with longer key {prev 8 byte key}{time}{int32 extra counter} 42 45 pp.prevSeqExtra++ 43 - var key [12]byte 46 + var key [20]byte 44 47 binary.BigEndian.PutUint64(key[:8], uint64(pp.prevSeq)) 45 - binary.BigEndian.PutUint32(key[8:], pp.prevSeqExtra) 48 + binary.BigEndian.PutUint64(key[8:16], nowMillis) 49 + binary.BigEndian.PutUint32(key[16:], pp.prevSeqExtra) 46 50 47 51 err = pp.db.Set(key[:], blob, pebble.Sync) 48 52 return nil 49 53 } else { 50 54 pp.prevSeq = seq 51 55 pp.prevSeqExtra = 0 52 - var key [8]byte 53 - binary.BigEndian.PutUint64(key[:], uint64(seq)) 56 + var key [16]byte 57 + binary.BigEndian.PutUint64(key[:8], uint64(seq)) 58 + binary.BigEndian.PutUint64(key[8:16], nowMillis) 54 59 55 60 err = pp.db.Set(key[:], blob, pebble.Sync) 56 61 } ··· 105 110 return pp.db.Flush() 106 111 } 107 112 func (pp *PebblePersist) Shutdown(context.Context) error { 113 + if pp.cancel != nil { 114 + pp.cancel() 115 + } 108 116 err := pp.db.Close() 109 117 pp.db = nil 110 118 return err ··· 126 134 evt, err := eventFromPebbleIter(iter) 127 135 return evt, nil 128 136 } 137 + 138 + // example; 139 + // ``` 140 + // pp := NewPebblePersistance("/tmp/foo.pebble") 141 + // go pp.GCThread(context.TODO(), 48 * time.Hour, 5 * time.Minute) 142 + // ``` 143 + func (pp *PebblePersist) GCThread(ctx context.Context, retention, gcPeriod time.Duration) { 144 + ctx, cancel := context.WithCancel(ctx) 145 + pp.cancel = cancel 146 + ticker := time.NewTicker(gcPeriod) 147 + defer ticker.Stop() 148 + for { 149 + select { 150 + case <-ticker.C: 151 + pp.GarbageCollect(ctx, retention) 152 + case <-ctx.Done(): 153 + return 154 + } 155 + } 156 + } 157 + func (pp *PebblePersist) GarbageCollect(ctx context.Context, retention time.Duration) error { 158 + nowMillis := time.Now().UnixMilli() 159 + expired := uint64(nowMillis - retention.Milliseconds()) 160 + iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) 161 + if err != nil { 162 + return err 163 + } 164 + defer iter.Close() 165 + todel := make(chan []byte, 100) 166 + go func() { 167 + for xkey := range todel { 168 + pp.db.Delete(xkey, nil) 169 + } 170 + }() 171 + defer close(todel) 172 + for iter.First(); iter.Valid(); iter.Next() { 173 + keyblob := iter.Key() 174 + keyTime := binary.BigEndian.Uint64(keyblob[8:16]) 175 + if keyTime < expired { 176 + todel <- bytes.Clone(keyblob) 177 + } else { 178 + break 179 + } 180 + } 181 + return nil 182 + }