Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview/ingester: harden event processing and cursor

This is more of a practical proposal than solving a direct
problem. We fire-and-forget our cursor updates even if
an event fails. Instead, we should have it in the same thread
as the record ingestion and not be silent about errors.
Also for verification I think a little retry-backoff would be in order.

Lewis: May this revision serve well! <lewis@tangled.org>

authored by

Lewis and committed by tangled.org c184a759 7ad26e44

+39 -35
+21 -12
appview/ingester.go
··· 12 12 13 13 "time" 14 14 15 + "github.com/avast/retry-go/v4" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 16 17 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 18 "github.com/go-git/go-git/v5/plumbing" ··· 41 42 func (i *Ingester) Ingest() processFunc { 42 43 return func(ctx context.Context, e *jmodels.Event) error { 43 44 var err error 44 - defer func() { 45 - eventTime := e.TimeUS 46 - lastTimeUs := eventTime + 1 47 - if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 48 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 49 - } 50 - }() 51 45 52 46 l := i.Logger.With("kind", e.Kind) 53 47 switch e.Kind { ··· 92 86 } 93 87 94 88 if err != nil { 95 - l.Warn("refused to ingest record", "err", err) 89 + l.Warn("failed to ingest record, skipping", "err", err) 90 + } 91 + 92 + lastTimeUs := e.TimeUS + 1 93 + if saveErr := i.Db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 94 + l.Error("failed to save cursor", "err", saveErr) 96 95 } 97 96 98 97 return nil ··· 560 559 return err 561 560 } 562 561 563 - err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 562 + err = retry.Do( 563 + func() error { return serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) }, 564 + retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 565 + retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 566 + ) 564 567 if err != nil { 565 - l.Error("failed to add spindle to db", "err", err, "instance", instance) 568 + l.Error("failed to verify spindle after retries", "err", err, "instance", instance) 566 569 return err 567 570 } 568 571 ··· 778 781 return err 779 782 } 780 783 781 - err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 784 + err = retry.Do( 785 + func() error { 786 + return serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 787 + }, 788 + retry.Attempts(5), retry.Delay(5*time.Second), retry.MaxDelay(80*time.Second), 789 + retry.DelayType(retry.BackOffDelay), retry.LastErrorOnly(true), 790 + ) 782 791 if err != nil { 783 - l.Error("failed to verify knot", "err", err, "domain", domain) 792 + l.Error("failed to verify knot after retries", "err", err, "domain", domain) 784 793 return err 785 794 } 786 795
+1
appview/state/state.go
··· 123 123 tangled.KnotMemberNSID, 124 124 tangled.SpindleMemberNSID, 125 125 tangled.SpindleNSID, 126 + tangled.KnotNSID, 126 127 tangled.StringNSID, 127 128 tangled.RepoIssueNSID, 128 129 tangled.RepoIssueCommentNSID,
+4 -4
eventconsumer/consumer.go
··· 159 159 return 160 160 } 161 161 162 + if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 163 + c.logger.Error("error processing message", "source", j.source, "err", err) 164 + } 165 + 162 166 cursorVal := msg.Created 163 167 if cursorVal == 0 { 164 168 cursorVal = time.Now().UnixNano() 165 169 } 166 170 c.cfg.CursorStore.Set(j.source.Key(), cursorVal) 167 - 168 - if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 169 - c.logger.Error("error processing message", "source", j.source, "err", err) 170 - } 171 171 } 172 172 } 173 173 }
+6 -9
knotserver/ingester.go
··· 363 363 } 364 364 365 365 var err error 366 - defer func() { 367 - eventTime := event.TimeUS 368 - lastTimeUs := eventTime + 1 369 - if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 370 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 371 - } 372 - }() 373 - 374 366 switch event.Commit.Collection { 375 367 case tangled.PublicKeyNSID: 376 368 err = h.processPublicKey(ctx, event) ··· 383 375 } 384 376 385 377 if err != nil { 386 - h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 378 + h.l.Warn("failed to process event, skipping", "nsid", event.Commit.Collection, "err", err) 379 + } 380 + 381 + lastTimeUs := event.TimeUS + 1 382 + if saveErr := h.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 383 + h.l.Error("failed to save cursor", "err", saveErr) 387 384 } 388 385 389 386 return nil
+7 -10
spindle/ingester.go
··· 24 24 25 25 func (s *Spindle) ingest() Ingester { 26 26 return func(ctx context.Context, e *models.Event) error { 27 - var err error 28 - defer func() { 29 - eventTime := e.TimeUS 30 - lastTimeUs := eventTime + 1 31 - if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 32 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 33 - } 34 - }() 35 - 36 27 if e.Kind != models.EventKindCommit { 37 28 return nil 38 29 } 39 30 31 + var err error 40 32 switch e.Commit.Collection { 41 33 case tangled.SpindleMemberNSID: 42 34 err = s.ingestMember(ctx, e) ··· 47 39 } 48 40 49 41 if err != nil { 50 - s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 42 + s.l.Warn("failed to process message, skipping", "nsid", e.Commit.Collection, "err", err) 43 + } 44 + 45 + lastTimeUs := e.TimeUS + 1 46 + if saveErr := s.db.SaveLastTimeUs(lastTimeUs); saveErr != nil { 47 + s.l.Error("failed to save cursor", "err", saveErr) 51 48 } 52 49 53 50 return nil