this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

support for configurable initial seq num

+21 -4
+7
cmd/relay/main.go
··· 128 128 Usage: "when messages fail atproto 'Sync 1.1' validation, just log, don't drop", 129 129 EnvVars: []string{"RELAY_LENIENT_SYNC_VALIDATION"}, 130 130 }, 131 + &cli.IntFlag{ 132 + Name: "initial-seq-number", 133 + Usage: "when initializing output firehose, start with this sequence number", 134 + Value: 1, 135 + EnvVars: []string{"RELAY_INITIAL_SEQ_NUMBER"}, 136 + }, 131 137 &cli.StringSliceFlag{ 132 138 Name: "sibling-relays", 133 139 Usage: "servers (eg https://example.com) to forward admin state changes to; multiple allowed", ··· 222 228 } 223 229 persitConfig := diskpersist.DefaultDiskPersistOptions() 224 230 persitConfig.Retention = cctx.Duration("replay-window") 231 + persitConfig.InitialSeq = cctx.Int64("initial-seq-number") 225 232 logger.Info("setting up disk persister", "dir", persistDir, "replayWindow", persitConfig.Retention) 226 233 persister, err := diskpersist.NewDiskPersistence(persistDir, "", db, persitConfig) 227 234 if err != nil {
+14 -4
cmd/relay/stream/persist/diskpersist/diskpersist.go
··· 39 39 40 40 eventCounter int64 41 41 curSeq int64 42 + initialSeq int64 42 43 43 44 uids UidSource 44 45 uidCache *arc.ARCCache[uint64, string] ··· 78 79 WriteBufferSize int 79 80 Retention time.Duration 80 81 82 + // starting sequence number to use (if there is no existing persisted data) 83 + InitialSeq int64 84 + 81 85 Logger *slog.Logger 82 86 } 83 87 ··· 88 92 DIDCacheSize: 1_000_000, 89 93 WriteBufferSize: 50, 90 94 Retention: time.Hour * 24 * 3, // 3 days 95 + InitialSeq: 1, 91 96 } 92 97 } 93 98 ··· 124 129 New: func() any { 125 130 return cbg.NewCborWriter(nil) 126 131 }, 132 + } 133 + 134 + if opts.InitialSeq <= 0 { 135 + return nil, fmt.Errorf("negative or zero initial seq: %d", opts.InitialSeq) 127 136 } 128 137 129 138 dp := &DiskPersistence{ ··· 141 150 writeBufferSize: opts.WriteBufferSize, 142 151 shutdown: make(chan struct{}), 143 152 log: opts.Logger, 153 + initialSeq: opts.InitialSeq, 144 154 } 145 155 if dp.log == nil { 146 156 dp.log = slog.Default().With("system", "diskpersist") ··· 211 221 212 222 if err := dp.meta.Create(&LogFileRef{ 213 223 Path: "evts-0", 214 - SeqStart: 0, 224 + SeqStart: 0, // NOTE: not dp.initialSeq 215 225 }).Error; err != nil { 216 226 return err 217 227 } 218 228 219 229 dp.logfi = fi 220 - dp.curSeq = 1 230 + dp.curSeq = dp.initialSeq 221 231 return nil 222 232 } 223 233 ··· 293 303 294 304 const ( 295 305 evtKindCommit = 1 296 - evtKindHandle = 2 297 - evtKindTombstone = 3 306 + evtKindHandle = 2 // DEPRECATED 307 + evtKindTombstone = 3 // DEPRECATED 298 308 evtKindIdentity = 4 299 309 evtKindAccount = 5 300 310 evtKindSync = 6