this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

updates for refactored persist

+20 -15
+5 -3
cmd/bigsky/main.go
··· 19 19 "github.com/bluesky-social/indigo/carstore" 20 20 "github.com/bluesky-social/indigo/did" 21 21 "github.com/bluesky-social/indigo/events" 22 + "github.com/bluesky-social/indigo/events/dbpersist" 23 + "github.com/bluesky-social/indigo/events/diskpersist" 22 24 "github.com/bluesky-social/indigo/indexer" 23 25 "github.com/bluesky-social/indigo/plc" 24 26 "github.com/bluesky-social/indigo/repomgr" ··· 431 433 if dpd := cctx.String("disk-persister-dir"); dpd != "" { 432 434 slog.Info("setting up disk persister") 433 435 434 - pOpts := events.DefaultDiskPersistOptions() 436 + pOpts := diskpersist.DefaultDiskPersistOptions() 435 437 pOpts.Retention = cctx.Duration("event-playback-ttl") 436 - dp, err := events.NewDiskPersistence(dpd, "", db, pOpts) 438 + dp, err := diskpersist.NewDiskPersistence(dpd, "", db, pOpts) 437 439 if err != nil { 438 440 return fmt.Errorf("setting up disk persister: %w", err) 439 441 } 440 442 persister = dp 441 443 } else { 442 - dbp, err := events.NewDbPersistence(db, cstore, nil) 444 + dbp, err := dbpersist.NewDbPersistence(db, cstore, nil) 443 445 if err != nil { 444 446 return fmt.Errorf("setting up db event persistence: %w", err) 445 447 }
+2 -2
cmd/rainbow/main.go
··· 9 9 "syscall" 10 10 "time" 11 11 12 - "github.com/bluesky-social/indigo/events" 12 + "github.com/bluesky-social/indigo/events/pebblepersist" 13 13 "github.com/bluesky-social/indigo/splitter" 14 14 15 15 "github.com/carlmjohnson/versioninfo" ··· 154 154 var err error 155 155 if persistPath != "" { 156 156 log.Info("building splitter with storage at", "path", persistPath) 157 - ppopts := events.PebblePersistOptions{ 157 + ppopts := pebblepersist.PebblePersistOptions{ 158 158 DbPath: persistPath, 159 159 PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")), 160 160 GCPeriod: 5 * time.Minute,
+2 -1
cmd/supercollider/main.go
··· 24 24 "github.com/bluesky-social/indigo/carstore" 25 25 "github.com/bluesky-social/indigo/did" 26 26 "github.com/bluesky-social/indigo/events" 27 + "github.com/bluesky-social/indigo/events/yolopersist" 27 28 "github.com/bluesky-social/indigo/indexer" 28 29 "github.com/bluesky-social/indigo/models" 29 30 "github.com/bluesky-social/indigo/plc" ··· 205 206 logger.Info(fmt.Sprintf("Generating %d total events and writing them to %s", 206 207 cctx.Int("total-events"), cctx.String("output-file"))) 207 208 208 - em := events.NewEventManager(events.NewYoloPersister()) 209 + em := events.NewEventManager(yolopersist.NewYoloPersister()) 209 210 210 211 // Try to read the key from disk 211 212 keyBytes, err := os.ReadFile(cctx.String("key-file"))
+8 -7
splitter/splitter.go
··· 22 22 "github.com/bluesky-social/indigo/api/atproto" 23 23 comatproto "github.com/bluesky-social/indigo/api/atproto" 24 24 "github.com/bluesky-social/indigo/bgs" 25 - events "github.com/bluesky-social/indigo/events" 25 + "github.com/bluesky-social/indigo/events" 26 + "github.com/bluesky-social/indigo/events/pebblepersist" 26 27 "github.com/bluesky-social/indigo/events/schedulers/sequential" 27 28 "github.com/bluesky-social/indigo/util" 28 29 "github.com/bluesky-social/indigo/xrpc" ··· 36 37 37 38 type Splitter struct { 38 39 erb *EventRingBuffer 39 - pp *events.PebblePersist 40 + pp *pebblepersist.PebblePersist 40 41 events *events.EventManager 41 42 42 43 // Management of Socket Consumers ··· 55 56 type SplitterConfig struct { 56 57 UpstreamHost string 57 58 CursorFile string 58 - PebbleOptions *events.PebblePersistOptions 59 + PebbleOptions *pebblepersist.PebblePersistOptions 59 60 } 60 61 61 62 func (sc *SplitterConfig) XrpcRootUrl() string { ··· 103 104 s.erb = erb 104 105 s.events = events.NewEventManager(erb) 105 106 } else { 106 - pp, err := events.NewPebblePersistance(conf.PebbleOptions) 107 + pp, err := pebblepersist.NewPebblePersistance(conf.PebbleOptions) 107 108 if err != nil { 108 109 return nil, err 109 110 } ··· 115 116 return s, nil 116 117 } 117 118 func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) { 118 - ppopts := events.PebblePersistOptions{ 119 + ppopts := pebblepersist.PebblePersistOptions{ 119 120 DbPath: path, 120 121 PersistDuration: time.Duration(float64(time.Hour) * persistHours), 121 122 GCPeriod: 5 * time.Minute, ··· 126 127 CursorFile: "cursor-file", 127 128 PebbleOptions: &ppopts, 128 129 } 129 - pp, err := events.NewPebblePersistance(&ppopts) 130 + pp, err := pebblepersist.NewPebblePersistance(&ppopts) 130 131 if err != nil { 131 132 return nil, err 132 133 } ··· 642 643 if err == nil { 643 644 s.log.Debug("got last cursor from pebble", "seq", seq, "millis", millis) 644 645 return seq, nil 645 - } else if errors.Is(err, events.ErrNoLast) { 646 + } else if errors.Is(err, pebblepersist.ErrNoLast) { 646 647 s.log.Info("pebble no last") 647 648 } else { 648 649 s.log.Error("pebble seq fail", "err", err)
+3 -2
testing/utils.go
··· 25 25 "github.com/bluesky-social/indigo/bgs" 26 26 "github.com/bluesky-social/indigo/carstore" 27 27 "github.com/bluesky-social/indigo/events" 28 + "github.com/bluesky-social/indigo/events/diskpersist" 28 29 "github.com/bluesky-social/indigo/events/schedulers/sequential" 29 30 "github.com/bluesky-social/indigo/indexer" 30 31 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 559 560 560 561 repoman := repomgr.NewRepoManager(cs, kmgr) 561 562 562 - opts := events.DefaultDiskPersistOptions() 563 + opts := diskpersist.DefaultDiskPersistOptions() 563 564 opts.EventsPerFile = 10 564 - diskpersist, err := events.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts) 565 + diskpersist, err := diskpersist.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts) 565 566 566 567 evtman := events.NewEventManager(diskpersist) 567 568 rf := indexer.NewRepoFetcher(maindb, repoman, 10)