this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Support #account events in the Relay event stream

+369 -29
+9 -7
api/atproto/servercreateSession.go
··· 12 12 13 13 // ServerCreateSession_Input is the input argument to a com.atproto.server.createSession call. 14 14 type ServerCreateSession_Input struct { 15 + AuthFactorToken *string `json:"authFactorToken,omitempty" cborgen:"authFactorToken,omitempty"` 15 16 // identifier: Handle or other identifier supported by the server for the authenticating user. 16 17 Identifier string `json:"identifier" cborgen:"identifier"` 17 18 Password string `json:"password" cborgen:"password"` ··· 19 20 20 21 // ServerCreateSession_Output is the output of a com.atproto.server.createSession call. 21 22 type ServerCreateSession_Output struct { 22 - AccessJwt string `json:"accessJwt" cborgen:"accessJwt"` 23 - Did string `json:"did" cborgen:"did"` 24 - DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 25 - Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 26 - EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 27 - Handle string `json:"handle" cborgen:"handle"` 28 - RefreshJwt string `json:"refreshJwt" cborgen:"refreshJwt"` 23 + AccessJwt string `json:"accessJwt" cborgen:"accessJwt"` 24 + Did string `json:"did" cborgen:"did"` 25 + DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 26 + Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 27 + EmailAuthFactor *bool `json:"emailAuthFactor,omitempty" cborgen:"emailAuthFactor,omitempty"` 28 + EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 29 + Handle string `json:"handle" cborgen:"handle"` 30 + RefreshJwt string `json:"refreshJwt" cborgen:"refreshJwt"` 29 31 } 30 32 31 33 // ServerCreateSession calls the XRPC method "com.atproto.server.createSession".
+6 -5
api/atproto/servergetSession.go
··· 12 12 13 13 // ServerGetSession_Output is the output of a com.atproto.server.getSession call. 14 14 type ServerGetSession_Output struct { 15 - Did string `json:"did" cborgen:"did"` 16 - DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 17 - Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 18 - EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 19 - Handle string `json:"handle" cborgen:"handle"` 15 + Did string `json:"did" cborgen:"did"` 16 + DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 17 + Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 18 + EmailAuthFactor *bool `json:"emailAuthFactor,omitempty" cborgen:"emailAuthFactor,omitempty"` 19 + EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 20 + Handle string `json:"handle" cborgen:"handle"` 20 21 } 21 22 22 23 // ServerGetSession calls the XRPC method "com.atproto.server.getSession".
+2 -1
api/atproto/serverupdateEmail.go
··· 12 12 13 13 // ServerUpdateEmail_Input is the input argument to a com.atproto.server.updateEmail call. 14 14 type ServerUpdateEmail_Input struct { 15 - Email string `json:"email" cborgen:"email"` 15 + Email string `json:"email" cborgen:"email"` 16 + EmailAuthFactor *bool `json:"emailAuthFactor,omitempty" cborgen:"emailAuthFactor,omitempty"` 16 17 // token: Requires a token from com.atproto.sever.requestEmailUpdate if the account's email has been confirmed. 17 18 Token *string `json:"token,omitempty" cborgen:"token,omitempty"` 18 19 }
+37
api/atproto/syncgetRepoStatus.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package atproto 4 + 5 + // schema: com.atproto.sync.getRepoStatus 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/xrpc" 11 + ) 12 + 13 + // SyncGetRepoStatus_Output is the output of a com.atproto.sync.getRepoStatus call. 14 + type SyncGetRepoStatus_Output struct { 15 + Active bool `json:"active" cborgen:"active"` 16 + Did string `json:"did" cborgen:"did"` 17 + // rev: Optional field, the current rev of the repo, if active=true 18 + Rev *string `json:"rev,omitempty" cborgen:"rev,omitempty"` 19 + // status: If active=false, this optional field indicates a possible reason for why the account is not active. If active=false and no status is supplied, then the host makes no claim for why the repository is no longer being hosted. 20 + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` 21 + } 22 + 23 + // SyncGetRepoStatus calls the XRPC method "com.atproto.sync.getRepoStatus". 24 + // 25 + // did: The DID of the repo. 26 + func SyncGetRepoStatus(ctx context.Context, c *xrpc.Client, did string) (*SyncGetRepoStatus_Output, error) { 27 + var out SyncGetRepoStatus_Output 28 + 29 + params := map[string]interface{}{ 30 + "did": did, 31 + } 32 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.getRepoStatus", params, nil, &out); err != nil { 33 + return nil, err 34 + } 35 + 36 + return &out, nil 37 + }
+4 -1
api/atproto/synclistRepos.go
··· 18 18 19 19 // SyncListRepos_Repo is a "repo" in the com.atproto.sync.listRepos schema. 20 20 type SyncListRepos_Repo struct { 21 - Did string `json:"did" cborgen:"did"` 21 + Active *bool `json:"active,omitempty" cborgen:"active,omitempty"` 22 + Did string `json:"did" cborgen:"did"` 22 23 // head: Current repo commit CID 23 24 Head string `json:"head" cborgen:"head"` 24 25 Rev string `json:"rev" cborgen:"rev"` 26 + // status: If active=false, this optional field indicates a possible reason for why the account is not active. If active=false and no status is supplied, then the host makes no claim for why the repository is no longer being hosted. 27 + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` 25 28 } 26 29 27 30 // SyncListRepos calls the XRPC method "com.atproto.sync.listRepos".
+21 -6
api/atproto/syncsubscribeRepos.go
··· 8 8 "github.com/bluesky-social/indigo/lex/util" 9 9 ) 10 10 11 + // SyncSubscribeRepos_Account is a "account" in the com.atproto.sync.subscribeRepos schema. 12 + // 13 + // Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. 14 + type SyncSubscribeRepos_Account struct { 15 + // active: Indicates that the account has a repository which can be fetched from the host that emitted this event. 16 + Active bool `json:"active" cborgen:"active"` 17 + Did string `json:"did" cborgen:"did"` 18 + Seq int64 `json:"seq" cborgen:"seq"` 19 + // status: If active=false, this optional field indicates a reason for why the account is not active. 20 + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` 21 + Time string `json:"time" cborgen:"time"` 22 + } 23 + 11 24 // SyncSubscribeRepos_Commit is a "commit" in the com.atproto.sync.subscribeRepos schema. 12 25 // 13 26 // Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. ··· 38 51 39 52 // SyncSubscribeRepos_Handle is a "handle" in the com.atproto.sync.subscribeRepos schema. 40 53 // 41 - // Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity. 54 + // DEPRECATED -- Use #identity event instead 42 55 type SyncSubscribeRepos_Handle struct { 43 56 Did string `json:"did" cborgen:"did"` 44 57 Handle string `json:"handle" cborgen:"handle"` ··· 50 63 // 51 64 // Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. 52 65 type SyncSubscribeRepos_Identity struct { 53 - Did string `json:"did" cborgen:"did"` 54 - Seq int64 `json:"seq" cborgen:"seq"` 55 - Time string `json:"time" cborgen:"time"` 66 + Did string `json:"did" cborgen:"did"` 67 + // handle: The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. 68 + Handle *string `json:"handle,omitempty" cborgen:"handle,omitempty"` 69 + Seq int64 `json:"seq" cborgen:"seq"` 70 + Time string `json:"time" cborgen:"time"` 56 71 } 57 72 58 73 // SyncSubscribeRepos_Info is a "info" in the com.atproto.sync.subscribeRepos schema. ··· 63 78 64 79 // SyncSubscribeRepos_Migrate is a "migrate" in the com.atproto.sync.subscribeRepos schema. 65 80 // 66 - // Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead 81 + // DEPRECATED -- Use #account event instead 67 82 type SyncSubscribeRepos_Migrate struct { 68 83 Did string `json:"did" cborgen:"did"` 69 84 MigrateTo *string `json:"migrateTo" cborgen:"migrateTo"` ··· 83 98 84 99 // SyncSubscribeRepos_Tombstone is a "tombstone" in the com.atproto.sync.subscribeRepos schema. 85 100 // 86 - // Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event 101 + // DEPRECATED -- Use #account event instead 87 102 type SyncSubscribeRepos_Tombstone struct { 88 103 Did string `json:"did" cborgen:"did"` 89 104 Seq int64 `json:"seq" cborgen:"seq"`
+1 -1
api/bsky/cbor_gen.go
··· 6233 6233 return err 6234 6234 } 6235 6235 6236 - t.LabelValues[i] = &sval 6236 + t.LabelValues[i] = string(sval) 6237 6237 } 6238 6238 6239 6239 }
+157 -8
bgs/bgs.go
··· 46 46 ) 47 47 48 48 var log = logging.Logger("bgs") 49 + var tracer = otel.Tracer("bgs") 49 50 50 51 // serverListenerBootTimeout is how long to wait for the requested server socket 51 52 // to become available for use. This is an arbitrary timeout that should be safe ··· 431 432 432 433 func (bgs *BGS) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc { 433 434 return func(e echo.Context) error { 434 - ctx, span := otel.Tracer("bgs").Start(e.Request().Context(), "checkAdminAuth") 435 + ctx, span := tracer.Start(e.Request().Context(), "checkAdminAuth") 435 436 defer span.End() 436 437 437 438 e.SetRequest(e.Request().WithContext(ctx)) ··· 472 473 // and no data about this user will be served. 473 474 TakenDown bool 474 475 Tombstoned bool 476 + 477 + // SuspendedByPDS accounts shouldn't be served or processed until we get a revert account event 478 + SuspendedByPDS bool 479 + // DeactivatedByPDS accounts shouldn't be served or processed until we get a revert account event 480 + DeactivatedByPDS bool 481 + // TakenDownByPDS is set to true if the user in question has been taken down by their PDS and not by the relay administrator 482 + TakenDownByPDS bool 475 483 } 476 484 477 485 type addTargetBody struct { ··· 643 651 case evt.RepoIdentity != nil: 644 652 header.MsgType = "#identity" 645 653 obj = evt.RepoIdentity 654 + case evt.RepoAccount != nil: 655 + header.MsgType = "#account" 656 + obj = evt.RepoAccount 646 657 case evt.RepoInfo != nil: 647 658 header.MsgType = "#info" 648 659 obj = evt.RepoInfo ··· 747 758 } 748 759 749 760 func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) { 750 - ctx, span := otel.Tracer("bgs").Start(ctx, "lookupUserByDid") 761 + ctx, span := tracer.Start(ctx, "lookupUserByDid") 751 762 defer span.End() 752 763 753 764 var u User ··· 763 774 } 764 775 765 776 func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) { 766 - ctx, span := otel.Tracer("bgs").Start(ctx, "lookupUserByUID") 777 + ctx, span := tracer.Start(ctx, "lookupUserByUID") 767 778 defer span.End() 768 779 769 780 var u User ··· 787 798 } 788 799 789 800 func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error { 790 - ctx, span := otel.Tracer("bgs").Start(ctx, "handleFedEvent") 801 + ctx, span := tracer.Start(ctx, "handleFedEvent") 791 802 defer span.End() 792 803 793 804 start := time.Now() ··· 819 830 u.Did = evt.Repo 820 831 } 821 832 822 - if u.TakenDown { 823 - log.Debugw("dropping event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 833 + if u.TakenDown || u.TakenDownByPDS { 834 + span.SetAttributes(attribute.Bool("taken_down_by_pds", u.TakenDownByPDS)) 835 + span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown)) 836 + log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 837 + return nil 838 + } 839 + 840 + if u.SuspendedByPDS { 841 + span.SetAttributes(attribute.Bool("suspended_by_pds", true)) 842 + log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 824 843 return nil 825 844 } 826 845 ··· 844 863 } 845 864 846 865 if u.Tombstoned { 866 + span.SetAttributes(attribute.Bool("tombstoned", true)) 847 867 // we've checked the authority of the users PDS, so reinstate the account 848 868 if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil { 849 869 return fmt.Errorf("failed to un-tombstone a user: %w", err) ··· 962 982 } 963 983 964 984 return nil 985 + case env.RepoAccount != nil: 986 + span.SetAttributes( 987 + attribute.String("did", env.RepoAccount.Did), 988 + attribute.Int64("seq", env.RepoAccount.Seq), 989 + attribute.Bool("active", env.RepoAccount.Active), 990 + ) 991 + 992 + if env.RepoAccount.Status != nil { 993 + span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status)) 994 + } 995 + 996 + log.Infow("bgs got account event", "did", env.RepoAccount.Did) 997 + // Flush any cached DID documents for this user 998 + bgs.didr.FlushCacheFor(env.RepoAccount.Did) 999 + 1000 + // Refetch the DID doc to make sure the PDS is still authoritative 1001 + ai, err := bgs.createExternalUser(ctx, env.RepoAccount.Did) 1002 + if err != nil { 1003 + span.RecordError(err) 1004 + return err 1005 + } 1006 + 1007 + // Check if the PDS is still authoritative 1008 + // if not we don't want to be propagating this account event 1009 + if ai.PDS != host.ID { 1010 + log.Errorw("account event from non-authoritative pds", 1011 + "seq", env.RepoAccount.Seq, 1012 + "did", env.RepoAccount.Did, 1013 + "event_from", host.Host, 1014 + "did_doc_declared_pds", ai.PDS, 1015 + "account_evt", env.RepoAccount, 1016 + ) 1017 + return fmt.Errorf("event from non-authoritative pds") 1018 + } 1019 + 1020 + // Process the account status change 1021 + repoStatus := events.AccountStatusActive 1022 + if !env.RepoAccount.Active && env.RepoAccount.Status != nil { 1023 + repoStatus = *env.RepoAccount.Status 1024 + } 1025 + 1026 + err = bgs.UpdateAccountStatus(ctx, env.RepoAccount.Did, repoStatus) 1027 + if err != nil { 1028 + span.RecordError(err) 1029 + return fmt.Errorf("failed to update account status: %w", err) 1030 + } 1031 + 1032 + // Broadcast the account event to all consumers 1033 + err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ 1034 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 1035 + Did: env.RepoAccount.Did, 1036 + Seq: env.RepoAccount.Seq, 1037 + Time: env.RepoAccount.Time, 1038 + Active: env.RepoAccount.Active, 1039 + Status: env.RepoAccount.Status, 1040 + }, 1041 + }) 1042 + if err != nil { 1043 + log.Errorw("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did) 1044 + return fmt.Errorf("failed to broadcast Account event: %w", err) 1045 + } 1046 + 1047 + return nil 965 1048 case env.RepoMigrate != nil: 966 1049 if _, err := bgs.createExternalUser(ctx, env.RepoMigrate.Did); err != nil { 967 1050 return err ··· 1042 1125 1043 1126 // TODO: rename? This also updates users, and 'external' is an old phrasing 1044 1127 func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) { 1045 - ctx, span := otel.Tracer("bgs").Start(ctx, "createExternalUser") 1128 + ctx, span := tracer.Start(ctx, "createExternalUser") 1046 1129 defer span.End() 1047 1130 1048 1131 externalUserCreationAttempts.Inc() ··· 1277 1360 return subj, nil 1278 1361 } 1279 1362 1363 + func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status string) error { 1364 + ctx, span := tracer.Start(ctx, "UpdateAccountStatus") 1365 + defer span.End() 1366 + 1367 + span.SetAttributes( 1368 + attribute.String("did", did), 1369 + attribute.String("status", status), 1370 + ) 1371 + 1372 + u, err := bgs.lookupUserByDid(ctx, did) 1373 + if err != nil { 1374 + return err 1375 + } 1376 + 1377 + switch status { 1378 + case events.AccountStatusActive: 1379 + // Unset the PDS-specific status flags 1380 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{ 1381 + "suspended_by_pds": false, 1382 + "taken_down_by_pds": false, 1383 + "deactivated_by_pds": false, 1384 + }).Error; err != nil { 1385 + return fmt.Errorf("failed to clear PDS takedown statuss for user: %w", err) 1386 + } 1387 + case events.AccountStatusDeactivated: 1388 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("deactivated_by_pds", true).Error; err != nil { 1389 + return fmt.Errorf("failed to set user deactivation status: %w", err) 1390 + } 1391 + case events.AccountStatusSuspended: 1392 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("suspended_by_pds", true).Error; err != nil { 1393 + return fmt.Errorf("failed to set user suspension status: %w", err) 1394 + } 1395 + case events.AccountStatusTakendown: 1396 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down_by_pds", true).Error; err != nil { 1397 + return fmt.Errorf("failed to set user takedown status: %w", err) 1398 + } 1399 + 1400 + if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ 1401 + "handle": nil, 1402 + }).Error; err != nil { 1403 + return err 1404 + } 1405 + case events.AccountStatusDeleted: 1406 + if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{ 1407 + "tombstoned": true, 1408 + "handle": nil, 1409 + }).Error; err != nil { 1410 + return err 1411 + } 1412 + 1413 + if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ 1414 + "handle": nil, 1415 + }).Error; err != nil { 1416 + return err 1417 + } 1418 + 1419 + // delete data from carstore 1420 + if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { 1421 + // don't let a failure here prevent us from propagating this event 1422 + log.Errorf("failed to delete user data from carstore: %s", err) 1423 + } 1424 + } 1425 + 1426 + return nil 1427 + } 1428 + 1280 1429 func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error { 1281 1430 u, err := bgs.lookupUserByDid(ctx, did) 1282 1431 if err != nil { ··· 1373 1522 } 1374 1523 1375 1524 func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { 1376 - ctx, span := otel.Tracer("bgs").Start(ctx, "ResyncPDS") 1525 + ctx, span := tracer.Start(ctx, "ResyncPDS") 1377 1526 defer span.End() 1378 1527 log := log.With("pds", pds.Host, "source", "resync_pds") 1379 1528 resync, found := bgs.LoadOrStoreResync(pds)
+15
bgs/fedmgr.go
··· 606 606 607 607 return nil 608 608 }, 609 + RepoAccount: func(acct *comatproto.SyncSubscribeRepos_Account) error { 610 + log.Infow("account event", "did", acct.Did, "status", acct.Status) 611 + if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 612 + RepoAccount: acct, 613 + }); err != nil { 614 + log.Errorf("failed handling event from %q (%d): %s", host.Host, acct.Seq, err) 615 + } 616 + *lastCursor = acct.Seq 617 + 618 + if err := s.updateCursor(sub, *lastCursor); err != nil { 619 + return fmt.Errorf("updating cursor: %w", err) 620 + } 621 + 622 + return nil 623 + }, 609 624 // TODO: all the other event types (handle change, migration, etc) 610 625 Error: func(errf *events.ErrorFrame) error { 611 626 switch errf.Error {
+19
events/consumer.go
··· 18 18 RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 19 19 RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error 20 20 RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 21 + RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error 21 22 RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error 22 23 RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error 23 24 RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error ··· 38 39 return rsc.RepoMigrate(xev.RepoMigrate) 39 40 case xev.RepoIdentity != nil && rsc.RepoIdentity != nil: 40 41 return rsc.RepoIdentity(xev.RepoIdentity) 42 + case xev.RepoAccount != nil && rsc.RepoAccount != nil: 43 + return rsc.RepoAccount(xev.RepoAccount) 41 44 case xev.RepoTombstone != nil && rsc.RepoTombstone != nil: 42 45 return rsc.RepoTombstone(xev.RepoTombstone) 43 46 case xev.LabelLabels != nil && rsc.LabelLabels != nil: ··· 230 233 231 234 if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 232 235 RepoIdentity: &evt, 236 + }); err != nil { 237 + return err 238 + } 239 + case "#account": 240 + var evt comatproto.SyncSubscribeRepos_Account 241 + if err := evt.UnmarshalCBOR(r); err != nil { 242 + return err 243 + } 244 + 245 + if evt.Seq < lastSeq { 246 + log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq) 247 + } 248 + lastSeq = evt.Seq 249 + 250 + if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 251 + RepoAccount: &evt, 233 252 }); err != nil { 234 253 return err 235 254 }
+50
events/dbpersist.go
··· 79 79 Type string 80 80 Rebase bool 81 81 82 + // Active and Status are only set on RepoAccount events 83 + Active bool 84 + Status *string 85 + 82 86 Ops []byte 83 87 } 84 88 ··· 167 171 e.RepoHandle.Seq = int64(item.Seq) 168 172 case e.RepoIdentity != nil: 169 173 e.RepoIdentity.Seq = int64(item.Seq) 174 + case e.RepoAccount != nil: 175 + e.RepoAccount.Seq = int64(item.Seq) 170 176 case e.RepoTombstone != nil: 171 177 e.RepoTombstone.Seq = int64(item.Seq) 172 178 default: ··· 218 224 if err != nil { 219 225 return err 220 226 } 227 + case e.RepoAccount != nil: 228 + rer, err = p.RecordFromRepoAccount(ctx, e.RepoAccount) 229 + if err != nil { 230 + return err 231 + } 221 232 case e.RepoTombstone != nil: 222 233 rer, err = p.RecordFromTombstone(ctx, e.RepoTombstone) 223 234 if err != nil { ··· 270 281 Time: t, 271 282 }, nil 272 283 } 284 + 285 + func (p *DbPersistence) RecordFromRepoAccount(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) (*RepoEventRecord, error) { 286 + t, err := time.Parse(util.ISO8601, evt.Time) 287 + if err != nil { 288 + return nil, err 289 + } 290 + 291 + uid, err := p.uidForDid(ctx, evt.Did) 292 + if err != nil { 293 + return nil, err 294 + } 295 + 296 + return &RepoEventRecord{ 297 + Repo: uid, 298 + Type: "repo_account", 299 + Time: t, 300 + Active: evt.Active, 301 + Status: evt.Status, 302 + }, nil 303 + } 304 + 273 305 func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error) { 274 306 t, err := time.Parse(util.ISO8601, evt.Time) 275 307 if err != nil { ··· 422 454 streamEvent, err = p.hydrateHandleChange(ctx, record) 423 455 case record.Type == "repo_identity": 424 456 streamEvent, err = p.hydrateIdentityEvent(ctx, record) 457 + case record.Type == "repo_account": 458 + streamEvent, err = p.hydrateAccountEvent(ctx, record) 425 459 case record.Type == "repo_tombstone": 426 460 streamEvent, err = p.hydrateTombstone(ctx, record) 427 461 default: ··· 515 549 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 516 550 Did: did, 517 551 Time: rer.Time.Format(util.ISO8601), 552 + }, 553 + }, nil 554 + } 555 + 556 + func (p *DbPersistence) hydrateAccountEvent(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 557 + did, err := p.didForUid(ctx, rer.Repo) 558 + if err != nil { 559 + return nil, err 560 + } 561 + 562 + return &XRPCStreamEvent{ 563 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 564 + Did: did, 565 + Time: rer.Time.Format(util.ISO8601), 566 + Active: rer.Active, 567 + Status: rer.Status, 518 568 }, 519 569 }, nil 520 570 }
+18
events/diskpersist.go
··· 277 277 evtKindHandle = 2 278 278 evtKindTombstone = 3 279 279 evtKindIdentity = 4 280 + evtKindAccount = 5 280 281 ) 281 282 282 283 var emptyHeader = make([]byte, headerSize) ··· 454 455 e.RepoHandle.Seq = seq 455 456 case e.RepoIdentity != nil: 456 457 e.RepoIdentity.Seq = seq 458 + case e.RepoAccount != nil: 459 + e.RepoAccount.Seq = seq 457 460 case e.RepoTombstone != nil: 458 461 e.RepoTombstone.Seq = seq 459 462 default: ··· 512 515 evtKind = evtKindIdentity 513 516 did = e.RepoIdentity.Did 514 517 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 518 + return fmt.Errorf("failed to marshal: %w", err) 519 + } 520 + case e.RepoAccount != nil: 521 + evtKind = evtKindAccount 522 + did = e.RepoAccount.Did 523 + if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 515 524 return fmt.Errorf("failed to marshal: %w", err) 516 525 } 517 526 case e.RepoTombstone != nil: ··· 748 757 } 749 758 evt.Seq = h.Seq 750 759 if err := cb(&XRPCStreamEvent{RepoIdentity: &evt}); err != nil { 760 + return nil, err 761 + } 762 + case evtKindAccount: 763 + var evt atproto.SyncSubscribeRepos_Account 764 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 765 + return nil, err 766 + } 767 + evt.Seq = h.Seq 768 + if err := cb(&XRPCStreamEvent{RepoAccount: &evt}); err != nil { 751 769 return nil, err 752 770 } 753 771 case evtKindTombstone:
+9
events/events.go
··· 133 133 MsgType string `cborgen:"t"` 134 134 } 135 135 136 + var ( 137 + AccountStatusActive = "active" 138 + AccountStatusTakendown = "takendown" 139 + AccountStatusSuspended = "suspended" 140 + AccountStatusDeleted = "deleted" 141 + AccountStatusDeactivated = "deactivated" 142 + ) 143 + 136 144 type XRPCStreamEvent struct { 137 145 Error *ErrorFrame 138 146 RepoCommit *comatproto.SyncSubscribeRepos_Commit ··· 141 149 RepoInfo *comatproto.SyncSubscribeRepos_Info 142 150 RepoMigrate *comatproto.SyncSubscribeRepos_Migrate 143 151 RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone 152 + RepoAccount *comatproto.SyncSubscribeRepos_Account 144 153 LabelLabels *comatproto.LabelSubscribeLabels_Labels 145 154 LabelInfo *comatproto.LabelSubscribeLabels_Info 146 155
+2
events/persist.go
··· 45 45 e.RepoHandle.Seq = mp.seq 46 46 case e.RepoIdentity != nil: 47 47 e.RepoIdentity.Seq = mp.seq 48 + case e.RepoAccount != nil: 49 + e.RepoAccount.Seq = mp.seq 48 50 case e.RepoMigrate != nil: 49 51 e.RepoMigrate.Seq = mp.seq 50 52 case e.RepoTombstone != nil:
+2
events/yolopersist.go
··· 31 31 e.RepoHandle.Seq = yp.seq 32 32 case e.RepoIdentity != nil: 33 33 e.RepoIdentity.Seq = yp.seq 34 + case e.RepoAccount != nil: 35 + e.RepoAccount.Seq = yp.seq 34 36 case e.RepoMigrate != nil: 35 37 e.RepoMigrate.Seq = yp.seq 36 38 case e.RepoTombstone != nil:
+3
pds/server.go
··· 641 641 case evt.RepoIdentity != nil: 642 642 header.MsgType = "#identity" 643 643 obj = evt.RepoIdentity 644 + case evt.RepoAccount != nil: 645 + header.MsgType = "#account" 646 + obj = evt.RepoAccount 644 647 case evt.RepoInfo != nil: 645 648 header.MsgType = "#info" 646 649 obj = evt.RepoInfo
+7
sonar/sonar.go
··· 133 133 s.Progress.LastSeq = xe.RepoIdentity.Seq 134 134 s.Progress.LastSeqProcessedAt = now 135 135 s.ProgMux.Unlock() 136 + case xe.RepoAccount != nil: 137 + eventsProcessedCounter.WithLabelValues("account", s.SocketURL).Inc() 138 + now := time.Now() 139 + s.ProgMux.Lock() 140 + s.Progress.LastSeq = xe.RepoAccount.Seq 141 + s.Progress.LastSeqProcessedAt = now 142 + s.ProgMux.Unlock() 136 143 case xe.RepoInfo != nil: 137 144 eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc() 138 145 case xe.RepoMigrate != nil:
+7
testing/utils.go
··· 612 612 es.Lk.Unlock() 613 613 return nil 614 614 }, 615 + RepoAccount: func(evt *atproto.SyncSubscribeRepos_Account) error { 616 + fmt.Println("received account event: ", evt.Seq, evt.Did) 617 + es.Lk.Lock() 618 + es.Events = append(es.Events, &events.XRPCStreamEvent{RepoAccount: evt}) 619 + es.Lk.Unlock() 620 + return nil 621 + }, 615 622 } 616 623 seqScheduler := sequential.NewScheduler("test", rsc.EventHandler) 617 624 if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil {