this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

merged EventManager (#68)

This combines the RepoAppend and LabelBatch event managers (and
MemPersisters) into a single implementation. The database persister and
some other code is not sharable.

After implementing this, I realized that the "kind" integers probably
can't be a single numeric space because there would be a coordination
problem with folks proposing random new lexicons and having conflicts.
So may have to refactor this again before label stuff stabilizes.

The `Append -> RepoAppend` rename was for clarity, but it got large and
disruptive. I don't feel super strongly about that if you want me to
rever it.

The `XRPCStreamEvent` (to replace `RepoStreamEvent`) was just the first
name I thought of, can do a quick search/replace if you prefer something
else.

authored by

Whyrusleeping and committed by
GitHub
215a0ee5 d047a372

+144 -311
+6 -6
bgs/bgs.go
··· 157 157 return fmt.Errorf("upgrading websocket: %w", err) 158 158 } 159 159 160 - evts, cancel, err := bgs.events.Subscribe(ctx, func(evt *events.RepoStreamEvent) bool { return true }, since) 160 + evts, cancel, err := bgs.events.Subscribe(ctx, func(evt *events.XRPCStreamEvent) bool { return true }, since) 161 161 if err != nil { 162 162 return err 163 163 } ··· 175 175 var obj util.CBOR 176 176 177 177 switch { 178 - case evt.Append != nil: 178 + case evt.RepoAppend != nil: 179 179 header.Op = events.EvtKindRepoAppend 180 - obj = evt.Append 180 + obj = evt.RepoAppend 181 181 case evt.Error != nil: 182 182 header.Op = events.EvtKindErrorFrame 183 183 obj = evt.Error ··· 238 238 return &u, nil 239 239 } 240 240 241 - func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.RepoStreamEvent) error { 241 + func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error { 242 242 ctx, span := otel.Tracer("bgs").Start(ctx, "handleFedEvent") 243 243 defer span.End() 244 244 245 245 switch { 246 - case env.Append != nil: 247 - evt := env.Append 246 + case env.RepoAppend != nil: 247 + evt := env.RepoAppend 248 248 log.Infof("bgs got repo append event %d from %q: %s\n", evt.Seq, host.Host, evt.Repo) 249 249 u, err := bgs.lookupUserByDid(ctx, evt.Repo) 250 250 if err != nil {
+5 -5
bgs/fedmgr.go
··· 14 14 "gorm.io/gorm" 15 15 ) 16 16 17 - type IndexCallback func(context.Context, *models.PDS, *events.RepoStreamEvent) error 17 + type IndexCallback func(context.Context, *models.PDS, *events.XRPCStreamEvent) error 18 18 19 19 // TODO: rename me 20 20 type Slurper struct { ··· 157 157 defer cancel() 158 158 159 159 return events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 160 - Append: func(evt *events.RepoAppend) error { 160 + RepoAppend: func(evt *events.RepoAppend) error { 161 161 162 162 log.Infow("got remote repo event", "host", host.Host, "repo", evt.Repo) 163 - if err := s.cb(context.TODO(), host, &events.RepoStreamEvent{ 164 - Append: evt, 163 + if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 164 + RepoAppend: evt, 165 165 }); err != nil { 166 - log.Errorf("failed to index event from %q (%d): %s", host.Host, evt.Seq, err) 166 + log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err) 167 167 } 168 168 *lastCursor = evt.Seq 169 169
+1 -1
cmd/gosky/main.go
··· 877 877 }() 878 878 879 879 return events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 880 - Append: func(evt *events.RepoAppend) error { 880 + RepoAppend: func(evt *events.RepoAppend) error { 881 881 if jsonfmt { 882 882 b, err := json.Marshal(evt) 883 883 if err != nil {
+25 -5
events/consumer.go
··· 9 9 ) 10 10 11 11 type RepoStreamCallbacks struct { 12 - Append func(evt *RepoAppend) error 13 - Info func(evt *InfoFrame) error 14 - Error func(evt *ErrorFrame) error 12 + RepoAppend func(evt *RepoAppend) error 13 + LabelBatch func(evt *LabelBatch) error 14 + Info func(evt *InfoFrame) error 15 + Error func(evt *ErrorFrame) error 15 16 } 16 17 17 18 func HandleRepoStream(ctx context.Context, con *websocket.Conn, cbs *RepoStreamCallbacks) error { ··· 66 67 67 68 lastSeq = evt.Seq 68 69 69 - if cbs.Append != nil { 70 - if err := cbs.Append(&evt); err != nil { 70 + if cbs.RepoAppend != nil { 71 + if err := cbs.RepoAppend(&evt); err != nil { 71 72 return err 72 73 } 73 74 } else { 74 75 log.Warnf("received repo append event with nil append object (seq %d)", evt.Seq) 76 + } 77 + case EvtKindLabelBatch: 78 + var evt LabelBatch 79 + if err := evt.UnmarshalCBOR(r); err != nil { 80 + return fmt.Errorf("reading LabelBatch event: %w", err) 81 + } 82 + 83 + if evt.Seq < lastSeq { 84 + log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq) 85 + } 86 + 87 + lastSeq = evt.Seq 88 + 89 + if cbs.RepoAppend != nil { 90 + if err := cbs.LabelBatch(&evt); err != nil { 91 + return err 92 + } 93 + } else { 94 + log.Warnf("received label event with nil append object (seq %d)", evt.Seq) 75 95 } 76 96 case EvtKindInfoFrame: 77 97 var info InfoFrame
+8 -8
events/dbpersist.go
··· 56 56 }, nil 57 57 } 58 58 59 - func (p *DbPersistence) Persist(ctx context.Context, e *RepoStreamEvent) error { 60 - if e.Append == nil { 59 + func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 60 + if e.RepoAppend == nil { 61 61 return nil 62 62 } 63 63 64 - evt := e.Append 64 + evt := e.RepoAppend 65 65 66 66 uid, err := p.uidForDid(ctx, evt.Repo) 67 67 if err != nil { ··· 126 126 return err 127 127 } 128 128 129 - e.Append.Seq = int64(rer.Seq) 129 + e.RepoAppend.Seq = int64(rer.Seq) 130 130 131 131 return nil 132 132 } 133 133 134 - func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*RepoStreamEvent) error) error { 134 + func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 135 135 rows, err := p.db.Model(RepoEventRecord{}).Where("seq > ?", since).Order("seq asc").Rows() 136 136 if err != nil { 137 137 return err ··· 151 151 152 152 evt.Ops = ops 153 153 154 - ra, err := p.hydrate(ctx, &evt) 154 + ra, err := p.hydrateRepoEvent(ctx, &evt) 155 155 if err != nil { 156 156 return err 157 157 } 158 158 159 - if err := cb(&RepoStreamEvent{Append: ra}); err != nil { 159 + if err := cb(&XRPCStreamEvent{RepoAppend: ra}); err != nil { 160 160 return err 161 161 } 162 162 } ··· 182 182 return u.Did, nil 183 183 } 184 184 185 - func (p *DbPersistence) hydrate(ctx context.Context, rer *RepoEventRecord) (*RepoAppend, error) { 185 + func (p *DbPersistence) hydrateRepoEvent(ctx context.Context, rer *RepoEventRecord) (*RepoAppend, error) { 186 186 var blobs []string 187 187 if len(rer.Blobs) > 0 { 188 188 if err := json.Unmarshal(rer.Blobs, &blobs); err != nil {
+2 -2
events/diskpersist.go
··· 10 10 }, nil 11 11 } 12 12 13 - func (p *DiskPersistence) Persist(e *RepoStreamEvent) error { 13 + func (p *DiskPersistence) Persist(e *XRPCStreamEvent) error { 14 14 panic("nyi") 15 15 } 16 16 17 - func (p *DiskPersistence) Playback(since int64, cb func(*RepoStreamEvent) error) error { 17 + func (p *DiskPersistence) Playback(since int64, cb func(*XRPCStreamEvent) error) error { 18 18 panic("nyi") 19 19 }
+31 -12
events/events.go
··· 38 38 type Operation struct { 39 39 op int 40 40 sub *Subscriber 41 - evt *RepoStreamEvent 41 + evt *XRPCStreamEvent 42 42 } 43 43 44 44 func (em *EventManager) Run() { ··· 75 75 } 76 76 77 77 type Subscriber struct { 78 - outgoing chan *RepoStreamEvent 78 + outgoing chan *XRPCStreamEvent 79 79 80 - filter func(*RepoStreamEvent) bool 80 + filter func(*XRPCStreamEvent) bool 81 81 82 82 done chan struct{} 83 83 } ··· 86 86 EvtKindErrorFrame = -1 87 87 EvtKindRepoAppend = 1 88 88 EvtKindInfoFrame = 2 89 + EvtKindLabelBatch = 3 89 90 ) 90 91 91 92 type EventHeader struct { 92 93 Op int64 `cborgen:"op"` 93 94 } 94 95 95 - type RepoStreamEvent struct { 96 - Append *RepoAppend 97 - Info *InfoFrame 98 - Error *ErrorFrame 96 + type XRPCStreamEvent struct { 97 + RepoAppend *RepoAppend 98 + Info *InfoFrame 99 + Error *ErrorFrame 100 + LabelBatch *LabelBatch 99 101 100 102 // some private fields for internal routing perf 101 103 PrivUid util.Uid `json:"-" cborgen:"-"` ··· 130 132 Cid *string `cborgen:"cid"` 131 133 } 132 134 135 + type LabelBatch struct { 136 + Seq int64 `cborgen:"seq"` 137 + Labels []Label `cborgen:"labels"` 138 + } 139 + 133 140 type InfoFrame struct { 134 141 Info string `cborgen:"info"` 135 142 Message string `cborgen:"message"` ··· 140 147 Message string `cborgen:"message"` 141 148 } 142 149 143 - func (em *EventManager) AddEvent(ev *RepoStreamEvent) error { 150 + func (em *EventManager) AddEvent(ev *XRPCStreamEvent) error { 151 + select { 152 + case em.ops <- &Operation{ 153 + op: opSend, 154 + evt: ev, 155 + }: 156 + return nil 157 + case <-em.closed: 158 + return fmt.Errorf("event manager shut down") 159 + } 160 + } 161 + 162 + func (em *EventManager) AddLabelEvent(ev *XRPCStreamEvent) error { 144 163 select { 145 164 case em.ops <- &Operation{ 146 165 op: opSend, ··· 152 171 } 153 172 } 154 173 155 - func (em *EventManager) Subscribe(ctx context.Context, filter func(*RepoStreamEvent) bool, since *int64) (<-chan *RepoStreamEvent, func(), error) { 174 + func (em *EventManager) Subscribe(ctx context.Context, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error) { 156 175 if filter == nil { 157 - filter = func(*RepoStreamEvent) bool { return true } 176 + filter = func(*XRPCStreamEvent) bool { return true } 158 177 } 159 178 160 179 done := make(chan struct{}) 161 180 sub := &Subscriber{ 162 - outgoing: make(chan *RepoStreamEvent, em.bufferSize), 181 + outgoing: make(chan *XRPCStreamEvent, em.bufferSize), 163 182 filter: filter, 164 183 done: done, 165 184 } 166 185 167 186 go func() { 168 187 if since != nil { 169 - if err := em.persister.Playback(ctx, *since, func(e *RepoStreamEvent) error { 188 + if err := em.persister.Playback(ctx, *since, func(e *XRPCStreamEvent) error { 170 189 select { 171 190 case <-done: 172 191 return fmt.Errorf("shutting down")
+12
events/label.go
··· 1 + package events 2 + 3 + // this is here, instead of under 'labeling' package, to avoid an import loop 4 + type Label struct { 5 + LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.label.label"` 6 + SourceDid string `json:"src" cborgen:"src"` 7 + SubjectUri string `json:"uri" cborgen:"uri"` 8 + SubjectCid *string `json:"cid,omitempty" cborgen:"cid"` 9 + Value string `json:"val" cborgen:"val"` 10 + Timestamp string `json:"ts" cborgen:"ts"` // TODO: actual timestamp? 11 + LabelUri *string `json:"labeluri,omitempty" cborgen:"labeluri"` 12 + }
-165
events/label_events.go
··· 1 - package events 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - ) 7 - 8 - type LabelEventManager struct { 9 - subs []*LabelSubscriber 10 - 11 - ops chan *LabelOperation 12 - closed chan struct{} 13 - bufferSize int 14 - 15 - persister LabelEventPersistence 16 - } 17 - 18 - func NewLabelEventManager(persister LabelEventPersistence) *LabelEventManager { 19 - return &LabelEventManager{ 20 - ops: make(chan *LabelOperation), 21 - closed: make(chan struct{}), 22 - bufferSize: 1024, 23 - persister: persister, 24 - } 25 - } 26 - 27 - type LabelOperation struct { 28 - op int 29 - sub *LabelSubscriber 30 - evt *LabelStreamEvent 31 - } 32 - 33 - func (em *LabelEventManager) Run() { 34 - for op := range em.ops { 35 - switch op.op { 36 - case opSubscribe: 37 - em.subs = append(em.subs, op.sub) 38 - case opUnsubscribe: 39 - for i, s := range em.subs { 40 - if s == op.sub { 41 - em.subs[i] = em.subs[len(em.subs)-1] 42 - em.subs = em.subs[:len(em.subs)-1] 43 - break 44 - } 45 - } 46 - case opSend: 47 - if err := em.persister.Persist(context.TODO(), op.evt); err != nil { 48 - log.Errorf("failed to persist outbound event: %s", err) 49 - } 50 - 51 - for _, s := range em.subs { 52 - if s.filter(op.evt) { 53 - select { 54 - case s.outgoing <- op.evt: 55 - default: 56 - log.Error("event overflow") 57 - } 58 - } 59 - } 60 - default: 61 - log.Errorf("unrecognized eventmgr operation: %d", op.op) 62 - } 63 - } 64 - } 65 - 66 - type LabelSubscriber struct { 67 - outgoing chan *LabelStreamEvent 68 - 69 - filter func(*LabelStreamEvent) bool 70 - 71 - done chan struct{} 72 - } 73 - 74 - const ( 75 - LEvtKindErrorFrame = -1 76 - LEvtKindLabelBatch = 1 77 - LEvtKindInfoFrame = 2 78 - ) 79 - 80 - // EventHeader shared with repo events 81 - 82 - type LabelStreamEvent struct { 83 - Batch *LabelBatch 84 - Info *InfoFrame 85 - Error *ErrorFrame 86 - } 87 - 88 - type LabelBatch struct { 89 - Seq int64 `cborgen:"seq"` 90 - Labels []Label `cborgen:"labels"` 91 - // TODO: time? 92 - } 93 - 94 - // this is here, instead of under 'labeling' package, to avoid an import loop 95 - type Label struct { 96 - LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.label.label"` 97 - SourceDid string `json:"src" cborgen:"src"` 98 - SubjectUri string `json:"uri" cborgen:"uri"` 99 - SubjectCid *string `json:"cid,omitempty" cborgen:"cid"` 100 - Value string `json:"val" cborgen:"val"` 101 - Timestamp string `json:"ts" cborgen:"ts"` // TODO: actual timestamp? 102 - LabelUri *string `json:"labeluri,omitempty" cborgen:"labeluri"` 103 - } 104 - 105 - func (em *LabelEventManager) AddEvent(ev *LabelStreamEvent) error { 106 - select { 107 - case em.ops <- &LabelOperation{ 108 - op: opSend, 109 - evt: ev, 110 - }: 111 - return nil 112 - case <-em.closed: 113 - return fmt.Errorf("event manager shut down") 114 - } 115 - } 116 - 117 - func (em *LabelEventManager) Subscribe(ctx context.Context, filter func(*LabelStreamEvent) bool, since *int64) (<-chan *LabelStreamEvent, func(), error) { 118 - if filter == nil { 119 - filter = func(*LabelStreamEvent) bool { return true } 120 - } 121 - 122 - done := make(chan struct{}) 123 - sub := &LabelSubscriber{ 124 - outgoing: make(chan *LabelStreamEvent, em.bufferSize), 125 - filter: filter, 126 - done: done, 127 - } 128 - 129 - go func() { 130 - if since != nil { 131 - if err := em.persister.Playback(ctx, *since, func(e *LabelStreamEvent) error { 132 - select { 133 - case <-done: 134 - return fmt.Errorf("shutting down") 135 - case sub.outgoing <- e: 136 - return nil 137 - } 138 - }); err != nil { 139 - log.Errorf("events playback: %s", err) 140 - } 141 - } 142 - 143 - select { 144 - case em.ops <- &LabelOperation{ 145 - op: opSubscribe, 146 - sub: sub, 147 - }: 148 - case <-em.closed: 149 - log.Errorf("failed to subscribe, event manager shut down") 150 - } 151 - }() 152 - 153 - cleanup := func() { 154 - close(done) 155 - select { 156 - case em.ops <- &LabelOperation{ 157 - op: opUnsubscribe, 158 - sub: sub, 159 - }: 160 - case <-em.closed: 161 - } 162 - } 163 - 164 - return sub.outgoing, cleanup, nil 165 - }
-57
events/label_persist.go
··· 1 - package events 2 - 3 - import ( 4 - "context" 5 - "sync" 6 - ) 7 - 8 - type LabelEventPersistence interface { 9 - Persist(ctx context.Context, e *LabelStreamEvent) error 10 - Playback(ctx context.Context, since int64, cb func(*LabelStreamEvent) error) error 11 - } 12 - 13 - // MemLabelPersister is the most naive implementation of event persistence 14 - // ill do better later 15 - type MemLabelPersister struct { 16 - buf []*LabelStreamEvent 17 - lk sync.Mutex 18 - seq int64 19 - } 20 - 21 - func NewMemLabelPersister() *MemLabelPersister { 22 - return &MemLabelPersister{} 23 - } 24 - 25 - func (mp *MemLabelPersister) Persist(ctx context.Context, e *LabelStreamEvent) error { 26 - mp.lk.Lock() 27 - defer mp.lk.Unlock() 28 - mp.seq++ 29 - switch { 30 - case e.Batch != nil: 31 - e.Batch.Seq = mp.seq 32 - default: 33 - panic("no event in persist call") 34 - } 35 - mp.buf = append(mp.buf, e) 36 - 37 - return nil 38 - } 39 - 40 - func (mp *MemLabelPersister) Playback(ctx context.Context, since int64, cb func(*LabelStreamEvent) error) error { 41 - mp.lk.Lock() 42 - l := len(mp.buf) 43 - mp.lk.Unlock() 44 - 45 - if since >= int64(l) { 46 - return nil 47 - } 48 - 49 - // TODO: abusing the fact that buf[0].seq is currently always 1 50 - for _, e := range mp.buf[since:l] { 51 - if err := cb(e); err != nil { 52 - return err 53 - } 54 - } 55 - 56 - return nil 57 - }
+11 -7
events/persist.go
··· 5 5 "sync" 6 6 ) 7 7 8 + // Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelBatch 8 9 type EventPersistence interface { 9 - Persist(ctx context.Context, e *RepoStreamEvent) error 10 - Playback(ctx context.Context, since int64, cb func(*RepoStreamEvent) error) error 10 + Persist(ctx context.Context, e *XRPCStreamEvent) error 11 + Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error 11 12 } 12 13 13 14 // MemPersister is the most naive implementation of event persistence 15 + // This EventPersistence option works fine with all event types 14 16 // ill do better later 15 17 type MemPersister struct { 16 - buf []*RepoStreamEvent 18 + buf []*XRPCStreamEvent 17 19 lk sync.Mutex 18 20 seq int64 19 21 } ··· 22 24 return &MemPersister{} 23 25 } 24 26 25 - func (mp *MemPersister) Persist(ctx context.Context, e *RepoStreamEvent) error { 27 + func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error { 26 28 mp.lk.Lock() 27 29 defer mp.lk.Unlock() 28 30 mp.seq++ 29 31 switch { 30 - case e.Append != nil: 31 - e.Append.Seq = mp.seq 32 + case e.RepoAppend != nil: 33 + e.RepoAppend.Seq = mp.seq 34 + case e.LabelBatch != nil: 35 + e.LabelBatch.Seq = mp.seq 32 36 default: 33 37 panic("no event in persist call") 34 38 } ··· 37 41 return nil 38 42 } 39 43 40 - func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*RepoStreamEvent) error) error { 44 + func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 41 45 mp.lk.Lock() 42 46 l := len(mp.buf) 43 47 mp.lk.Unlock()
+1 -1
events/repostream.go
··· 15 15 16 16 func ConsumeRepoStreamLite(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error { 17 17 return HandleRepoStream(ctx, con, &RepoStreamCallbacks{ 18 - Append: func(evt *RepoAppend) error { 18 + RepoAppend: func(evt *RepoAppend) error { 19 19 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 20 20 if err != nil { 21 21 return err
+2 -2
indexer/indexer.go
··· 138 138 } 139 139 140 140 log.Infow("Sending event", "did", did) 141 - if err := ix.events.AddEvent(&events.RepoStreamEvent{ 142 - Append: &events.RepoAppend{ 141 + if err := ix.events.AddEvent(&events.XRPCStreamEvent{ 142 + RepoAppend: &events.RepoAppend{ 143 143 Repo: did, 144 144 Prev: prevstr, 145 145 Blocks: evt.RepoSlice,
+15 -15
labeling/service.go
··· 35 35 cs *carstore.CarStore 36 36 repoman *repomgr.RepoManager 37 37 bgsSlurper *bgs.Slurper 38 - levents *events.LabelEventManager 38 + evtmgr *events.EventManager 39 39 echo *echo.Echo 40 40 user *LabelmakerRepoConfig 41 41 kwl []KeywordLabeler ··· 61 61 62 62 didr := &api.PLCServer{Host: plcUrl} 63 63 kmgr := indexer.NewKeyManager(didr, serkey) 64 - levtman := events.NewLabelEventManager(events.NewMemLabelPersister()) 64 + evtmgr := events.NewEventManager(events.NewMemPersister()) 65 65 repoman := repomgr.NewRepoManager(db, cs, kmgr) 66 66 67 67 user := &LabelmakerRepoConfig{ ··· 76 76 s := &Server{ 77 77 db: db, 78 78 repoman: repoman, 79 - levents: levtman, 79 + evtmgr: evtmgr, 80 80 user: user, 81 81 kwl: []KeywordLabeler{kl}, 82 82 // sluper configured below ··· 98 98 slurp := bgs.NewSlurper(db, s.handleBgsRepoEvent, useWss) 99 99 s.bgsSlurper = slurp 100 100 101 - go levtman.Run() 101 + go evtmgr.Run() 102 102 103 103 return s, nil 104 104 } ··· 162 162 // Process incoming repo events coming from BGS, which includes new and updated 163 163 // records from any PDS. This function extracts records, handes them to the 164 164 // labeling routine, and then persists and broadcasts any resulting labels 165 - func (s *Server) handleBgsRepoEvent(ctx context.Context, pds *models.PDS, evt *events.RepoStreamEvent) error { 165 + func (s *Server) handleBgsRepoEvent(ctx context.Context, pds *models.PDS, evt *events.XRPCStreamEvent) error { 166 166 167 - if evt.Append == nil { 167 + if evt.RepoAppend == nil { 168 168 // TODO(bnewbold): is this really invalid? do we need to handle Info and Error events here? 169 169 return fmt.Errorf("invalid repo append event") 170 170 } 171 171 172 172 // quick check if we can skip processing the CAR slice entirely 173 - if !s.wantAnyRecords(ctx, evt.Append) { 173 + if !s.wantAnyRecords(ctx, evt.RepoAppend) { 174 174 return nil 175 175 } 176 176 177 177 // use an in-memory blockstore with repo wrapper to parse CAR slice 178 - sliceRepo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Append.Blocks)) 178 + sliceRepo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.RepoAppend.Blocks)) 179 179 if err != nil { 180 180 log.Warnw("failed to parse CAR slice", "repoErr", err) 181 181 return err ··· 184 184 now := time.Now().Format(util.ISO8601) 185 185 labels := []events.Label{} 186 186 187 - for _, op := range evt.Append.Ops { 188 - uri := "at://" + evt.Append.Repo + "/" + op.Path 187 + for _, op := range evt.RepoAppend.Ops { 188 + uri := "at://" + evt.RepoAppend.Repo + "/" + op.Path 189 189 nsid := strings.SplitN(op.Path, "/", 2)[0] 190 190 191 191 if !(op.Action == "create" || op.Action == "update") { ··· 223 223 log.Infof("persisted label: %s", labeluri) 224 224 } 225 225 226 - // ... then re-publish as LabelStreamEvent 226 + // ... then re-publish as XRPCStreamEvent 227 227 log.Infof("%s", labels) 228 228 if len(labels) > 0 { 229 - lev := events.LabelStreamEvent{ 230 - Batch: &events.LabelBatch{ 229 + lev := events.XRPCStreamEvent{ 230 + LabelBatch: &events.LabelBatch{ 231 231 // NOTE(bnewbold): seems like other code handles Seq field automatically 232 232 Labels: labels, 233 233 }, 234 234 } 235 - err = s.levents.AddEvent(&lev) 235 + err = s.evtmgr.AddEvent(&lev) 236 236 if err != nil { 237 - return fmt.Errorf("failed to publish LabelStreamEvent: %w", err) 237 + return fmt.Errorf("failed to publish XRPCStreamEvent: %w", err) 238 238 } 239 239 } 240 240 // TODO(bnewbold): persist state that we successfully processed the repo event (aka,
+7 -7
labeling/ws_endpoints.go
··· 28 28 return fmt.Errorf("upgrading websocket: %w", err) 29 29 } 30 30 31 - evts, cancel, err := s.levents.Subscribe(ctx, func(evt *events.LabelStreamEvent) bool { 31 + evts, cancel, err := s.evtmgr.Subscribe(ctx, func(evt *events.XRPCStreamEvent) bool { 32 32 return true 33 33 }, since) 34 34 if err != nil { ··· 36 36 } 37 37 defer cancel() 38 38 39 - header := events.EventHeader{Op: events.LEvtKindLabelBatch} 39 + header := events.EventHeader{Op: events.EvtKindLabelBatch} 40 40 for { 41 41 select { 42 42 case evt := <-evts: ··· 48 48 var obj lexutil.CBOR 49 49 50 50 switch { 51 - case evt.Batch != nil: 52 - header.Op = events.LEvtKindLabelBatch 53 - obj = evt.Batch 51 + case evt.LabelBatch != nil: 52 + header.Op = events.EvtKindLabelBatch 53 + obj = evt.LabelBatch 54 54 case evt.Error != nil: 55 - header.Op = events.LEvtKindErrorFrame 55 + header.Op = events.EvtKindErrorFrame 56 56 obj = evt.Error 57 57 case evt.Info != nil: 58 - header.Op = events.LEvtKindInfoFrame 58 + header.Op = events.EvtKindInfoFrame 59 59 obj = evt.Info 60 60 default: 61 61 return fmt.Errorf("unrecognized event kind")
+6 -6
pds/server.go
··· 122 122 return s.echo.Shutdown(ctx) 123 123 } 124 124 125 - func (s *Server) handleFedEvent(ctx context.Context, host *Peering, env *events.RepoStreamEvent) error { 125 + func (s *Server) handleFedEvent(ctx context.Context, host *Peering, env *events.XRPCStreamEvent) error { 126 126 fmt.Printf("[%s] got fed event from %q\n", s.serviceUrl, host.Host) 127 127 switch { 128 - case env.Append != nil: 129 - evt := env.Append 128 + case env.RepoAppend != nil: 129 + evt := env.RepoAppend 130 130 u, err := s.lookupUserByDid(ctx, evt.Repo) 131 131 if err != nil { 132 132 if !errors.Is(err, gorm.ErrRecordNotFound) { ··· 629 629 630 630 ctx := c.Request().Context() 631 631 632 - evts, cancel, err := s.events.Subscribe(ctx, func(evt *events.RepoStreamEvent) bool { 632 + evts, cancel, err := s.events.Subscribe(ctx, func(evt *events.XRPCStreamEvent) bool { 633 633 if !s.enforcePeering { 634 634 return true 635 635 } ··· 660 660 var obj util.CBOR 661 661 662 662 switch { 663 - case evt.Append != nil: 663 + case evt.RepoAppend != nil: 664 664 header.Op = events.EvtKindRepoAppend 665 - obj = evt.Append 665 + obj = evt.RepoAppend 666 666 case evt.Info != nil: 667 667 header.Op = events.EvtKindInfoFrame 668 668 obj = evt.Info
+6 -6
testing/integ_test.go
··· 46 46 47 47 fmt.Println("event 1") 48 48 e1 := evts.Next() 49 - assert.NotNil(e1.Append) 50 - assert.Equal(e1.Append.Repo, bob.DID()) 49 + assert.NotNil(e1.RepoAppend) 50 + assert.Equal(e1.RepoAppend.Repo, bob.DID()) 51 51 52 52 fmt.Println("event 2") 53 53 e2 := evts.Next() 54 - assert.NotNil(e2.Append) 55 - assert.Equal(e2.Append.Repo, alice.DID()) 54 + assert.NotNil(e2.RepoAppend) 55 + assert.Equal(e2.RepoAppend.Repo, alice.DID()) 56 56 57 57 fmt.Println("event 3") 58 58 e3 := evts.Next() 59 - assert.Equal(e3.Append.Repo, bob.DID()) 59 + assert.Equal(e3.RepoAppend.Repo, bob.DID()) 60 60 //assert.Equal(e3.RepoAppend.Ops[0].Kind, "createRecord") 61 61 62 62 fmt.Println("event 4") 63 63 e4 := evts.Next() 64 - assert.Equal(e4.Append.Repo, alice.DID()) 64 + assert.Equal(e4.RepoAppend.Repo, alice.DID()) 65 65 //assert.Equal(e4.RepoAppend.Ops[0].Kind, "createRecord") 66 66 67 67 // playback
+6 -6
testing/utils.go
··· 433 433 434 434 type eventStream struct { 435 435 lk sync.Mutex 436 - events []*events.RepoStreamEvent 436 + events []*events.XRPCStreamEvent 437 437 cancel func() 438 438 439 439 cur int ··· 470 470 471 471 go func() { 472 472 if err := events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{ 473 - Append: func(evt *events.RepoAppend) error { 473 + RepoAppend: func(evt *events.RepoAppend) error { 474 474 fmt.Println("received event: ", evt.Seq, evt.Repo) 475 475 es.lk.Lock() 476 - es.events = append(es.events, &events.RepoStreamEvent{Append: evt}) 476 + es.events = append(es.events, &events.XRPCStreamEvent{RepoAppend: evt}) 477 477 es.lk.Unlock() 478 478 return nil 479 479 }, ··· 485 485 return es 486 486 } 487 487 488 - func (es *eventStream) Next() *events.RepoStreamEvent { 488 + func (es *eventStream) Next() *events.XRPCStreamEvent { 489 489 defer es.lk.Unlock() 490 490 for { 491 491 es.lk.Lock() ··· 498 498 } 499 499 } 500 500 501 - func (es *eventStream) All() []*events.RepoStreamEvent { 501 + func (es *eventStream) All() []*events.XRPCStreamEvent { 502 502 es.lk.Lock() 503 503 defer es.lk.Unlock() 504 - out := make([]*events.RepoStreamEvent, len(es.events)) 504 + out := make([]*events.XRPCStreamEvent, len(es.events)) 505 505 for i, e := range es.events { 506 506 out[i] = e 507 507 }