this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

copy over some updates from the relay

+15 -34
+15 -34
splitter/splitter.go
··· 16 16 "github.com/bluesky-social/indigo/bgs" 17 17 events "github.com/bluesky-social/indigo/events" 18 18 "github.com/bluesky-social/indigo/events/schedulers/sequential" 19 - lexutil "github.com/bluesky-social/indigo/lex/util" 20 19 "github.com/gorilla/websocket" 21 20 logging "github.com/ipfs/go-log" 22 21 "github.com/labstack/echo/v4" ··· 43 42 } 44 43 45 44 func NewSplitter(host string) *Splitter { 46 - erb := NewEventRingBuffer(20_000, 1000) 45 + erb := NewEventRingBuffer(20_000, 10_000) 47 46 48 47 em := events.NewEventManager(erb) 49 48 return &Splitter{ ··· 255 254 "consumer_id", consumerID, 256 255 ) 257 256 258 - header := events.EventHeader{Op: events.EvtKindMessage} 259 257 for { 260 258 select { 261 - case evt := <-evts: 259 + case evt, ok := <-evts: 260 + if !ok { 261 + log.Error("event stream closed unexpectedly") 262 + return nil 263 + } 264 + 262 265 wc, err := conn.NextWriter(websocket.BinaryMessage) 263 266 if err != nil { 264 267 log.Errorf("failed to get next writer: %s", err) 265 268 return err 266 269 } 267 270 268 - var obj lexutil.CBOR 269 - 270 - switch { 271 - case evt.Error != nil: 272 - header.Op = events.EvtKindErrorFrame 273 - obj = evt.Error 274 - case evt.RepoCommit != nil: 275 - header.MsgType = "#commit" 276 - obj = evt.RepoCommit 277 - case evt.RepoHandle != nil: 278 - header.MsgType = "#handle" 279 - obj = evt.RepoHandle 280 - case evt.RepoInfo != nil: 281 - header.MsgType = "#info" 282 - obj = evt.RepoInfo 283 - case evt.RepoMigrate != nil: 284 - header.MsgType = "#migrate" 285 - obj = evt.RepoMigrate 286 - case evt.RepoTombstone != nil: 287 - header.MsgType = "#tombstone" 288 - obj = evt.RepoTombstone 289 - default: 290 - return fmt.Errorf("unrecognized event kind") 271 + if evt.Preserialized != nil { 272 + _, err = wc.Write(evt.Preserialized) 273 + } else { 274 + err = evt.Serialize(wc) 291 275 } 292 - 293 - if err := header.MarshalCBOR(wc); err != nil { 294 - return fmt.Errorf("failed to write header: %w", err) 295 - } 296 - 297 - if err := obj.MarshalCBOR(wc); err != nil { 276 + if err != nil { 298 277 return fmt.Errorf("failed to write event: %w", err) 299 278 } 300 279 301 280 if err := wc.Close(); err != nil { 302 - return fmt.Errorf("failed to flush-close our event write: %w", err) 281 + log.Warnf("failed to flush-close our event write: %s", err) 282 + return nil 303 283 } 284 + 304 285 lastWriteLk.Lock() 305 286 lastWrite = time.Now() 306 287 lastWriteLk.Unlock()