this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

wip

dholms 72ea73b0 a2294269

+194 -38
+35 -9
nexus/firehose.go
··· 6 6 "log/slog" 7 7 "net/http" 8 8 "net/url" 9 - "strings" 10 9 11 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/data" 12 + "github.com/bluesky-social/indigo/atproto/repo" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 12 14 "github.com/bluesky-social/indigo/events" 13 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 14 16 "github.com/gorilla/websocket" ··· 74 76 if _, exists := nexus.filterDids[evt.Repo]; !exists { 75 77 return nil 76 78 } 79 + r, err := repo.VerifyCommitMessage(ctx, evt) 80 + if err != nil { 81 + nexus.logger.Info("failed to invert commit MST", "err", err) 82 + return err 83 + } 77 84 for _, op := range evt.Ops { 78 - parts := strings.Split(op.Path, "/") 79 - collection := parts[0] 80 - rkey := parts[1] 81 - err := nexus.outbox.Send(&Op{ 82 - DID: evt.Repo, 83 - Collection: collection, 84 - Rkey: rkey, 85 - }) 85 + coll, rkey, err := syntax.ParseRepoPath(op.Path) 86 + if err != nil { 87 + return err 88 + } 89 + 90 + outOp := &Op{ 91 + Did: evt.Repo, 92 + Collection: coll.String(), 93 + Rkey: rkey.String(), 94 + Action: op.Action, 95 + } 96 + 97 + // For creates and updates, get the record 98 + if op.Action == "create" || op.Action == "update" { 99 + recBytes, recCid, err := r.GetRecordBytes(ctx, coll, rkey) 100 + if err != nil { 101 + return err 102 + } 103 + rec, err := data.UnmarshalCBOR(recBytes) 104 + if err != nil { 105 + return err 106 + } 107 + outOp.Record = rec 108 + outOp.Cid = recCid.String() 109 + } 110 + 111 + err = nexus.outbox.Send(outOp) 86 112 if err != nil { 87 113 return err 88 114 }
+52
nexus/handlers.go
··· 1 1 package main 2 2 3 3 import ( 4 + "encoding/json" 4 5 "net/http" 5 6 6 7 "github.com/bluesky-social/indigo/nexus/models" ··· 28 29 } 29 30 30 31 func (n *Nexus) handleListen(c echo.Context) error { 32 + // Check if already connected 33 + if err := n.outbox.Connect(); err != nil { 34 + n.logger.Error("connection refused", "error", err) 35 + return echo.NewHTTPError(http.StatusConflict, "websocket already connected") 36 + } 37 + defer n.outbox.Disconnect() 38 + 31 39 // Upgrade to WebSocket 32 40 ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 33 41 if err != nil { ··· 37 45 38 46 n.logger.Info("websocket connected") 39 47 48 + // Send buffered events first 49 + var bufferedEvts []models.BufferedEvt 50 + if err := n.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 51 + n.logger.Error("failed to load buffered events", "error", err) 52 + return err 53 + } 54 + 55 + if len(bufferedEvts) > 0 { 56 + n.logger.Info("draining buffered events", "count", len(bufferedEvts)) 57 + for _, evt := range bufferedEvts { 58 + op := &Op{ 59 + Did: evt.Did, 60 + Collection: evt.Collection, 61 + Rkey: evt.Rkey, 62 + Action: evt.Action, 63 + Cid: evt.Cid, 64 + } 65 + 66 + if evt.Record != "" { 67 + var record map[string]interface{} 68 + if err := json.Unmarshal([]byte(evt.Record), &record); err != nil { 69 + n.logger.Error("failed to unmarshal record", "error", err, "id", evt.ID) 70 + continue 71 + } 72 + op.Record = record 73 + } 74 + 75 + if err := ws.WriteJSON(op); err != nil { 76 + n.logger.Info("websocket write error", "error", err) 77 + return nil 78 + } 79 + } 80 + 81 + // Delete buffered events 82 + if err := n.db.Delete(&bufferedEvts).Error; err != nil { 83 + n.logger.Error("failed to delete buffered events", "error", err) 84 + } else { 85 + n.logger.Info("cleared buffered events", "count", len(bufferedEvts)) 86 + } 87 + } 88 + 89 + // Mark that we're done draining and ready for live events 90 + n.outbox.StartLive() 91 + n.logger.Info("starting live event stream") 40 92 for op := range n.outbox.outCh { 41 93 if err := ws.WriteJSON(op); err != nil { 42 94 n.logger.Info("websocket write error", "error", err)
+7 -7
nexus/models/models.go
··· 4 4 Did string `gorm:"primaryKey"` 5 5 } 6 6 7 - type FilterCollection struct { 8 - Collection string `gorm:"primaryKey"` 9 - } 10 - 11 7 type BufferedEvt struct { 12 - ID uint `gorm:"primaryKey"` 13 - Did string `gorm:"not null;index"` 14 - Evt map[string]interface{} `gorm:"serializer:json"` 8 + ID uint `gorm:"primaryKey"` 9 + Did string `gorm:"not null;index"` 10 + Collection string `gorm:"not null"` 11 + Rkey string `gorm:"not null"` 12 + Action string `gorm:"not null"` 13 + Cid string `gorm:"type:text"` 14 + Record string `gorm:"type:text"` 15 15 }
+7 -4
nexus/nexus.go
··· 21 21 } 22 22 23 23 type Op struct { 24 - DID string `json:"did"` 25 - Collection string `json:"collection"` 26 - Rkey string `json:"rkey"` 24 + Did string `json:"did"` 25 + Collection string `json:"collection"` 26 + Rkey string `json:"rkey"` 27 + Action string `json:"action"` 28 + Record map[string]interface{} `json:"record,omitempty"` 29 + Cid string `json:"cid,omitempty"` 27 30 } 28 31 29 32 type NexusConfig struct { ··· 38 41 } 39 42 40 43 // Auto-migrate the schema 41 - if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterCollection{}, &models.FilterDid{}); err != nil { 44 + if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}); err != nil { 42 45 return nil, err 43 46 } 44 47
+93 -18
nexus/outbox.go
··· 1 1 package main 2 2 3 - import "gorm.io/gorm" 3 + import ( 4 + "encoding/json" 5 + "errors" 6 + "sync" 7 + 8 + "github.com/bluesky-social/indigo/nexus/models" 9 + "gorm.io/gorm" 10 + ) 11 + 12 + var ErrAlreadyConnected = errors.New("websocket already connected") 4 13 5 14 type Outbox struct { 6 - db *gorm.DB 7 - outCh chan *Op 15 + db *gorm.DB 16 + outCh chan *Op 17 + mu sync.RWMutex 18 + connected bool 19 + drainingBuf bool 8 20 } 9 21 10 22 func NewOutbox(db *gorm.DB) *Outbox { 11 23 return &Outbox{ 12 - db: db, 13 - outCh: make(chan *Op), 24 + db: db, 25 + outCh: make(chan *Op, 100), 26 + connected: false, 27 + drainingBuf: false, 14 28 } 29 + } 30 + 31 + func (o *Outbox) Connect() error { 32 + o.mu.Lock() 33 + defer o.mu.Unlock() 34 + 35 + if o.connected { 36 + return ErrAlreadyConnected 37 + } 38 + 39 + o.connected = true 40 + o.drainingBuf = true 41 + return nil 42 + } 43 + 44 + func (o *Outbox) StartLive() { 45 + o.mu.Lock() 46 + defer o.mu.Unlock() 47 + o.drainingBuf = false 48 + } 49 + 50 + func (o *Outbox) Disconnect() { 51 + o.mu.Lock() 52 + defer o.mu.Unlock() 53 + o.connected = false 54 + o.drainingBuf = false 55 + } 56 + 57 + func (o *Outbox) IsConnected() bool { 58 + o.mu.RLock() 59 + defer o.mu.RUnlock() 60 + return o.connected 15 61 } 16 62 17 63 func (o *Outbox) Send(op *Op) error { 18 - o.outCh <- op 19 - return nil 64 + o.mu.RLock() 65 + connected := o.connected 66 + drainingBuf := o.drainingBuf 67 + o.mu.RUnlock() 68 + 69 + // If connected but still draining buffered events, buffer to DB 70 + if drainingBuf { 71 + return o.bufferToDB(op) 72 + } 73 + 74 + if connected { 75 + select { 76 + case o.outCh <- op: 77 + return nil 78 + default: 79 + // Channel full, buffer to DB 80 + return o.bufferToDB(op) 81 + } 82 + } else { 83 + return o.bufferToDB(op) 84 + } 20 85 } 21 86 22 - // func (nexus *Nexus) handleOp(op *Op) error { 23 - // select { 24 - // case nexus.evtCh <- op: 25 - // default: 26 - // err := nexus.db.Create(&BufferedEvt{Did: op.DID, Evt: *op}).Error 27 - // if err != nil { 28 - // return err 29 - // } 30 - // } 31 - // return nil 32 - // } 87 + func (o *Outbox) bufferToDB(op *Op) error { 88 + var recordJSON string 89 + if op.Record != nil { 90 + bytes, err := json.Marshal(op.Record) 91 + if err != nil { 92 + return err 93 + } 94 + recordJSON = string(bytes) 95 + } 96 + 97 + err := o.db.Create(&models.BufferedEvt{ 98 + Did: op.Did, 99 + Collection: op.Collection, 100 + Rkey: op.Rkey, 101 + Action: op.Action, 102 + Cid: op.Cid, 103 + Record: recordJSON, 104 + }).Error 105 + 106 + return err 107 + }