this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor automod consumer into package (#776)

authored by

bnewbold and committed by
GitHub
3ff74054 150e0519

+619 -518
+2
automod/consumer/doc.go
··· 1 + // Code for consuming from atproto firehose and ozone event stream, pushing events in to automod engine. 2 + package consumer
+312
automod/consumer/firehose.go
··· 1 + package consumer 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + "net/http" 9 + "net/url" 10 + "sync/atomic" 11 + "time" 12 + 13 + comatproto "github.com/bluesky-social/indigo/api/atproto" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/automod" 16 + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 17 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 18 + lexutil "github.com/bluesky-social/indigo/lex/util" 19 + 20 + "github.com/bluesky-social/indigo/events" 21 + "github.com/bluesky-social/indigo/repo" 22 + "github.com/bluesky-social/indigo/repomgr" 23 + "github.com/carlmjohnson/versioninfo" 24 + "github.com/gorilla/websocket" 25 + "github.com/redis/go-redis/v9" 26 + ) 27 + 28 + // TODO: should probably make this not hepa-specific; or even configurable 29 + var firehoseCursorKey = "hepa/seq" 30 + 31 + type FirehoseConsumer struct { 32 + Parallelism int 33 + Logger *slog.Logger 34 + RedisClient *redis.Client 35 + Engine *automod.Engine 36 + Host string 37 + 38 + // TODO: prefilter record collections; or predicate function? 39 + // TODO: enable/disable event types; or predicate function? 40 + 41 + // lastSeq is the most recent event sequence number we've received and begun to handle. 42 + // This number is periodically persisted to redis, if redis is present. 43 + // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic), 44 + // but nonetheless, you must use atomics when updating or reading this (to avoid data races). 45 + lastSeq int64 46 + } 47 + 48 + func (fc *FirehoseConsumer) Run(ctx context.Context) error { 49 + 50 + if fc.Engine == nil { 51 + return fmt.Errorf("nil engine") 52 + } 53 + 54 + cur, err := fc.ReadLastCursor(ctx) 55 + if err != nil { 56 + return err 57 + } 58 + 59 + dialer := websocket.DefaultDialer 60 + u, err := url.Parse(fc.Host) 61 + if err != nil { 62 + return fmt.Errorf("invalid Host URI: %w", err) 63 + } 64 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 65 + if cur != 0 { 66 + u.RawQuery = fmt.Sprintf("cursor=%d", cur) 67 + } 68 + fc.Logger.Info("subscribing to repo event stream", "upstream", fc.Host, "cursor", cur) 69 + con, _, err := dialer.Dial(u.String(), http.Header{ 70 + "User-Agent": []string{fmt.Sprintf("hepa/%s", versioninfo.Short())}, 71 + }) 72 + if err != nil { 73 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 74 + } 75 + 76 + rsc := &events.RepoStreamCallbacks{ 77 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 78 + atomic.StoreInt64(&fc.lastSeq, evt.Seq) 79 + return fc.HandleRepoCommit(ctx, evt) 80 + }, 81 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 82 + atomic.StoreInt64(&fc.lastSeq, evt.Seq) 83 + did, err := syntax.ParseDID(evt.Did) 84 + if err != nil { 85 + fc.Logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 86 + return nil 87 + } 88 + if err := fc.Engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { 89 + fc.Logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) 90 + } 91 + return nil 92 + }, 93 + RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 94 + atomic.StoreInt64(&fc.lastSeq, evt.Seq) 95 + did, err := syntax.ParseDID(evt.Did) 96 + if err != nil { 97 + fc.Logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err) 98 + return nil 99 + } 100 + if err := fc.Engine.ProcessIdentityEvent(ctx, "account", did); err != nil { 101 + fc.Logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) 102 + } 103 + return nil 104 + }, 105 + // TODO: deprecated 106 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 107 + atomic.StoreInt64(&fc.lastSeq, evt.Seq) 108 + did, err := syntax.ParseDID(evt.Did) 109 + if err != nil { 110 + fc.Logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 111 + return nil 112 + } 113 + if err := fc.Engine.ProcessIdentityEvent(ctx, "handle", did); err != nil { 114 + fc.Logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 115 + } 116 + return nil 117 + }, 118 + // TODO: deprecated 119 + RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 120 + atomic.StoreInt64(&fc.lastSeq, evt.Seq) 121 + did, err := syntax.ParseDID(evt.Did) 122 + if err != nil { 123 + fc.Logger.Error("bad DID in RepoTombstone event", "did", evt.Did, "seq", evt.Seq, "err", err) 124 + return nil 125 + } 126 + if err := fc.Engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil { 127 + fc.Logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err) 128 + } 129 + return nil 130 + }, 131 + } 132 + 133 + var scheduler events.Scheduler 134 + if fc.Parallelism > 0 { 135 + // use a fixed-parallelism scheduler if configured 136 + scheduler = parallel.NewScheduler( 137 + fc.Parallelism, 138 + 1000, 139 + fc.Host, 140 + rsc.EventHandler, 141 + ) 142 + fc.Logger.Info("hepa scheduler configured", "scheduler", "parallel", "initial", fc.Parallelism) 143 + } else { 144 + // otherwise use auto-scaling scheduler 145 + scaleSettings := autoscaling.DefaultAutoscaleSettings() 146 + // start at higher parallelism (somewhat arbitrary) 147 + scaleSettings.Concurrency = 4 148 + scaleSettings.MaxConcurrency = 200 149 + scheduler = autoscaling.NewScheduler(scaleSettings, fc.Host, rsc.EventHandler) 150 + fc.Logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency) 151 + } 152 + 153 + return events.HandleRepoStream(ctx, con, scheduler) 154 + } 155 + 156 + // NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better. 157 + func (fc *FirehoseConsumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 158 + 159 + logger := fc.Logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 160 + logger.Debug("received commit event") 161 + 162 + if evt.TooBig { 163 + logger.Warn("skipping tooBig events for now") 164 + return nil 165 + } 166 + 167 + did, err := syntax.ParseDID(evt.Repo) 168 + if err != nil { 169 + logger.Error("bad DID syntax in event", "err", err) 170 + return nil 171 + } 172 + 173 + rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 174 + if err != nil { 175 + logger.Error("failed to read repo from car", "err", err) 176 + return nil 177 + } 178 + 179 + // empty commit is a special case, temporarily, basically indicates "new account" 180 + if len(evt.Ops) == 0 { 181 + if err := fc.Engine.ProcessIdentityEvent(ctx, "create", did); err != nil { 182 + fc.Logger.Error("processing handle update failed", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq, "err", err) 183 + } 184 + } 185 + 186 + for _, op := range evt.Ops { 187 + logger = logger.With("eventKind", op.Action, "path", op.Path) 188 + collection, rkey, err := splitRepoPath(op.Path) 189 + if err != nil { 190 + logger.Error("invalid path in repo op") 191 + return nil 192 + } 193 + 194 + ek := repomgr.EventKind(op.Action) 195 + switch ek { 196 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 197 + // read the record bytes from blocks, and verify CID 198 + rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 199 + if err != nil { 200 + logger.Error("reading record from event blocks (CAR)", "err", err) 201 + break 202 + } 203 + if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 204 + logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 205 + break 206 + } 207 + var action string 208 + switch ek { 209 + case repomgr.EvtKindCreateRecord: 210 + action = automod.CreateOp 211 + case repomgr.EvtKindUpdateRecord: 212 + action = automod.UpdateOp 213 + default: 214 + logger.Error("impossible event kind", "kind", ek) 215 + break 216 + } 217 + recCID := syntax.CID(op.Cid.String()) 218 + err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{ 219 + Action: action, 220 + DID: did, 221 + Collection: collection, 222 + RecordKey: rkey, 223 + CID: &recCID, 224 + RecordCBOR: *recCBOR, 225 + }) 226 + if err != nil { 227 + logger.Error("engine failed to process record", "err", err) 228 + continue 229 + } 230 + case repomgr.EvtKindDeleteRecord: 231 + err = fc.Engine.ProcessRecordOp(ctx, automod.RecordOp{ 232 + Action: automod.DeleteOp, 233 + DID: did, 234 + Collection: collection, 235 + RecordKey: rkey, 236 + CID: nil, 237 + RecordCBOR: nil, 238 + }) 239 + if err != nil { 240 + logger.Error("engine failed to process record", "err", err) 241 + continue 242 + } 243 + default: 244 + // TODO: should this be an error? 245 + } 246 + } 247 + 248 + return nil 249 + } 250 + 251 + func (fc *FirehoseConsumer) ReadLastCursor(ctx context.Context) (int64, error) { 252 + // if redis isn't configured, just skip 253 + if fc.RedisClient == nil { 254 + fc.Logger.Info("redis not configured, skipping cursor read") 255 + return 0, nil 256 + } 257 + 258 + val, err := fc.RedisClient.Get(ctx, firehoseCursorKey).Int64() 259 + if err == redis.Nil { 260 + fc.Logger.Info("no pre-existing cursor in redis") 261 + return 0, nil 262 + } else if err != nil { 263 + return 0, err 264 + } 265 + fc.Logger.Info("successfully found prior subscription cursor seq in redis", "seq", val) 266 + return val, nil 267 + } 268 + 269 + func (fc *FirehoseConsumer) PersistCursor(ctx context.Context) error { 270 + // if redis isn't configured, just skip 271 + if fc.RedisClient == nil { 272 + return nil 273 + } 274 + lastSeq := atomic.LoadInt64(&fc.lastSeq) 275 + if lastSeq <= 0 { 276 + return nil 277 + } 278 + err := fc.RedisClient.Set(ctx, firehoseCursorKey, lastSeq, 14*24*time.Hour).Err() 279 + return err 280 + } 281 + 282 + // this method runs in a loop, persisting the current cursor state every 5 seconds 283 + func (fc *FirehoseConsumer) RunPersistCursor(ctx context.Context) error { 284 + 285 + // if redis isn't configured, just skip 286 + if fc.RedisClient == nil { 287 + return nil 288 + } 289 + ticker := time.NewTicker(5 * time.Second) 290 + for { 291 + select { 292 + case <-ctx.Done(): 293 + lastSeq := atomic.LoadInt64(&fc.lastSeq) 294 + if lastSeq >= 1 { 295 + fc.Logger.Info("persisting final cursor seq value", "seq", lastSeq) 296 + err := fc.PersistCursor(ctx) 297 + if err != nil { 298 + fc.Logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 299 + } 300 + } 301 + return nil 302 + case <-ticker.C: 303 + lastSeq := atomic.LoadInt64(&fc.lastSeq) 304 + if lastSeq >= 1 { 305 + err := fc.PersistCursor(ctx) 306 + if err != nil { 307 + fc.Logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 308 + } 309 + } 310 + } 311 + } 312 + }
+186
automod/consumer/ozone.go
··· 1 + package consumer 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "sync/atomic" 8 + "time" 9 + 10 + toolsozone "github.com/bluesky-social/indigo/api/ozone" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/bluesky-social/indigo/automod" 13 + "github.com/bluesky-social/indigo/xrpc" 14 + 15 + "github.com/redis/go-redis/v9" 16 + ) 17 + 18 + // TODO: should probably make this not hepa-specific; or even configurable 19 + var ozoneCursorKey = "hepa/ozoneTimestamp" 20 + 21 + type OzoneConsumer struct { 22 + Logger *slog.Logger 23 + RedisClient *redis.Client 24 + OzoneClient *xrpc.Client 25 + Engine *automod.Engine 26 + 27 + // same as lastSeq, but for Ozone timestamp cursor. the value is a string. 28 + lastCursor atomic.Value 29 + } 30 + 31 + func (oc *OzoneConsumer) Run(ctx context.Context) error { 32 + 33 + if oc.Engine == nil { 34 + return fmt.Errorf("nil engine") 35 + } 36 + if oc.OzoneClient == nil { 37 + return fmt.Errorf("nil ozoneclient") 38 + } 39 + 40 + cur, err := oc.ReadLastCursor(ctx) 41 + if err != nil { 42 + return err 43 + } 44 + 45 + if cur == "" { 46 + cur = syntax.DatetimeNow().String() 47 + } 48 + since, err := syntax.ParseDatetime(cur) 49 + if err != nil { 50 + return err 51 + } 52 + 53 + oc.Logger.Info("subscribing to ozone event log", "upstream", oc.OzoneClient.Host, "cursor", cur, "since", since) 54 + var limit int64 = 50 55 + period := time.Second * 5 56 + 57 + for { 58 + //func ModerationQueryEvents(ctx context.Context, c *xrpc.Client, addedLabels []string, addedTags []string, comment string, createdAfter string, createdBefore string, createdBy string, cursor string, hasComment bool, includeAllUserRecords bool, limit int64, removedLabels []string, removedTags []string, reportTypes []string, sortDirection string, subject string, types []string) (*ModerationQueryEvents_Output, error) { 59 + me, err := toolsozone.ModerationQueryEvents( 60 + ctx, 61 + oc.OzoneClient, 62 + nil, // addedLabels: If specified, only events where all of these labels were added are returned 63 + nil, // addedTags: If specified, only events where all of these tags were added are returned 64 + "", // comment: If specified, only events with comments containing the keyword are returned 65 + since.String(), // createdAfter: Retrieve events created after a given timestamp 66 + "", // createdBefore: Retrieve events created before a given timestamp 67 + "", // createdBy 68 + "", // cursor 69 + false, // hasComment: If true, only events with comments are returned 70 + true, // includeAllUserRecords: If true, events on all record types (posts, lists, profile etc.) owned by the did are returned 71 + limit, 72 + nil, // removedLabels: If specified, only events where all of these labels were removed are returned 73 + nil, // removedTags 74 + nil, // reportTypes 75 + "asc", // sortDirection: Sort direction for the events. Defaults to descending order of created at timestamp. 76 + "", // subject 77 + nil, // types: The types of events (fully qualified string in the format of tools.ozone.moderation.defs#modEvent<name>) to filter by. If not specified, all events are returned. 78 + ) 79 + if err != nil { 80 + oc.Logger.Warn("ozone query events failed; sleeping then will retrying", "err", err, "period", period.String()) 81 + time.Sleep(period) 82 + continue 83 + } 84 + 85 + // track if the response contained anything new 86 + anyNewEvents := false 87 + for _, evt := range me.Events { 88 + createdAt, err := syntax.ParseDatetime(evt.CreatedAt) 89 + if err != nil { 90 + return fmt.Errorf("invalid time format for ozone 'createdAt': %w", err) 91 + } 92 + // skip if the timestamp is the exact same 93 + if createdAt == since { 94 + continue 95 + } 96 + anyNewEvents = true 97 + // TODO: is there a race condition here? 98 + if !createdAt.Time().After(since.Time()) { 99 + oc.Logger.Error("out of order ozone event", "createdAt", createdAt, "since", since) 100 + return fmt.Errorf("out of order ozone event") 101 + } 102 + if err = oc.HandleOzoneEvent(ctx, evt); err != nil { 103 + oc.Logger.Error("failed to process ozone event", "event", evt) 104 + } 105 + since = createdAt 106 + oc.lastCursor.Store(since.String()) 107 + } 108 + if !anyNewEvents { 109 + oc.Logger.Debug("... ozone poller sleeping", "period", period.String()) 110 + time.Sleep(period) 111 + } 112 + } 113 + } 114 + 115 + func (oc *OzoneConsumer) HandleOzoneEvent(ctx context.Context, eventView *toolsozone.ModerationDefs_ModEventView) error { 116 + 117 + oc.Logger.Debug("received ozone event", "eventID", eventView.Id, "createdAt", eventView.CreatedAt) 118 + 119 + if err := oc.Engine.ProcessOzoneEvent(ctx, eventView); err != nil { 120 + oc.Logger.Error("engine failed to process ozone event", "err", err) 121 + } 122 + return nil 123 + } 124 + 125 + func (oc *OzoneConsumer) ReadLastCursor(ctx context.Context) (string, error) { 126 + // if redis isn't configured, just skip 127 + if oc.RedisClient == nil { 128 + oc.Logger.Info("redis not configured, skipping ozone cursor read") 129 + return "", nil 130 + } 131 + 132 + val, err := oc.RedisClient.Get(ctx, ozoneCursorKey).Result() 133 + if err == redis.Nil || val == "" { 134 + oc.Logger.Info("no pre-existing ozone cursor in redis") 135 + return "", nil 136 + } else if err != nil { 137 + return "", err 138 + } 139 + oc.Logger.Info("successfully found prior ozone offset timestamp in redis", "cursor", val) 140 + return val, nil 141 + } 142 + 143 + func (oc *OzoneConsumer) PersistCursor(ctx context.Context) error { 144 + // if redis isn't configured, just skip 145 + if oc.RedisClient == nil { 146 + return nil 147 + } 148 + lastCursor := oc.lastCursor.Load() 149 + if lastCursor == nil || lastCursor == "" { 150 + return nil 151 + } 152 + err := oc.RedisClient.Set(ctx, ozoneCursorKey, lastCursor, 14*24*time.Hour).Err() 153 + return err 154 + } 155 + 156 + // this method runs in a loop, persisting the current cursor state every 5 seconds 157 + func (oc *OzoneConsumer) RunPersistCursor(ctx context.Context) error { 158 + 159 + // if redis isn't configured, just skip 160 + if oc.RedisClient == nil { 161 + return nil 162 + } 163 + ticker := time.NewTicker(5 * time.Second) 164 + for { 165 + select { 166 + case <-ctx.Done(): 167 + lastCursor := oc.lastCursor.Load() 168 + if lastCursor != nil && lastCursor != "" { 169 + oc.Logger.Info("persisting final ozone cursor timestamp", "cursor", lastCursor) 170 + err := oc.PersistCursor(ctx) 171 + if err != nil { 172 + oc.Logger.Error("failed to persist ozone cursor", "err", err, "cursor", lastCursor) 173 + } 174 + } 175 + return nil 176 + case <-ticker.C: 177 + lastCursor := oc.lastCursor.Load() 178 + if lastCursor != nil && lastCursor != "" { 179 + err := oc.PersistCursor(ctx) 180 + if err != nil { 181 + oc.Logger.Error("failed to persist ozone cursor", "err", err, "cursor", lastCursor) 182 + } 183 + } 184 + } 185 + } 186 + }
+25
automod/consumer/util.go
··· 1 + package consumer 2 + 3 + import ( 4 + "fmt" 5 + "strings" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + ) 9 + 10 + // TODO: move this to a "ParsePath" helper in syntax package? 11 + func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) { 12 + parts := strings.SplitN(path, "/", 3) 13 + if len(parts) != 2 { 14 + return "", "", fmt.Errorf("invalid record path: %s", path) 15 + } 16 + collection, err := syntax.ParseNSID(parts[0]) 17 + if err != nil { 18 + return "", "", err 19 + } 20 + rkey, err := syntax.ParseRecordKey(parts[1]) 21 + if err != nil { 22 + return "", "", err 23 + } 24 + return collection, rkey, nil 25 + }
+7
automod/engine/context.go
··· 169 169 return out 170 170 } 171 171 172 + // Returns a pointer to the underlying automod engine. This usually should NOT be used in rules. 173 + // 174 + // This is an escape hatch for hacking on the system before features get fully integerated in to the content API surface. The Engine API is not stable. 175 + func (c *BaseContext) InternalEngine() *Engine { 176 + return c.engine 177 + } 178 + 172 179 func NewAccountContext(ctx context.Context, eng *Engine, meta AccountMeta) AccountContext { 173 180 return AccountContext{ 174 181 BaseContext: BaseContext{
+32 -8
automod/engine/engine.go
··· 43 43 AdminClient *xrpc.Client 44 44 // used to fetch blobs from upstream PDS instances 45 45 BlobClient *http.Client 46 + 47 + // internal configuration 48 + Config EngineConfig 49 + } 50 + 51 + type EngineConfig struct { 52 + // if enabled, account metadata is not hydrated for every event by default 53 + SkipAccountMeta bool 46 54 } 47 55 48 56 // Entrypoint for external code pushing arbitrary identity events in to the engine. ··· 80 88 return fmt.Errorf("identity not found for DID: %s", did.String()) 81 89 } 82 90 83 - am, err := eng.GetAccountMeta(ctx, ident) 84 - if err != nil { 85 - eventErrorCount.WithLabelValues("identity").Inc() 86 - return fmt.Errorf("failed to fetch account metadata: %w", err) 91 + var am *AccountMeta 92 + if !eng.Config.SkipAccountMeta { 93 + am, err = eng.GetAccountMeta(ctx, ident) 94 + if err != nil { 95 + eventErrorCount.WithLabelValues("identity").Inc() 96 + return fmt.Errorf("failed to fetch account metadata: %w", err) 97 + } 98 + } else { 99 + am = &AccountMeta{ 100 + Identity: ident, 101 + Profile: ProfileSummary{}, 102 + } 87 103 } 88 104 ac := NewAccountContext(ctx, eng, *am) 89 105 if err := eng.Rules.CallIdentityRules(&ac); err != nil { ··· 136 152 return fmt.Errorf("identity not found for DID: %s", op.DID) 137 153 } 138 154 139 - am, err := eng.GetAccountMeta(ctx, ident) 140 - if err != nil { 141 - eventErrorCount.WithLabelValues("record").Inc() 142 - return fmt.Errorf("failed to fetch account metadata: %w", err) 155 + var am *AccountMeta 156 + if !eng.Config.SkipAccountMeta { 157 + am, err = eng.GetAccountMeta(ctx, ident) 158 + if err != nil { 159 + eventErrorCount.WithLabelValues("identity").Inc() 160 + return fmt.Errorf("failed to fetch account metadata: %w", err) 161 + } 162 + } else { 163 + am = &AccountMeta{ 164 + Identity: ident, 165 + Profile: ProfileSummary{}, 166 + } 143 167 } 144 168 rc := NewRecordContext(ctx, eng, *am, op) 145 169 rc.Logger.Debug("processing record")
+2 -2
automod/engine/fetch_account_meta.go
··· 24 24 25 25 // fallback in case client wasn't configured (eg, testing) 26 26 if e.BskyClient == nil { 27 - logger.Warn("skipping account meta hydration") 27 + logger.Debug("skipping account meta hydration") 28 28 am := AccountMeta{ 29 29 Identity: ident, 30 30 Profile: ProfileSummary{}, ··· 64 64 // most common cause of this is a race between automod and ozone/appview for new accounts. just sleep a couple seconds and retry! 65 65 var xrpcError *xrpc.Error 66 66 if err != nil && errors.As(err, &xrpcError) && (xrpcError.StatusCode == 400 || xrpcError.StatusCode == 404) { 67 - logger.Info("account profile lookup initially failed (from bsky appview), will retry", "err", err, "sleepDuration", newAccountRetryDuration) 67 + logger.Debug("account profile lookup initially failed (from bsky appview), will retry", "err", err, "sleepDuration", newAccountRetryDuration) 68 68 time.Sleep(newAccountRetryDuration) 69 69 pv, err = appbsky.ActorGetProfile(ctx, e.BskyClient, ident.DID.String()) 70 70 }
+1
automod/pkg.go
··· 6 6 ) 7 7 8 8 type Engine = engine.Engine 9 + type EngineConfig = engine.EngineConfig 9 10 type AccountMeta = engine.AccountMeta 10 11 type ProfileSummary = engine.ProfileSummary 11 12 type AccountPrivate = engine.AccountPrivate
-240
cmd/hepa/consumer.go
··· 1 - package main 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "fmt" 7 - "net/http" 8 - "net/url" 9 - "strings" 10 - "sync/atomic" 11 - 12 - comatproto "github.com/bluesky-social/indigo/api/atproto" 13 - "github.com/bluesky-social/indigo/atproto/syntax" 14 - "github.com/bluesky-social/indigo/automod" 15 - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 16 - "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 - lexutil "github.com/bluesky-social/indigo/lex/util" 18 - 19 - "github.com/bluesky-social/indigo/events" 20 - "github.com/bluesky-social/indigo/repo" 21 - "github.com/bluesky-social/indigo/repomgr" 22 - "github.com/carlmjohnson/versioninfo" 23 - "github.com/gorilla/websocket" 24 - ) 25 - 26 - func (s *Server) RunConsumer(ctx context.Context) error { 27 - 28 - cur, err := s.ReadLastCursor(ctx) 29 - if err != nil { 30 - return err 31 - } 32 - 33 - dialer := websocket.DefaultDialer 34 - u, err := url.Parse(s.relayHost) 35 - if err != nil { 36 - return fmt.Errorf("invalid relayHost URI: %w", err) 37 - } 38 - u.Path = "xrpc/com.atproto.sync.subscribeRepos" 39 - if cur != 0 { 40 - u.RawQuery = fmt.Sprintf("cursor=%d", cur) 41 - } 42 - s.logger.Info("subscribing to repo event stream", "upstream", s.relayHost, "cursor", cur) 43 - con, _, err := dialer.Dial(u.String(), http.Header{ 44 - "User-Agent": []string{fmt.Sprintf("hepa/%s", versioninfo.Short())}, 45 - }) 46 - if err != nil { 47 - return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 48 - } 49 - 50 - rsc := &events.RepoStreamCallbacks{ 51 - RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 52 - atomic.StoreInt64(&s.lastSeq, evt.Seq) 53 - return s.HandleRepoCommit(ctx, evt) 54 - }, 55 - RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 56 - atomic.StoreInt64(&s.lastSeq, evt.Seq) 57 - did, err := syntax.ParseDID(evt.Did) 58 - if err != nil { 59 - s.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 60 - return nil 61 - } 62 - if err := s.engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { 63 - s.logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) 64 - } 65 - return nil 66 - }, 67 - RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 68 - atomic.StoreInt64(&s.lastSeq, evt.Seq) 69 - did, err := syntax.ParseDID(evt.Did) 70 - if err != nil { 71 - s.logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err) 72 - return nil 73 - } 74 - if err := s.engine.ProcessIdentityEvent(ctx, "account", did); err != nil { 75 - s.logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) 76 - } 77 - return nil 78 - }, 79 - // TODO: deprecated 80 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 81 - atomic.StoreInt64(&s.lastSeq, evt.Seq) 82 - did, err := syntax.ParseDID(evt.Did) 83 - if err != nil { 84 - s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 85 - return nil 86 - } 87 - if err := s.engine.ProcessIdentityEvent(ctx, "handle", did); err != nil { 88 - s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 89 - } 90 - return nil 91 - }, 92 - // TODO: deprecated 93 - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 94 - atomic.StoreInt64(&s.lastSeq, evt.Seq) 95 - did, err := syntax.ParseDID(evt.Did) 96 - if err != nil { 97 - s.logger.Error("bad DID in RepoTombstone event", "did", evt.Did, "seq", evt.Seq, "err", err) 98 - return nil 99 - } 100 - if err := s.engine.ProcessIdentityEvent(ctx, "tombstone", did); err != nil { 101 - s.logger.Error("processing repo tombstone failed", "did", evt.Did, "seq", evt.Seq, "err", err) 102 - } 103 - return nil 104 - }, 105 - } 106 - 107 - var scheduler events.Scheduler 108 - if s.firehoseParallelism > 0 { 109 - // use a fixed-parallelism scheduler if configured 110 - scheduler = parallel.NewScheduler( 111 - s.firehoseParallelism, 112 - 1000, 113 - s.relayHost, 114 - rsc.EventHandler, 115 - ) 116 - s.logger.Info("hepa scheduler configured", "scheduler", "parallel", "initial", s.firehoseParallelism) 117 - } else { 118 - // otherwise use auto-scaling scheduler 119 - scaleSettings := autoscaling.DefaultAutoscaleSettings() 120 - // start at higher parallelism (somewhat arbitrary) 121 - scaleSettings.Concurrency = 4 122 - scaleSettings.MaxConcurrency = 200 123 - scheduler = autoscaling.NewScheduler(scaleSettings, s.relayHost, rsc.EventHandler) 124 - s.logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency) 125 - } 126 - 127 - return events.HandleRepoStream(ctx, con, scheduler) 128 - } 129 - 130 - // TODO: move this to a "ParsePath" helper in syntax package? 131 - func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) { 132 - parts := strings.SplitN(path, "/", 3) 133 - if len(parts) != 2 { 134 - return "", "", fmt.Errorf("invalid record path: %s", path) 135 - } 136 - collection, err := syntax.ParseNSID(parts[0]) 137 - if err != nil { 138 - return "", "", err 139 - } 140 - rkey, err := syntax.ParseRecordKey(parts[1]) 141 - if err != nil { 142 - return "", "", err 143 - } 144 - return collection, rkey, nil 145 - } 146 - 147 - // NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better. 148 - func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 149 - 150 - logger := s.logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 151 - logger.Debug("received commit event") 152 - 153 - if evt.TooBig { 154 - logger.Warn("skipping tooBig events for now") 155 - return nil 156 - } 157 - 158 - did, err := syntax.ParseDID(evt.Repo) 159 - if err != nil { 160 - logger.Error("bad DID syntax in event", "err", err) 161 - return nil 162 - } 163 - 164 - rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 165 - if err != nil { 166 - logger.Error("failed to read repo from car", "err", err) 167 - return nil 168 - } 169 - 170 - // empty commit is a special case, temporarily, basically indicates "new account" 171 - if len(evt.Ops) == 0 { 172 - if err := s.engine.ProcessIdentityEvent(ctx, "create", did); err != nil { 173 - s.logger.Error("processing handle update failed", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq, "err", err) 174 - } 175 - } 176 - 177 - for _, op := range evt.Ops { 178 - logger = logger.With("eventKind", op.Action, "path", op.Path) 179 - collection, rkey, err := splitRepoPath(op.Path) 180 - if err != nil { 181 - logger.Error("invalid path in repo op") 182 - return nil 183 - } 184 - 185 - ek := repomgr.EventKind(op.Action) 186 - switch ek { 187 - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 188 - // read the record bytes from blocks, and verify CID 189 - rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 190 - if err != nil { 191 - logger.Error("reading record from event blocks (CAR)", "err", err) 192 - break 193 - } 194 - if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 195 - logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 196 - break 197 - } 198 - var action string 199 - switch ek { 200 - case repomgr.EvtKindCreateRecord: 201 - action = automod.CreateOp 202 - case repomgr.EvtKindUpdateRecord: 203 - action = automod.UpdateOp 204 - default: 205 - logger.Error("impossible event kind", "kind", ek) 206 - break 207 - } 208 - recCID := syntax.CID(op.Cid.String()) 209 - err = s.engine.ProcessRecordOp(ctx, automod.RecordOp{ 210 - Action: action, 211 - DID: did, 212 - Collection: collection, 213 - RecordKey: rkey, 214 - CID: &recCID, 215 - RecordCBOR: *recCBOR, 216 - }) 217 - if err != nil { 218 - logger.Error("engine failed to process record", "err", err) 219 - continue 220 - } 221 - case repomgr.EvtKindDeleteRecord: 222 - err = s.engine.ProcessRecordOp(ctx, automod.RecordOp{ 223 - Action: automod.DeleteOp, 224 - DID: did, 225 - Collection: collection, 226 - RecordKey: rkey, 227 - CID: nil, 228 - RecordCBOR: nil, 229 - }) 230 - if err != nil { 231 - logger.Error("engine failed to process record", "err", err) 232 - continue 233 - } 234 - default: 235 - // TODO: should this be an error? 236 - } 237 - } 238 - 239 - return nil 240 - }
-97
cmd/hepa/consumer_ozone.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "time" 7 - 8 - toolsozone "github.com/bluesky-social/indigo/api/ozone" 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 - ) 11 - 12 - func (s *Server) RunOzoneConsumer(ctx context.Context) error { 13 - 14 - cur, err := s.ReadLastOzoneCursor(ctx) 15 - if err != nil { 16 - return err 17 - } 18 - 19 - if cur == "" { 20 - cur = syntax.DatetimeNow().String() 21 - } 22 - since, err := syntax.ParseDatetime(cur) 23 - if err != nil { 24 - return err 25 - } 26 - 27 - s.logger.Info("subscribing to ozone event log", "upstream", s.engine.OzoneClient.Host, "cursor", cur, "since", since) 28 - var limit int64 = 50 29 - period := time.Second * 5 30 - 31 - for { 32 - //func ModerationQueryEvents(ctx context.Context, c *xrpc.Client, addedLabels []string, addedTags []string, comment string, createdAfter string, createdBefore string, createdBy string, cursor string, hasComment bool, includeAllUserRecords bool, limit int64, removedLabels []string, removedTags []string, reportTypes []string, sortDirection string, subject string, types []string) (*ModerationQueryEvents_Output, error) { 33 - me, err := toolsozone.ModerationQueryEvents( 34 - ctx, 35 - s.engine.OzoneClient, 36 - nil, // addedLabels: If specified, only events where all of these labels were added are returned 37 - nil, // addedTags: If specified, only events where all of these tags were added are returned 38 - "", // comment: If specified, only events with comments containing the keyword are returned 39 - since.String(), // createdAfter: Retrieve events created after a given timestamp 40 - "", // createdBefore: Retrieve events created before a given timestamp 41 - "", // createdBy 42 - "", // cursor 43 - false, // hasComment: If true, only events with comments are returned 44 - true, // includeAllUserRecords: If true, events on all record types (posts, lists, profile etc.) owned by the did are returned 45 - limit, 46 - nil, // removedLabels: If specified, only events where all of these labels were removed are returned 47 - nil, // removedTags 48 - nil, // reportTypes 49 - "asc", // sortDirection: Sort direction for the events. Defaults to descending order of created at timestamp. 50 - "", // subject 51 - nil, // types: The types of events (fully qualified string in the format of tools.ozone.moderation.defs#modEvent<name>) to filter by. If not specified, all events are returned. 52 - ) 53 - if err != nil { 54 - s.logger.Warn("ozone query events failed; sleeping then will retrying", "err", err, "period", period.String()) 55 - time.Sleep(period) 56 - continue 57 - } 58 - 59 - // track if the response contained anything new 60 - anyNewEvents := false 61 - for _, evt := range me.Events { 62 - createdAt, err := syntax.ParseDatetime(evt.CreatedAt) 63 - if err != nil { 64 - return fmt.Errorf("invalid time format for ozone 'createdAt': %w", err) 65 - } 66 - // skip if the timestamp is the exact same 67 - if createdAt == since { 68 - continue 69 - } 70 - anyNewEvents = true 71 - // TODO: is there a race condition here? 72 - if !createdAt.Time().After(since.Time()) { 73 - s.logger.Error("out of order ozone event", "createdAt", createdAt, "since", since) 74 - return fmt.Errorf("out of order ozone event") 75 - } 76 - if err = s.HandleOzoneEvent(ctx, evt); err != nil { 77 - s.logger.Error("failed to process ozone event", "event", evt) 78 - } 79 - since = createdAt 80 - s.lastOzoneCursor.Store(since.String()) 81 - } 82 - if !anyNewEvents { 83 - s.logger.Debug("... ozone poller sleeping", "period", period.String()) 84 - time.Sleep(period) 85 - } 86 - } 87 - } 88 - 89 - func (s *Server) HandleOzoneEvent(ctx context.Context, eventView *toolsozone.ModerationDefs_ModEventView) error { 90 - 91 - s.logger.Debug("received ozone event", "eventID", eventView.Id, "createdAt", eventView.CreatedAt) 92 - 93 - if err := s.engine.ProcessOzoneEvent(ctx, eventView); err != nil { 94 - s.logger.Error("engine failed to process ozone event", "err", err) 95 - } 96 - return nil 97 - }
+43 -24
cmd/hepa/main.go
··· 17 17 "github.com/bluesky-social/indigo/atproto/identity/redisdir" 18 18 "github.com/bluesky-social/indigo/atproto/syntax" 19 19 "github.com/bluesky-social/indigo/automod/capture" 20 + "github.com/bluesky-social/indigo/automod/consumer" 20 21 21 22 "github.com/carlmjohnson/versioninfo" 22 23 _ "github.com/joho/godotenv/autoload" ··· 236 237 dir, 237 238 Config{ 238 239 Logger: logger, 239 - RelayHost: cctx.String("atp-relay-host"), 240 + RelayHost: cctx.String("atp-relay-host"), // DEPRECATED 240 241 BskyHost: cctx.String("atp-bsky-host"), 241 242 OzoneHost: cctx.String("atp-ozone-host"), 242 243 OzoneDID: cctx.String("ozone-did"), ··· 251 252 AbyssPassword: cctx.String("abyss-password"), 252 253 RatelimitBypass: cctx.String("ratelimit-bypass"), 253 254 RulesetName: cctx.String("ruleset"), 254 - FirehoseParallelism: cctx.Int("firehose-parallelism"), 255 + FirehoseParallelism: cctx.Int("firehose-parallelism"), // DEPRECATED 255 256 PreScreenHost: cctx.String("prescreen-host"), 256 257 PreScreenToken: cctx.String("prescreen-token"), 257 258 }, ··· 260 261 return fmt.Errorf("failed to construct server: %v", err) 261 262 } 262 263 263 - // prometheus HTTP endpoint: /metrics 264 - go func() { 265 - runtime.SetBlockProfileRate(10) 266 - runtime.SetMutexProfileFraction(10) 267 - if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 268 - slog.Error("failed to start metrics endpoint", "error", err) 269 - panic(fmt.Errorf("failed to start metrics endpoint: %w", err)) 264 + // firehose event consumer 265 + relayHost := cctx.String("atp-relay-host") 266 + if relayHost != "" { 267 + fc := consumer.FirehoseConsumer{ 268 + Engine: srv.Engine, 269 + Logger: logger.With("subsystem", "firehose-consumer"), 270 + Host: cctx.String("atp-relay-host"), 271 + Parallelism: cctx.Int("firehose-parallelism"), 272 + RedisClient: srv.RedisClient, 270 273 } 271 - }() 272 274 273 - go func() { 274 - if err := srv.RunPersistCursor(ctx); err != nil { 275 - slog.Error("cursor routine failed", "err", err) 275 + go func() { 276 + if err := fc.RunPersistCursor(ctx); err != nil { 277 + slog.Error("cursor routine failed", "err", err) 278 + } 279 + }() 280 + 281 + if err := fc.Run(ctx); err != nil { 282 + return fmt.Errorf("failure consuming and processing firehose: %w", err) 276 283 } 277 - }() 284 + } 278 285 279 286 // ozone event consumer (if configured) 280 - if srv.engine.OzoneClient != nil { 287 + if srv.Engine.OzoneClient != nil { 288 + oc := consumer.OzoneConsumer{ 289 + Engine: srv.Engine, 290 + Logger: logger.With("subsystem", "ozone-consumer"), 291 + RedisClient: srv.RedisClient, 292 + } 293 + 281 294 go func() { 282 - if err := srv.RunOzoneConsumer(ctx); err != nil { 295 + if err := oc.Run(ctx); err != nil { 283 296 slog.Error("ozone consumer failed", "err", err) 284 297 } 285 298 }() 286 299 287 300 go func() { 288 - if err := srv.RunPersistOzoneCursor(ctx); err != nil { 301 + if err := oc.RunPersistCursor(ctx); err != nil { 289 302 slog.Error("ozone cursor routine failed", "err", err) 290 303 } 291 304 }() 292 305 } 293 306 294 - // firehose event consumer (main processor) 295 - if err := srv.RunConsumer(ctx); err != nil { 296 - return fmt.Errorf("failure consuming and processing firehose: %w", err) 297 - } 307 + // prometheus HTTP endpoint: /metrics 308 + go func() { 309 + runtime.SetBlockProfileRate(10) 310 + runtime.SetMutexProfileFraction(10) 311 + if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 312 + slog.Error("failed to start metrics endpoint", "error", err) 313 + panic(fmt.Errorf("failed to start metrics endpoint: %w", err)) 314 + } 315 + }() 316 + 298 317 return nil 299 318 }, 300 319 } ··· 355 374 return err 356 375 } 357 376 358 - return capture.FetchAndProcessRecord(ctx, srv.engine, aturi) 377 + return capture.FetchAndProcessRecord(ctx, srv.Engine, aturi) 359 378 }, 360 379 } 361 380 ··· 386 405 return err 387 406 } 388 407 389 - return capture.FetchAndProcessRecent(ctx, srv.engine, *atid, cctx.Int("limit")) 408 + return capture.FetchAndProcessRecent(ctx, srv.Engine, *atid, cctx.Int("limit")) 390 409 }, 391 410 } 392 411 ··· 417 436 return err 418 437 } 419 438 420 - cap, err := capture.CaptureRecent(ctx, srv.engine, *atid, cctx.Int("limit")) 439 + cap, err := capture.CaptureRecent(ctx, srv.Engine, *atid, cctx.Int("limit")) 421 440 if err != nil { 422 441 return err 423 442 }
+9 -147
cmd/hepa/server.go
··· 7 7 "net/http" 8 8 "os" 9 9 "strings" 10 - "sync/atomic" 11 10 "time" 12 11 13 12 "github.com/bluesky-social/indigo/atproto/identity" ··· 27 26 ) 28 27 29 28 type Server struct { 30 - relayHost string 31 - firehoseParallelism int 29 + Engine *automod.Engine 30 + RedisClient *redis.Client 31 + 32 + relayHost string // DEPRECATED 33 + firehoseParallelism int // DEPRECATED 32 34 logger *slog.Logger 33 - engine *automod.Engine 34 - rdb *redis.Client 35 - 36 - // lastSeq is the most recent event sequence number we've received and begun to handle. 37 - // This number is periodically persisted to redis, if redis is present. 38 - // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic), 39 - // but nonetheless, you must use atomics when updating or reading this (to avoid data races). 40 - lastSeq int64 41 - 42 - // same as lastSeq, but for Ozone timestamp cursor. the value is a string. 43 - lastOzoneCursor atomic.Value 44 35 } 45 36 46 37 type Config struct { 47 38 Logger *slog.Logger 48 - RelayHost string 39 + RelayHost string // DEPRECATED 49 40 BskyHost string 50 41 OzoneHost string 51 42 OzoneDID string ··· 60 51 AbyssPassword string 61 52 RulesetName string 62 53 RatelimitBypass string 63 - FirehoseParallelism int 54 + FirehoseParallelism int // DEPRECATED 64 55 PreScreenHost string 65 56 PreScreenToken string 66 57 } ··· 234 225 relayHost: config.RelayHost, 235 226 firehoseParallelism: config.FirehoseParallelism, 236 227 logger: logger, 237 - engine: &engine, 238 - rdb: rdb, 228 + Engine: &engine, 229 + RedisClient: rdb, 239 230 } 240 231 241 232 return s, nil ··· 245 236 http.Handle("/metrics", promhttp.Handler()) 246 237 return http.ListenAndServe(listen, nil) 247 238 } 248 - 249 - var cursorKey = "hepa/seq" 250 - var ozoneCursorKey = "hepa/ozoneTimestamp" 251 - 252 - func (s *Server) ReadLastCursor(ctx context.Context) (int64, error) { 253 - // if redis isn't configured, just skip 254 - if s.rdb == nil { 255 - s.logger.Info("redis not configured, skipping cursor read") 256 - return 0, nil 257 - } 258 - 259 - val, err := s.rdb.Get(ctx, cursorKey).Int64() 260 - if err == redis.Nil { 261 - s.logger.Info("no pre-existing cursor in redis") 262 - return 0, nil 263 - } else if err != nil { 264 - return 0, err 265 - } 266 - s.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val) 267 - return val, nil 268 - } 269 - 270 - func (s *Server) ReadLastOzoneCursor(ctx context.Context) (string, error) { 271 - // if redis isn't configured, just skip 272 - if s.rdb == nil { 273 - s.logger.Info("redis not configured, skipping ozone cursor read") 274 - return "", nil 275 - } 276 - 277 - val, err := s.rdb.Get(ctx, ozoneCursorKey).Result() 278 - if err == redis.Nil || val == "" { 279 - s.logger.Info("no pre-existing ozone cursor in redis") 280 - return "", nil 281 - } else if err != nil { 282 - return "", err 283 - } 284 - s.logger.Info("successfully found prior ozone offset timestamp in redis", "cursor", val) 285 - return val, nil 286 - } 287 - 288 - func (s *Server) PersistCursor(ctx context.Context) error { 289 - // if redis isn't configured, just skip 290 - if s.rdb == nil { 291 - return nil 292 - } 293 - lastSeq := atomic.LoadInt64(&s.lastSeq) 294 - if lastSeq <= 0 { 295 - return nil 296 - } 297 - err := s.rdb.Set(ctx, cursorKey, lastSeq, 14*24*time.Hour).Err() 298 - return err 299 - } 300 - 301 - func (s *Server) PersistOzoneCursor(ctx context.Context) error { 302 - // if redis isn't configured, just skip 303 - if s.rdb == nil { 304 - return nil 305 - } 306 - lastCursor := s.lastOzoneCursor.Load() 307 - if lastCursor == nil || lastCursor == "" { 308 - return nil 309 - } 310 - err := s.rdb.Set(ctx, ozoneCursorKey, lastCursor, 14*24*time.Hour).Err() 311 - return err 312 - } 313 - 314 - // this method runs in a loop, persisting the current cursor state every 5 seconds 315 - func (s *Server) RunPersistCursor(ctx context.Context) error { 316 - 317 - // if redis isn't configured, just skip 318 - if s.rdb == nil { 319 - return nil 320 - } 321 - ticker := time.NewTicker(5 * time.Second) 322 - for { 323 - select { 324 - case <-ctx.Done(): 325 - lastSeq := atomic.LoadInt64(&s.lastSeq) 326 - if lastSeq >= 1 { 327 - s.logger.Info("persisting final cursor seq value", "seq", lastSeq) 328 - err := s.PersistCursor(ctx) 329 - if err != nil { 330 - s.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 331 - } 332 - } 333 - return nil 334 - case <-ticker.C: 335 - lastSeq := atomic.LoadInt64(&s.lastSeq) 336 - if lastSeq >= 1 { 337 - err := s.PersistCursor(ctx) 338 - if err != nil { 339 - s.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 340 - } 341 - } 342 - } 343 - } 344 - } 345 - 346 - // this method runs in a loop, persisting the current cursor state every 5 seconds 347 - func (s *Server) RunPersistOzoneCursor(ctx context.Context) error { 348 - 349 - // if redis isn't configured, just skip 350 - if s.rdb == nil { 351 - return nil 352 - } 353 - ticker := time.NewTicker(5 * time.Second) 354 - for { 355 - select { 356 - case <-ctx.Done(): 357 - lastCursor := s.lastOzoneCursor.Load() 358 - if lastCursor != nil && lastCursor != "" { 359 - s.logger.Info("persisting final ozone cursor timestamp", "cursor", lastCursor) 360 - err := s.PersistOzoneCursor(ctx) 361 - if err != nil { 362 - s.logger.Error("failed to persist ozone cursor", "err", err, "cursor", lastCursor) 363 - } 364 - } 365 - return nil 366 - case <-ticker.C: 367 - lastCursor := s.lastOzoneCursor.Load() 368 - if lastCursor != nil && lastCursor != "" { 369 - err := s.PersistOzoneCursor(ctx) 370 - if err != nil { 371 - s.logger.Error("failed to persist ozone cursor", "err", err, "cursor", lastCursor) 372 - } 373 - } 374 - } 375 - } 376 - }