this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

reorg config

+63 -14
+11 -1
cmd/rainbow/main.go
··· 72 72 EnvVars: []string{"SPLITTER_PERSIST_HOURS"}, 73 73 Usage: "hours to buffer (float, may be fractional)", 74 74 }, 75 + &cli.Int64Flag{ 76 + Name: "persist-bytes", 77 + Value: 1_000_000_000, 78 + Usage: "max bytes target for event cache", 79 + EnvVars: []string{"SPLITTER_PERSIST_BYTES"}, 80 + }, 75 81 } 76 82 77 83 app.Action = Splitter ··· 127 133 var err error 128 134 if persistPath != "" { 129 135 log.Infof("building splitter with storage at: %s", persistPath) 130 - spl, err = splitter.NewDiskSplitter(upstreamHost, persistPath, cctx.Float64("persist-hours")) 136 + spl, err = splitter.NewDiskSplitter( 137 + upstreamHost, 138 + persistPath, 139 + cctx.Float64("persist-hours"), 140 + cctx.Int64("persist-bytes")) 131 141 if err != nil { 132 142 log.Fatalw("failed to create splitter", "path", persistPath, "error", err) 133 143 return err
+43 -9
events/pebblepersist.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/binary" 7 + "encoding/hex" 7 8 "fmt" 9 + "time" 10 + 8 11 "github.com/bluesky-social/indigo/models" 9 12 "github.com/cockroachdb/pebble" 10 - "time" 11 13 ) 12 14 13 15 type PebblePersist struct { ··· 18 20 prevSeqExtra uint32 19 21 20 22 cancel func() 23 + 24 + options PebblePersistOptions 21 25 } 22 26 23 - func NewPebblePersistance(path string) (*PebblePersist, error) { 27 + type PebblePersistOptions struct { 28 + // Throw away posts older than some time ago 29 + PersistDuration time.Duration 30 + 31 + // Throw away old posts every so often 32 + GCPeriod time.Duration 33 + 34 + // MaxBytes is what we _try_ to keep disk usage under 35 + MaxBytes uint64 36 + } 37 + 38 + var DefaultPebblePersistOptions = PebblePersistOptions{ 39 + PersistDuration: time.Minute * 20, 40 + GCPeriod: time.Minute * 5, 41 + MaxBytes: 1024 * 1024 * 1024, // 1 GiB 42 + } 43 + 44 + // Create a new EventPersistence which stores data in pebbledb 45 + // nil opts is ok 46 + func NewPebblePersistance(path string, opts *PebblePersistOptions) (*PebblePersist, error) { 24 47 db, err := pebble.Open(path, &pebble.Options{}) 25 48 if err != nil { 26 49 return nil, fmt.Errorf("%s: %w", path, err) 27 50 } 28 51 pp := new(PebblePersist) 29 52 pp.db = db 53 + if opts == nil { 54 + pp.options = DefaultPebblePersistOptions 55 + } else { 56 + pp.options = *opts 57 + } 30 58 return pp, nil 31 59 } 32 60 ··· 150 178 // pp := NewPebblePersistance("/tmp/foo.pebble") 151 179 // go pp.GCThread(context.Background(), 48 * time.Hour, 5 * time.Minute) 152 180 // ``` 153 - func (pp *PebblePersist) GCThread(ctx context.Context, retention, gcPeriod time.Duration) { 181 + func (pp *PebblePersist) GCThread(ctx context.Context) { 154 182 ctx, cancel := context.WithCancel(ctx) 155 183 pp.cancel = cancel 156 - ticker := time.NewTicker(gcPeriod) 184 + ticker := time.NewTicker(pp.options.GCPeriod) 157 185 defer ticker.Stop() 158 186 for { 159 187 select { 160 188 case <-ticker.C: 161 - err := pp.GarbageCollect(ctx, retention) 189 + err := pp.GarbageCollect(ctx) 162 190 if err != nil { 163 191 log.Errorw("GC err", "err", err) 164 192 } ··· 169 197 } 170 198 171 199 var zeroKey [16]byte 200 + var ffffKey [16]byte 172 201 173 202 func init() { 174 203 setKeySeqMillis(zeroKey[:], 0, 0) 204 + for i := range ffffKey { 205 + ffffKey[i] = 0xff 206 + } 175 207 } 176 208 177 - func (pp *PebblePersist) GarbageCollect(ctx context.Context, retention time.Duration) error { 209 + func (pp *PebblePersist) GarbageCollect(ctx context.Context) error { 178 210 nowMillis := time.Now().UnixMilli() 179 - expired := nowMillis - retention.Milliseconds() 211 + expired := nowMillis - pp.options.PersistDuration.Milliseconds() 180 212 iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) 181 213 if err != nil { 182 214 return err ··· 196 228 break 197 229 } 198 230 } 199 - sizeBefore, _ := pp.db.EstimateDiskUsage(nil, nil) 231 + sizeBefore, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:]) 200 232 if seq == -1 { 201 233 // nothing to delete 234 + log.Infow("pebble gc nop", "size", sizeBefore) 202 235 return nil 203 236 } 204 237 var key [16]byte 205 238 setKeySeqMillis(key[:], seq, lastKeyTime) 239 + log.Infow("pebble gc start", "to", hex.EncodeToString(key[:])) 206 240 err = pp.db.DeleteRange(zeroKey[:], key[:], pebble.Sync) 207 241 if err != nil { 208 242 return err 209 243 } 210 - sizeAfter, _ := pp.db.EstimateDiskUsage(nil, nil) 244 + sizeAfter, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:]) 211 245 log.Infow("pebble gc", "before", sizeBefore, "after", sizeAfter) 212 246 return nil 213 247 }
+1 -1
events/pebblepersist_test.go
··· 8 8 9 9 func TestPebblePersist(t *testing.T) { 10 10 factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { 11 - return NewPebblePersistance(filepath.Join(tempPath, "pebble.db")) 11 + return NewPebblePersistance(filepath.Join(tempPath, "pebble.db"), nil) 12 12 } 13 13 testPersister(t, factory) 14 14 }
+8 -3
splitter/splitter.go
··· 54 54 consumers: make(map[uint64]*SocketConsumer), 55 55 } 56 56 } 57 - func NewDiskSplitter(host, path string, persistHours float64) (*Splitter, error) { 58 - pp, err := events.NewPebblePersistance(path) 57 + func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { 58 + ppopts := events.PebblePersistOptions{ 59 + PersistDuration: time.Duration(float64(time.Hour) * persistHours), 60 + GCPeriod: 5 * time.Minute, 61 + MaxBytes: uint64(maxBytes), 62 + } 63 + pp, err := events.NewPebblePersistance(path, &ppopts) 59 64 if err != nil { 60 65 return nil, err 61 66 } 62 67 63 - go pp.GCThread(context.Background(), time.Duration(float64(time.Hour)*persistHours), 5*time.Minute) 68 + go pp.GCThread(context.Background()) 64 69 em := events.NewEventManager(pp) 65 70 return &Splitter{ 66 71 cursorFile: "cursor-file",