this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

handle sync messages

dholms 5105e6d4 b8f76dfb

+68 -9
+19 -9
atproto/repo/sync.go
··· 164 164 return out, nil 165 165 } 166 166 167 + func VerifySyncMessage(ctx context.Context, dir identity.Directory, msg *comatproto.SyncSubscribeRepos_Sync) (*Commit, error) { 168 + return VerifyCommitSignatureFromCar(ctx, dir, []byte(msg.Blocks)) 169 + } 170 + 167 171 // temporary/experimental code showing how to verify a commit signature from firehose 168 172 // 169 173 // TODO: in real implementation, will want to merge this code with `VerifyCommitMessage` above, and have it hanging off some service struct with a configured `identity.Directory` 170 174 func VerifyCommitSignature(ctx context.Context, dir identity.Directory, msg *comatproto.SyncSubscribeRepos_Commit) error { 171 - commit, _, err := LoadRepoFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 175 + _, err := VerifyCommitSignatureFromCar(ctx, dir, []byte(msg.Blocks)) 176 + return err 177 + } 178 + 179 + func VerifyCommitSignatureFromCar(ctx context.Context, dir identity.Directory, car []byte) (*Commit, error) { 180 + commit, _, err := LoadCommitFromCAR(ctx, bytes.NewReader(car)) 172 181 if err != nil { 173 - return err 182 + return nil, err 174 183 } 175 184 176 - if err := commit.VerifyStructure(); err != nil { 177 - return err 178 - } 179 185 did, err := syntax.ParseDID(commit.DID) 180 186 if err != nil { 181 - return err 187 + return nil, err 182 188 } 183 189 184 190 ident, err := dir.LookupDID(ctx, did) 185 191 if err != nil { 186 - return err 192 + return nil, err 187 193 } 188 194 pubkey, err := ident.PublicKey() 189 195 if err != nil { 190 - return err 196 + return nil, err 191 197 } 192 198 193 - return commit.VerifySignature(pubkey) 199 + err = commit.VerifySignature(pubkey) 200 + if err != nil { 201 + return nil, err 202 + } 203 + return commit, nil 194 204 }
+11
nexus/nexus.go
··· 94 94 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 95 95 return n.EventProcessor.ProcessCommit(context.Background(), evt) 96 96 }, 97 + RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 98 + return n.EventProcessor.ProcessSync(context.Background(), evt) 99 + }, 100 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 101 + // @TODO 102 + return nil 103 + }, 104 + RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 105 + // @TODO 106 + return nil 107 + }, 97 108 } 98 109 99 110 n.FirehoseConsumer = &FirehoseConsumer{
+38
nexus/processor.go
··· 28 28 seqMu sync.Mutex 29 29 } 30 30 31 + func (ep *EventProcessor) ProcessSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 32 + defer ep.trackLastSeq(evt.Seq) 33 + 34 + var curr models.Repo 35 + if err := ep.DB.First(&curr, "did = ?", evt.Did).Error; err != nil { 36 + if err != gorm.ErrRecordNotFound { 37 + ep.Logger.Error("failed to get repo state", "did", evt.Did, "error", err) 38 + } 39 + return nil 40 + } 41 + 42 + commit, err := repo.VerifySyncMessage(ctx, ep.Dir, evt) 43 + if err != nil { 44 + return fmt.Errorf("failed to verify sync message: %w", err) 45 + } 46 + 47 + if curr.State != models.RepoStateActive { 48 + return nil 49 + } 50 + 51 + if curr.Rev != "" && commit.Rev <= curr.Rev { 52 + ep.Logger.Debug("skipping replayed event", "did", commit.DID, "eventRev", commit.Rev, "currentRev", curr.Rev) 53 + return nil 54 + } 55 + 56 + if curr.PrevData == commit.Data.String() { 57 + ep.Logger.Debug("skipping noop sync event", "did", commit.DID, "rev", commit.Rev) 58 + return nil 59 + } 60 + 61 + if err := ep.UpdateRepoState(commit.DID, models.RepoStateDesynced); err != nil { 62 + ep.Logger.Error("failed to update repo state to desynced", "did", commit.DID, "error", err) 63 + return err 64 + } 65 + 66 + return nil 67 + } 68 + 31 69 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 32 70 defer ep.trackLastSeq(evt.Seq) 33 71