this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

sync v1.1 event updates: add `#sync`, remove `#handle`, `#migrate`, `#tombstone` (#967)

This was an old branch. which I have now rebased on top of
https://github.com/bluesky-social/indigo/pull/1082 (which got merged to
main).

It now basically just adds `#sync` support in a few more places it was
missing.

authored by

bnewbold and committed by
GitHub
a6d3e9a1 0b69efcd

+86 -2
+15
bgs/fedmgr.go
··· 569 569 570 570 return nil 571 571 }, 572 + RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 573 + log.Info("sync event", "did", evt.Did, "pdsHost", host.Host, "seq", evt.Seq) 574 + if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 575 + RepoSync: evt, 576 + }); err != nil { 577 + log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err) 578 + } 579 + *lastCursor = evt.Seq 580 + 581 + if err := s.updateCursor(sub, *lastCursor); err != nil { 582 + return fmt.Errorf("updating cursor: %w", err) 583 + } 584 + 585 + return nil 586 + }, 572 587 RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error { 573 588 log.Info("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host) 574 589 return nil
+8
cmd/gosky/debug.go
··· 262 262 263 263 return nil 264 264 }, 265 + RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 266 + fmt.Printf("\rChecking seq: %d ", evt.Seq) 267 + if lastSeq > 0 && evt.Seq != lastSeq+1 { 268 + fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 269 + } 270 + lastSeq = evt.Seq 271 + return nil 272 + }, 265 273 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 266 274 return nil 267 275 },
+14
cmd/gosky/main.go
··· 284 284 285 285 return nil 286 286 }, 287 + RepoSync: func(sync *comatproto.SyncSubscribeRepos_Sync) error { 288 + if jsonfmt { 289 + b, err := json.Marshal(sync) 290 + if err != nil { 291 + return err 292 + } 293 + fmt.Println(string(b)) 294 + } else { 295 + fmt.Printf("(%d) Sync: %s\n", sync.Seq, sync.Did) 296 + } 297 + 298 + return nil 299 + 300 + }, 287 301 RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error { 288 302 if jsonfmt { 289 303 b, err := json.Marshal(info)
+2
cmd/gosky/streamdiff.go
··· 127 127 return "ERROR" 128 128 case evt.RepoCommit != nil: 129 129 return "#commit" 130 + case evt.RepoSync != nil: 131 + return "#sync" 130 132 case evt.RepoInfo != nil: 131 133 return "#info" 132 134 default:
+7
cmd/sonar/sonar.go
··· 109 109 case xe.RepoCommit != nil: 110 110 eventsProcessedCounter.WithLabelValues("repo_commit", s.SocketURL).Inc() 111 111 return s.HandleRepoCommit(ctx, xe.RepoCommit) 112 + case xe.RepoSync != nil: 113 + eventsProcessedCounter.WithLabelValues("sync", s.SocketURL).Inc() 114 + now := time.Now() 115 + s.ProgMux.Lock() 116 + s.Progress.LastSeq = xe.RepoSync.Seq 117 + s.Progress.LastSeqProcessedAt = now 118 + s.ProgMux.Unlock() 112 119 case xe.RepoIdentity != nil: 113 120 eventsProcessedCounter.WithLabelValues("identity", s.SocketURL).Inc() 114 121 now := time.Now()
+3
cmd/supercollider/main.go
··· 338 338 case evt.RepoCommit != nil: 339 339 header.MsgType = "#commit" 340 340 obj = evt.RepoCommit 341 + case evt.RepoSync != nil: 342 + header.MsgType = "#sync" 343 + obj = evt.RepoSync 341 344 case evt.RepoInfo != nil: 342 345 header.MsgType = "#info" 343 346 obj = evt.RepoInfo
+2 -2
events/diskpersist/diskpersist.go
··· 278 278 279 279 const ( 280 280 evtKindCommit = 1 281 - evtKindHandle = 2 282 - evtKindTombstone = 3 281 + evtKindHandle = 2 // DEPRECATED 282 + evtKindTombstone = 3 // DEPRECATED 283 283 evtKindIdentity = 4 284 284 evtKindAccount = 5 285 285 evtKindSync = 6
+2
events/persist.go
··· 42 42 switch { 43 43 case e.RepoCommit != nil: 44 44 e.RepoCommit.Seq = mp.seq 45 + case e.RepoSync != nil: 46 + e.RepoSync.Seq = mp.seq 45 47 case e.RepoIdentity != nil: 46 48 e.RepoIdentity.Seq = mp.seq 47 49 case e.RepoAccount != nil:
+4
pds/server.go
··· 598 598 case evt.RepoCommit != nil: 599 599 header.MsgType = "#commit" 600 600 obj = evt.RepoCommit 601 + case evt.RepoSync != nil: 602 + header.MsgType = "#sync" 603 + obj = evt.RepoSync 601 604 case evt.RepoIdentity != nil: 602 605 header.MsgType = "#identity" 603 606 obj = evt.RepoIdentity ··· 651 654 return fmt.Errorf("failed to update handle: %w", err) 652 655 } 653 656 657 + // Push an Identity event 654 658 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 655 659 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 656 660 Did: u.Did,
+22
search/firehose.go
··· 115 115 return nil 116 116 117 117 }, 118 + // TODO: process RepoIdentity 119 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 120 + ctx := context.Background() 121 + ctx, span := tracer.Start(ctx, "RepoIdentity") 122 + defer span.End() 123 + 124 + did, err := syntax.ParseDID(evt.Did) 125 + if err != nil { 126 + idx.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 127 + return nil 128 + } 129 + ident, err := idx.dir.LookupDID(ctx, did) 130 + if err != nil { 131 + idx.logger.Error("failed identity resolution in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 132 + return nil 133 + } 134 + if err := idx.updateUserHandle(ctx, did, ident.Handle.String()); err != nil { 135 + // TODO: handle this case (instead of return nil) 136 + idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", ident.Handle, "seq", evt.Seq, "err", err) 137 + } 138 + return nil 139 + }, 118 140 } 119 141 120 142 return events.HandleRepoStream(
+7
testing/utils.go
··· 666 666 es.Lk.Unlock() 667 667 return nil 668 668 }, 669 + RepoSync: func(evt *atproto.SyncSubscribeRepos_Sync) error { 670 + fmt.Println("received sync event: ", evt.Seq, evt.Did) 671 + es.Lk.Lock() 672 + es.Events = append(es.Events, &events.XRPCStreamEvent{RepoSync: evt}) 673 + es.Lk.Unlock() 674 + return nil 675 + }, 669 676 RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error { 670 677 fmt.Println("received identity event: ", evt.Seq, evt.Did) 671 678 es.Lk.Lock()