Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: profile jetstream watcher io timeouts

+34 -1
+34 -1
internal/firehose/profile_watcher.go
··· 167 167 pw.connMu.Unlock() 168 168 }() 169 169 170 + const pingInterval = 30 * time.Second 171 + const readTimeout = 90 * time.Second 172 + 173 + conn.SetPongHandler(func(string) error { 174 + conn.SetReadDeadline(time.Now().Add(readTimeout)) 175 + return nil 176 + }) 177 + 178 + pingTicker := time.NewTicker(pingInterval) 179 + defer pingTicker.Stop() 180 + 181 + go func() { 182 + for { 183 + select { 184 + case <-pingTicker.C: 185 + pw.connMu.Lock() 186 + c := pw.conn 187 + pw.connMu.Unlock() 188 + if c == nil { 189 + return 190 + } 191 + c.SetWriteDeadline(time.Now().Add(10 * time.Second)) 192 + if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { 193 + return 194 + } 195 + case <-ctx.Done(): 196 + return 197 + case <-pw.stopCh: 198 + return 199 + } 200 + } 201 + }() 202 + 170 203 for { 171 204 select { 172 205 case <-ctx.Done(): ··· 176 209 default: 177 210 } 178 211 179 - conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 212 + conn.SetReadDeadline(time.Now().Add(readTimeout)) 180 213 _, message, err := conn.ReadMessage() 181 214 if err != nil { 182 215 return fmt.Errorf("read error: %w", err)