this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

use atomic

dholms a405f134 ee8e5d81

+9 -17
+9 -17
nexus/processor.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "log/slog" 8 - "sync" 8 + "sync/atomic" 9 9 "time" 10 10 11 11 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 24 24 RelayHost string 25 25 Outbox *Outbox 26 26 27 - lastSeq int64 28 - seqMu sync.Mutex 27 + lastSeq atomic.Int64 29 28 } 30 29 31 30 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 32 - defer ep.trackLastSeq(evt.Seq) 31 + defer ep.lastSeq.Swap(evt.Seq) 33 32 34 33 curr, err := ep.GetRepoState(evt.Repo) 35 34 if err != nil { ··· 150 149 } 151 150 152 151 func (ep *EventProcessor) ProcessSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 153 - defer ep.trackLastSeq(evt.Seq) 152 + defer ep.lastSeq.Swap(evt.Seq) 154 153 155 154 curr, err := ep.GetRepoState(evt.Did) 156 155 if err != nil { ··· 187 186 } 188 187 189 188 func (ep *EventProcessor) ProcessIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error { 190 - defer ep.trackLastSeq(evt.Seq) 189 + defer ep.lastSeq.Swap(evt.Seq) 191 190 return ep.RefreshIdentity(ctx, evt.Did) 192 191 } 193 192 ··· 236 235 } 237 236 238 237 func (ep *EventProcessor) ProcessAccount(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error { 238 + defer ep.lastSeq.Swap(evt.Seq) 239 + 239 240 curr, err := ep.GetRepoState(evt.Did) 240 241 if err != nil { 241 242 return err ··· 389 390 }) 390 391 } 391 392 392 - func (ep *EventProcessor) trackLastSeq(seq int64) { 393 - ep.seqMu.Lock() 394 - ep.lastSeq = seq 395 - ep.seqMu.Unlock() 396 - } 397 - 398 393 func (ep *EventProcessor) saveCursor(ctx context.Context) error { 399 - ep.seqMu.Lock() 400 - seq := ep.lastSeq 401 - ep.seqMu.Unlock() 402 - 403 - if seq == 0 { 394 + seq := ep.lastSeq.Load() 395 + if seq < 1 { 404 396 return nil 405 397 } 406 398