this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

other delivery modes

dholms 092f704f da526d34

+232 -28
+19 -6
nexus/handlers.go
··· 27 27 } 28 28 29 29 func (n *Nexus) handleListen(c echo.Context) error { 30 + if n.outbox.mode == OutboxModeWebhook { 31 + return echo.NewHTTPError(http.StatusBadRequest, "websocket not available in webhook mode") 32 + } 33 + 30 34 ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 31 35 if err != nil { 32 36 return err ··· 35 39 36 40 n.logger.Info("websocket connected") 37 41 38 - // read loop so we can detect if the client disconnects 42 + // read loop to detect disconnects and handle acks in websocket-ack mode 39 43 disconnected := make(chan struct{}) 44 + 40 45 go func() { 41 46 for { 42 - if _, _, err := ws.ReadMessage(); err != nil { 47 + var msg AckMessage 48 + if err := ws.ReadJSON(&msg); err != nil { 43 49 close(disconnected) 44 50 return 45 51 } 52 + 53 + // Process acks directly in websocket-ack mode 54 + if n.outbox.mode == OutboxModeWebsocketAck { 55 + n.outbox.AckEvent(msg.ID) 56 + } 46 57 } 47 58 }() 48 - 49 - evtCh := n.outbox.Subscribe(c.Request().Context()) 50 59 51 60 for { 52 61 select { 53 62 case <-disconnected: 54 63 n.logger.Info("websocket disconnected") 55 64 return nil 56 - case evt, ok := <-evtCh: 65 + case evt, ok := <-n.outbox.events: 57 66 if !ok { 58 67 return nil 59 68 } ··· 61 70 n.logger.Info("websocket write error", "error", err) 62 71 return err 63 72 } 64 - close(evt.AckCh) 73 + // In fire-and-forget mode, ack immediately after write succeeds 74 + // In websocket-ack mode, wait for client to send ack and handle in read loop 75 + if n.outbox.mode == OutboxModeFireAndForget { 76 + n.outbox.AckEvent(evt.ID) 77 + } 65 78 } 66 79 } 67 80 }
+12 -1
nexus/nexus.go
··· 40 40 FirehoseParallelism int 41 41 FirehoseCursorSaveInterval time.Duration 42 42 FullNetworkMode bool 43 + DisableAcks bool 44 + WebhookURL string 43 45 } 44 46 45 47 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 63 65 } 64 66 cdir := identity.NewCacheDirectory(&bdir, 2_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 65 67 68 + var outboxMode OutboxMode 69 + if config.WebhookURL != "" { 70 + outboxMode = OutboxModeWebhook 71 + } else if config.DisableAcks { 72 + outboxMode = OutboxModeFireAndForget 73 + } else { 74 + outboxMode = OutboxModeWebsocketAck 75 + } 76 + 66 77 n := &Nexus{ 67 78 db: db, 68 79 echo: e, ··· 70 81 71 82 Dir: &cdir, 72 83 73 - outbox: NewOutbox(db), 84 + outbox: NewOutbox(db, outboxMode, config.WebhookURL), 74 85 75 86 FullNetworkMode: config.FullNetworkMode, 76 87 RelayHost: config.RelayHost,
+179 -20
nexus/outbox.go
··· 1 1 package main 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "encoding/json" 7 + "fmt" 6 8 "log/slog" 9 + "net/http" 10 + "sync" 7 11 "time" 8 12 9 13 "github.com/bluesky-social/indigo/nexus/models" 10 14 "gorm.io/gorm" 11 15 ) 12 16 17 + type InFlightEvent struct { 18 + ID uint 19 + DID string 20 + Event *OutboxEvt 21 + SentAt time.Time 22 + } 23 + 13 24 type Outbox struct { 14 - db *gorm.DB 15 - logger *slog.Logger 16 - notify chan struct{} 17 - events chan *OutboxEvt 25 + db *gorm.DB 26 + logger *slog.Logger 27 + notify chan struct{} 28 + events chan *OutboxEvt 29 + mode OutboxMode 30 + webhookURL string 31 + httpClient *http.Client 32 + inFlightEvents map[uint]*InFlightEvent 33 + inFlightDIDs map[string]uint 34 + inFlightMu sync.RWMutex 18 35 } 19 36 20 - func NewOutbox(db *gorm.DB) *Outbox { 37 + func NewOutbox(db *gorm.DB, mode OutboxMode, webhookURL string) *Outbox { 21 38 return &Outbox{ 22 - db: db, 23 - logger: slog.Default().With("system", "outbox"), 24 - notify: make(chan struct{}), 25 - events: make(chan *OutboxEvt, 1000), 39 + db: db, 40 + logger: slog.Default().With("system", "outbox"), 41 + notify: make(chan struct{}), 42 + events: make(chan *OutboxEvt, 1000), 43 + mode: mode, 44 + webhookURL: webhookURL, 45 + httpClient: &http.Client{ 46 + Timeout: 30 * time.Second, 47 + }, 48 + inFlightEvents: make(map[uint]*InFlightEvent), 49 + inFlightDIDs: make(map[string]uint), 26 50 } 27 51 } 28 52 ··· 30 54 ticker := time.NewTicker(100 * time.Millisecond) 31 55 defer ticker.Stop() 32 56 57 + // Start timeout checker for websocket-ack mode 58 + // Webhook mode doesn't need this since sendWebhook() retries indefinitely 59 + if o.mode == OutboxModeWebsocketAck { 60 + go o.checkTimeouts(ctx) 61 + } 62 + 33 63 for { 34 64 select { 35 65 case <-ctx.Done(): ··· 42 72 } 43 73 } 44 74 75 + // notify outbox of new event in buffer 76 + // for active outboxes, channel will basically always be firing 77 + func (o *Outbox) Notify() { 78 + select { 79 + case o.notify <- struct{}{}: 80 + default: 81 + } 82 + } 83 + 45 84 func (o *Outbox) deliverPending() { 46 85 var events []models.OutboxBuffer 47 86 if err := o.db.Order("id ASC").Limit(100).Find(&events).Error; err != nil { ··· 60 99 continue 61 100 } 62 101 63 - outboxEvt.AckCh = make(chan struct{}) 102 + // Set the ID from the database record 103 + outboxEvt.ID = evt.ID 104 + did := outboxEvt.DID() 105 + 106 + // Check if this DID already has an in-flight event 107 + o.inFlightMu.RLock() 108 + _, hasInFlight := o.inFlightDIDs[did] 109 + o.inFlightMu.RUnlock() 110 + 111 + if hasInFlight { 112 + // Skip this event for now, will be retried later 113 + continue 114 + } 115 + 116 + o.trackInFlight(&evt, &outboxEvt, did) 117 + 118 + // Deliver based on consumer mode 119 + switch o.mode { 120 + case OutboxModeFireAndForget: 121 + o.events <- &outboxEvt 122 + case OutboxModeWebsocketAck: 123 + o.events <- &outboxEvt 124 + case OutboxModeWebhook: 125 + go o.sendWebhook(evt.ID, &outboxEvt) 126 + } 127 + } 128 + } 129 + 130 + func (o *Outbox) trackInFlight(dbEvt *models.OutboxBuffer, outboxEvt *OutboxEvt, did string) { 131 + o.inFlightMu.Lock() 132 + o.inFlightEvents[dbEvt.ID] = &InFlightEvent{ 133 + ID: dbEvt.ID, 134 + DID: did, 135 + Event: outboxEvt, 136 + SentAt: time.Now(), 137 + } 138 + o.inFlightDIDs[did] = dbEvt.ID 139 + o.inFlightMu.Unlock() 140 + } 141 + 142 + func (o *Outbox) AckEvent(eventID uint) { 143 + o.inFlightMu.Lock() 144 + defer o.inFlightMu.Unlock() 64 145 65 - o.events <- &outboxEvt 146 + // Check if this was a tracked event (websocket-ack or webhook mode) 147 + inFlight, exists := o.inFlightEvents[eventID] 148 + if exists { 149 + // Remove from in-flight tracking 150 + delete(o.inFlightDIDs, inFlight.DID) 151 + delete(o.inFlightEvents, eventID) 152 + o.logger.Info("event acked", "id", eventID, "did", inFlight.DID) 153 + } 66 154 67 - <-outboxEvt.AckCh 155 + // Delete from DB (for both tracked and untracked events) 156 + if err := o.db.Delete(&models.OutboxBuffer{}, eventID).Error; err != nil { 157 + o.logger.Error("failed to delete acked event", "error", err, "id", eventID) 158 + } 159 + } 68 160 69 - if err := o.db.Delete(&evt).Error; err != nil { 70 - o.logger.Error("failed to delete outbox event", "error", err, "id", evt.ID) 161 + func (o *Outbox) sendWebhook(eventID uint, evt *OutboxEvt) { 162 + backoff := 0 163 + for { 164 + if err := o.postWebhook(evt); err != nil { 165 + o.logger.Warn("webhook failed, retrying", "error", err, "id", eventID, "backoff", backoff) 166 + time.Sleep(webhookBackoff(backoff)) 167 + backoff++ 168 + continue 71 169 } 170 + 171 + // Success - ack the event 172 + o.AckEvent(eventID) 173 + return 72 174 } 73 175 } 74 176 75 - func (o *Outbox) Notify() { 76 - select { 77 - case o.notify <- struct{}{}: 78 - default: 177 + func webhookBackoff(attempt int) time.Duration { 178 + if attempt == 0 { 179 + return time.Second 180 + } 181 + // Exponential: 1s, 2s, 4s, 8s, cap at 10s 182 + duration := time.Second * (1 << attempt) 183 + if duration > 10*time.Second { 184 + duration = 10 * time.Second 185 + } 186 + return duration 187 + } 188 + 189 + func (o *Outbox) postWebhook(evt *OutboxEvt) error { 190 + body, err := json.Marshal(evt) 191 + if err != nil { 192 + return fmt.Errorf("failed to marshal event: %w", err) 193 + } 194 + 195 + req, err := http.NewRequest("POST", o.webhookURL, bytes.NewReader(body)) 196 + if err != nil { 197 + return fmt.Errorf("failed to create request: %w", err) 198 + } 199 + 200 + req.Header.Set("Content-Type", "application/json") 201 + 202 + resp, err := o.httpClient.Do(req) 203 + if err != nil { 204 + return fmt.Errorf("failed to send request: %w", err) 205 + } 206 + defer resp.Body.Close() 207 + 208 + if resp.StatusCode < 200 || resp.StatusCode >= 300 { 209 + return fmt.Errorf("webhook returned non-2xx status: %d", resp.StatusCode) 210 + } 211 + 212 + return nil 213 + } 214 + 215 + func (o *Outbox) checkTimeouts(ctx context.Context) { 216 + ticker := time.NewTicker(1 * time.Second) 217 + defer ticker.Stop() 218 + 219 + for { 220 + select { 221 + case <-ctx.Done(): 222 + return 223 + case <-ticker.C: 224 + o.retryTimedOutEvents() 225 + } 79 226 } 80 227 } 81 228 82 - func (o *Outbox) Subscribe(ctx context.Context) <-chan *OutboxEvt { 83 - return o.events 229 + func (o *Outbox) retryTimedOutEvents() { 230 + o.inFlightMu.Lock() 231 + defer o.inFlightMu.Unlock() 232 + 233 + now := time.Now() 234 + for id, inFlight := range o.inFlightEvents { 235 + if now.Sub(inFlight.SentAt) > 10*time.Second { 236 + o.logger.Info("event timed out, resending", "id", id, "did", inFlight.DID) 237 + 238 + // Remove from in-flight tracking so it can be retried 239 + delete(o.inFlightDIDs, inFlight.DID) 240 + delete(o.inFlightEvents, id) 241 + } 242 + } 84 243 }
+22 -1
nexus/types.go
··· 2 2 3 3 import "github.com/bluesky-social/indigo/nexus/models" 4 4 5 + type OutboxMode string 6 + 7 + const ( 8 + OutboxModeFireAndForget OutboxMode = "fire-and-forget" 9 + OutboxModeWebsocketAck OutboxMode = "websocket-ack" 10 + OutboxModeWebhook OutboxMode = "webhook" 11 + ) 12 + 5 13 type Commit struct { 6 14 Did string `json:"did"` 7 15 Rev string `json:"rev"` ··· 51 59 } 52 60 53 61 type OutboxEvt struct { 62 + ID uint `json:"id"` 54 63 Type string `json:"type"` 55 64 RecordEvt *RecordEvt `json:"record,omitempty"` 56 65 UserEvt *UserEvt `json:"user,omitempty"` 66 + } 57 67 58 - AckCh chan struct{} `json:"-"` 68 + func (evt *OutboxEvt) DID() string { 69 + if evt.RecordEvt != nil { 70 + return evt.RecordEvt.Did 71 + } 72 + if evt.UserEvt != nil { 73 + return evt.UserEvt.Did 74 + } 75 + return "" 76 + } 77 + 78 + type AckMessage struct { 79 + ID uint `json:"id"` 59 80 }