this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Track websocket consumers, expose them via an admin endpoint, list th… (#235)

…em in the UI, clean up connections when they stop responding

![CleanShot 2023-07-20 at 10 58
20](https://github.com/bluesky-social/indigo/assets/1617325/e7a08fc5-d00b-4f79-aca2-e90ae1026c1d)

authored by

Jaz and committed by
GitHub
72c340b8 17a856f2

+515 -3
+31
bgs/admin.go
··· 5 5 "net/http" 6 6 "strconv" 7 7 "strings" 8 + "time" 8 9 9 10 "github.com/bluesky-social/indigo/models" 10 11 "github.com/labstack/echo/v4" ··· 124 125 } 125 126 126 127 return e.JSON(200, enrichedPDSs) 128 + } 129 + 130 + type consumer struct { 131 + ID uint64 `json:"id"` 132 + RemoteAddr string `json:"remote_addr"` 133 + UserAgent string `json:"user_agent"` 134 + EventsConsumed uint64 `json:"events_consumed"` 135 + ConnectedAt time.Time `json:"connected_at"` 136 + } 137 + 138 + func (bgs *BGS) handleAdminListConsumers(e echo.Context) error { 139 + bgs.consumersLk.RLock() 140 + defer bgs.consumersLk.RUnlock() 141 + 142 + consumers := make([]consumer, 0, len(bgs.consumers)) 143 + for id, c := range bgs.consumers { 144 + var m = &dto.Metric{} 145 + if err := eventsReceivedCounter.WithLabelValues(c.RemoteAddr).Write(m); err != nil { 146 + continue 147 + } 148 + consumers = append(consumers, consumer{ 149 + ID: id, 150 + RemoteAddr: c.RemoteAddr, 151 + UserAgent: c.UserAgent, 152 + EventsConsumed: uint64(m.Counter.GetValue()), 153 + ConnectedAt: c.ConnectedAt, 154 + }) 155 + } 156 + 157 + return e.JSON(200, consumers) 127 158 } 128 159 129 160 func (bgs *BGS) handleAdminKillUpstreamConn(e echo.Context) error {
+92 -3
bgs/bgs.go
··· 35 35 "github.com/labstack/echo/v4" 36 36 "github.com/labstack/echo/v4/middleware" 37 37 promclient "github.com/prometheus/client_golang/prometheus" 38 + dto "github.com/prometheus/client_model/go" 38 39 "go.opentelemetry.io/otel" 39 40 "go.opentelemetry.io/otel/attribute" 40 41 "gorm.io/gorm" ··· 70 71 extUserLk sync.Mutex 71 72 72 73 repoman *repomgr.RepoManager 74 + 75 + // Management of Socket Consumers 76 + consumersLk sync.RWMutex 77 + nextConsumerID uint64 78 + consumers map[uint64]*SocketConsumer 79 + } 80 + 81 + type SocketConsumer struct { 82 + UserAgent string 83 + RemoteAddr string 84 + ConnectedAt time.Time 85 + EventsSent promclient.Counter 73 86 } 74 87 75 88 func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, hr api.HandleResolver, ssl bool) (*BGS, error) { ··· 87 100 events: evtman, 88 101 didr: didr, 89 102 blobs: blobs, 103 + 104 + consumersLk: sync.RWMutex{}, 105 + consumers: make(map[uint64]*SocketConsumer), 90 106 } 91 107 92 108 ix.CreateExternalUser = bgs.createExternalUser ··· 287 303 admin.POST("/pds/block", bgs.handleBlockPDS) 288 304 admin.POST("/pds/unblock", bgs.handleUnblockPDS) 289 305 306 + // Consumer-related Admin API 307 + admin.GET("/consumers/list", bgs.handleAdminListConsumers) 308 + 290 309 // In order to support booting on random ports in tests, we need to tell the 291 310 // Echo instance it's already got a port, and then use its StartServer 292 311 // method to re-use that listener. ··· 384 403 Host string `json:"host"` 385 404 } 386 405 406 + func (bgs *BGS) registerConsumer(c *SocketConsumer) uint64 { 407 + bgs.consumersLk.Lock() 408 + defer bgs.consumersLk.Unlock() 409 + 410 + id := bgs.nextConsumerID 411 + bgs.nextConsumerID++ 412 + 413 + bgs.consumers[id] = c 414 + 415 + return id 416 + } 417 + 418 + func (bgs *BGS) cleanupConsumer(id uint64) { 419 + bgs.consumersLk.Lock() 420 + defer bgs.consumersLk.Unlock() 421 + 422 + c := bgs.consumers[id] 423 + 424 + var m = &dto.Metric{} 425 + if err := c.EventsSent.Write(m); err != nil { 426 + log.Errorf("failed to get sent counter: %s", err) 427 + } 428 + 429 + log.Infow("consumer disconnected", 430 + "consumer_id", id, 431 + "remote_addr", c.RemoteAddr, 432 + "user_agent", c.UserAgent, 433 + "events_sent", m.Counter.GetValue()) 434 + 435 + delete(bgs.consumers, id) 436 + } 437 + 387 438 func (bgs *BGS) EventsHandler(c echo.Context) error { 388 439 var since *int64 389 440 if sinceVal := c.QueryParam("cursor"); sinceVal != "" { ··· 394 445 since = &sval 395 446 } 396 447 397 - ctx := c.Request().Context() 448 + ctx, cancel := context.WithCancel(c.Request().Context()) 449 + defer cancel() 398 450 399 451 // TODO: authhhh 400 452 conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 1<<10, 1<<10) ··· 402 454 return fmt.Errorf("upgrading websocket: %w", err) 403 455 } 404 456 405 - evts, cancel, err := bgs.events.Subscribe(ctx, func(evt *events.XRPCStreamEvent) bool { return true }, since) 457 + // Start a goroutine to ping the client every 30 seconds to check if it's 458 + // still alive. If the client doesn't respond to a ping within 5 seconds, 459 + // we'll close the connection and teardown the consumer. 460 + go func() { 461 + ticker := time.NewTicker(30 * time.Second) 462 + defer ticker.Stop() 463 + 464 + for { 465 + select { 466 + case <-ticker.C: 467 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil { 468 + log.Errorf("failed to ping client: %s", err) 469 + cancel() 470 + return 471 + } 472 + case <-ctx.Done(): 473 + return 474 + } 475 + } 476 + }() 477 + 478 + evts, cleanup, err := bgs.events.Subscribe(ctx, func(evt *events.XRPCStreamEvent) bool { return true }, since) 406 479 if err != nil { 407 480 return err 408 481 } 409 - defer cancel() 482 + defer cleanup() 483 + 484 + // Keep track of the consumer for metrics and admin endpoints 485 + consumer := SocketConsumer{ 486 + RemoteAddr: c.RealIP(), 487 + UserAgent: c.Request().UserAgent(), 488 + ConnectedAt: time.Now(), 489 + } 490 + sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent) 491 + consumer.EventsSent = sentCounter 492 + 493 + consumerID := bgs.registerConsumer(&consumer) 494 + defer bgs.cleanupConsumer(consumerID) 495 + 496 + log.Infow("new consumer", "remote_addr", consumer.RemoteAddr, "user_agent", consumer.UserAgent) 410 497 411 498 header := events.EventHeader{Op: events.EvtKindMessage} 412 499 for { ··· 414 501 case evt := <-evts: 415 502 wc, err := conn.NextWriter(websocket.BinaryMessage) 416 503 if err != nil { 504 + log.Errorf("failed to get next writer: %s", err) 417 505 return err 418 506 } 419 507 ··· 453 541 if err := wc.Close(); err != nil { 454 542 return fmt.Errorf("failed to flush-close our event write: %w", err) 455 543 } 544 + sentCounter.Inc() 456 545 case <-ctx.Done(): 457 546 return nil 458 547 }
+5
bgs/metrics.go
··· 19 19 Name: "event_rebases", 20 20 Help: "The total number of rebase events received", 21 21 }, []string{"pds"}) 22 + 23 + var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 24 + Name: "events_sent_counter", 25 + Help: "The total number of events sent to consumers", 26 + }, []string{"remote_addr", "user_agent"})
+16
ts/bgs-dash/src/App.tsx
··· 13 13 import Logout from "./components/Logout/Logout"; 14 14 import Domains from "./components/Domains/Domains"; 15 15 import Repos from "./components/Repos/Repos"; 16 + import Consumers from "./components/Consumers/Consumers"; 16 17 17 18 function classNames(...classes: string[]) { 18 19 return classes.filter(Boolean).join(" "); ··· 49 50 <main> 50 51 <div className="mx-auto max-w-7xl px-2 py-6 sm:px-6 lg:px-8"> 51 52 <Dash /> 53 + </div> 54 + </main> 55 + </RequireAuth> 56 + ), 57 + requrieAuth: true, 58 + }, 59 + { 60 + path: "/consumers", 61 + name: "Consumers", 62 + element: ( 63 + <RequireAuth> 64 + <Nav /> 65 + <main> 66 + <div className="mx-auto max-w-7xl px-2 py-6 sm:px-6 lg:px-8"> 67 + <Consumers /> 52 68 </div> 53 69 </main> 54 70 </RequireAuth>
+352
ts/bgs-dash/src/components/Consumers/Consumers.tsx
··· 1 + import { ChevronDownIcon, ChevronUpIcon } from "@heroicons/react/24/solid"; 2 + import { FC, useEffect, useState } from "react"; 3 + import Notification, { 4 + NotificationMeta, 5 + NotificationType, 6 + } from "../Notification/Notification"; 7 + 8 + import { BGS_HOST } from "../../constants"; 9 + 10 + import { useNavigate } from "react-router-dom"; 11 + import { Consumer, ConsumerKey, ConsumerResponse } from "../../models/consumer"; 12 + 13 + const Consumers: FC<{}> = () => { 14 + const [consumerList, setConsumerList] = useState<Consumer[] | null>(null); 15 + const [sortField, setSortField] = useState<ConsumerKey>("ID"); 16 + const [sortOrder, setSortOrder] = useState<string>("asc"); 17 + 18 + // Notification Management 19 + const [shouldShowNotification, setShouldShowNotification] = 20 + useState<boolean>(false); 21 + const [notification, setNotification] = useState<NotificationMeta>({ 22 + message: "", 23 + alertType: "", 24 + }); 25 + 26 + const [adminToken, setAdminToken] = useState<string>( 27 + localStorage.getItem("admin_route_token") || "" 28 + ); 29 + const navigate = useNavigate(); 30 + 31 + const setAlertWithTimeout = ( 32 + type: NotificationType, 33 + message: string, 34 + dismiss: boolean 35 + ) => { 36 + setNotification({ 37 + message, 38 + alertType: type, 39 + autodismiss: dismiss, 40 + }); 41 + setShouldShowNotification(true); 42 + }; 43 + 44 + useEffect(() => { 45 + const token = localStorage.getItem("admin_route_token"); 46 + if (token) { 47 + setAdminToken(token); 48 + } else { 49 + navigate("/login"); 50 + } 51 + }, []); 52 + 53 + const refreshPDSList = () => { 54 + fetch(`${BGS_HOST}/admin/consumers/list`, { 55 + method: "GET", 56 + headers: { 57 + "Content-Type": "application/json", 58 + Authorization: `Bearer ${adminToken}`, 59 + }, 60 + }) 61 + .then((res) => res.json()) 62 + .then((res: ConsumerResponse[]) => { 63 + if ("error" in res) { 64 + setAlertWithTimeout( 65 + "failure", 66 + `Failed to fetch Consumer list: ${res.error}`, 67 + true 68 + ); 69 + return; 70 + } 71 + const list: Consumer[] = res.map((consumer) => { 72 + return { 73 + RemoteAddr: consumer.remote_addr, 74 + UserAgent: consumer.user_agent, 75 + ID: consumer.id, 76 + EventsConsumed: consumer.events_consumed, 77 + ConnectedAt: new Date(Date.parse(consumer.connected_at)), 78 + }; 79 + }); 80 + 81 + const sortedList = sortConsumerList(list); 82 + setConsumerList(sortedList); 83 + }) 84 + .catch((err) => { 85 + setAlertWithTimeout( 86 + "failure", 87 + `Failed to fetch Consumer list: ${err}`, 88 + true 89 + ); 90 + }); 91 + }; 92 + 93 + const sortConsumerList = (list: Consumer[]): Consumer[] => { 94 + const sortedConsumers: Consumer[] = [...list].sort((a, b) => { 95 + if (sortOrder === "asc") { 96 + if (a[sortField]! < b[sortField]!) { 97 + return -1; 98 + } 99 + if (a[sortField]! > b[sortField]!) { 100 + return 1; 101 + } 102 + } else { 103 + if (a[sortField]! < b[sortField]!) { 104 + return 1; 105 + } 106 + if (a[sortField]! > b[sortField]!) { 107 + return -1; 108 + } 109 + } 110 + return 0; 111 + }); 112 + return sortedConsumers; 113 + }; 114 + 115 + useEffect(() => { 116 + if (!consumerList) { 117 + return; 118 + } 119 + setConsumerList(sortConsumerList(consumerList)); 120 + }, [sortOrder, sortField]); 121 + 122 + useEffect(() => { 123 + refreshPDSList(); 124 + // Refresh stats every 10 seconds 125 + const interval = setInterval(() => { 126 + refreshPDSList(); 127 + }, 10 * 1000); 128 + 129 + return () => clearInterval(interval); 130 + }, [sortField, sortOrder]); 131 + 132 + return ( 133 + <div className="mx-auto max-w-full"> 134 + {shouldShowNotification ? ( 135 + <Notification 136 + message={notification.message} 137 + alertType={notification.alertType} 138 + subMessage={notification.subMessage} 139 + autodismiss={notification.autodismiss} 140 + unshow={() => { 141 + setShouldShowNotification(false); 142 + setNotification({ message: "", alertType: "" }); 143 + }} 144 + show={shouldShowNotification} 145 + ></Notification> 146 + ) : ( 147 + <></> 148 + )} 149 + <div className="sm:flex sm:items-center"> 150 + <div className="sm:flex-auto"> 151 + <h1 className="text-2xl font-semibold leading-6 text-gray-900"> 152 + Consumer Connections 153 + </h1> 154 + <p className="mt-2 text-sm text-gray-700"> 155 + A list of all websocket consumers actively connected to the BGS 156 + </p> 157 + </div> 158 + </div> 159 + <div className="mt-8 flow-root"> 160 + <div className="shadow ring-1 ring-black ring-opacity-5 sm:rounded-lg sm:rounded-b-none overflow-x-auto"> 161 + <table className="min-w-full divide-y divide-gray-300"> 162 + <thead className="bg-gray-50"> 163 + <tr> 164 + <th 165 + scope="col" 166 + className="py-3.5 pl-4 pr-3 text-left text-sm font-semibold text-gray-900 sm:pl-6" 167 + > 168 + <a 169 + href="#" 170 + className="group inline-flex" 171 + onClick={() => { 172 + setSortField("ID"); 173 + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); 174 + }} 175 + > 176 + ID 177 + <span 178 + className={`ml-2 flex-none rounded text-gray-400 ${ 179 + sortField === "ID" 180 + ? "group-hover:bg-gray-200" 181 + : "invisible group-hover:visible group-focus:visible" 182 + }`} 183 + > 184 + {sortField === "ID" && sortOrder === "asc" ? ( 185 + <ChevronUpIcon className="h-5 w-5" aria-hidden="true" /> 186 + ) : ( 187 + <ChevronDownIcon 188 + className="h-5 w-5" 189 + aria-hidden="true" 190 + /> 191 + )} 192 + </span> 193 + </a> 194 + </th> 195 + <th 196 + scope="col" 197 + className="px-3 py-3.5 text-left text-sm font-semibold text-gray-900" 198 + > 199 + <a 200 + href="#" 201 + className="group inline-flex" 202 + onClick={() => { 203 + setSortField("RemoteAddr"); 204 + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); 205 + }} 206 + > 207 + Remote Address 208 + <span 209 + className={`ml-2 flex-none rounded text-gray-400 ${ 210 + sortField === "RemoteAddr" 211 + ? "group-hover:bg-gray-200" 212 + : "invisible group-hover:visible group-focus:visible" 213 + }`} 214 + > 215 + {sortField === "RemoteAddr" && sortOrder === "asc" ? ( 216 + <ChevronUpIcon className="h-5 w-5" aria-hidden="true" /> 217 + ) : ( 218 + <ChevronDownIcon 219 + className="h-5 w-5" 220 + aria-hidden="true" 221 + /> 222 + )} 223 + </span> 224 + </a> 225 + </th> 226 + <th 227 + scope="col" 228 + className="px-3 py-3.5 text-right text-sm font-semibold text-gray-900 pr-6 whitespace-nowrap" 229 + > 230 + <a 231 + href="#" 232 + className="group inline-flex" 233 + onClick={() => { 234 + setSortField("UserAgent"); 235 + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); 236 + }} 237 + > 238 + User Agent 239 + <span 240 + className={`ml-2 flex-none rounded text-gray-400 ${ 241 + sortField === "UserAgent" 242 + ? "group-hover:bg-gray-200" 243 + : "invisible group-hover:visible group-focus:visible" 244 + }`} 245 + > 246 + {sortField === "UserAgent" && sortOrder === "asc" ? ( 247 + <ChevronUpIcon className="h-5 w-5" aria-hidden="true" /> 248 + ) : ( 249 + <ChevronDownIcon 250 + className="h-5 w-5" 251 + aria-hidden="true" 252 + /> 253 + )} 254 + </span> 255 + </a> 256 + </th> 257 + <th 258 + scope="col" 259 + className="px-3 py-3.5 text-right text-sm font-semibold text-gray-900 pr-6 whitespace-nowrap" 260 + > 261 + <a 262 + href="#" 263 + className="group inline-flex" 264 + onClick={() => { 265 + setSortField("EventsConsumed"); 266 + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); 267 + }} 268 + > 269 + Events Consumed 270 + <span 271 + className={`ml-2 flex-none rounded text-gray-400 ${ 272 + sortField === "EventsConsumed" 273 + ? "group-hover:bg-gray-200" 274 + : "invisible group-hover:visible group-focus:visible" 275 + }`} 276 + > 277 + {sortField === "EventsConsumed" && sortOrder === "asc" ? ( 278 + <ChevronUpIcon className="h-5 w-5" aria-hidden="true" /> 279 + ) : ( 280 + <ChevronDownIcon 281 + className="h-5 w-5" 282 + aria-hidden="true" 283 + /> 284 + )} 285 + </span> 286 + </a> 287 + </th> 288 + <th 289 + scope="col" 290 + className="px-3 py-3.5 text-right text-sm font-semibold text-gray-900 pr-6 whitespace-nowrap" 291 + > 292 + <a 293 + href="#" 294 + className="group inline-flex" 295 + onClick={() => { 296 + setSortField("ConnectedAt"); 297 + setSortOrder(sortOrder === "asc" ? "desc" : "asc"); 298 + }} 299 + > 300 + Connected At 301 + <span 302 + className={`ml-2 flex-none rounded text-gray-400 ${ 303 + sortField === "ConnectedAt" 304 + ? "group-hover:bg-gray-200" 305 + : "invisible group-hover:visible group-focus:visible" 306 + }`} 307 + > 308 + {sortField === "ConnectedAt" && sortOrder === "asc" ? ( 309 + <ChevronUpIcon className="h-5 w-5" aria-hidden="true" /> 310 + ) : ( 311 + <ChevronDownIcon 312 + className="h-5 w-5" 313 + aria-hidden="true" 314 + /> 315 + )} 316 + </span> 317 + </a> 318 + </th> 319 + </tr> 320 + </thead> 321 + <tbody className="divide-y divide-gray-200 bg-white"> 322 + {consumerList && 323 + consumerList.map((consumer) => { 324 + return ( 325 + <tr key={consumer.ID}> 326 + <td className="whitespace-nowrap py-4 pl-4 pr-3 text-sm font-medium text-gray-900 sm:pl-6 text-left"> 327 + {consumer.ID} 328 + </td> 329 + <td className="whitespace-nowrap px-3 py-4 text-sm text-gray-500 text-left"> 330 + {consumer.RemoteAddr} 331 + </td> 332 + <td className="whitespace-nowrap px-3 py-2 text-sm text-gray-400 w-8 pr-6"> 333 + {consumer.UserAgent} 334 + </td> 335 + <td className="whitespace-nowrap px-3 py-2 text-sm text-gray-400 w-8 pr-6"> 336 + {consumer.EventsConsumed?.toLocaleString()} 337 + </td> 338 + <td className="whitespace-nowrap px-3 py-2 text-sm text-gray-400 text-center w-8 pr-6"> 339 + {consumer.ConnectedAt.toLocaleString()} 340 + </td> 341 + </tr> 342 + ); 343 + })} 344 + </tbody> 345 + </table> 346 + </div> 347 + </div> 348 + </div> 349 + ); 350 + }; 351 + 352 + export default Consumers;
+19
ts/bgs-dash/src/models/consumer.ts
··· 1 + interface Consumer { 2 + RemoteAddr: string; 3 + UserAgent: string; 4 + EventsConsumed: number; 5 + ConnectedAt: Date; 6 + ID: number; 7 + } 8 + 9 + interface ConsumerResponse { 10 + id: number; 11 + remote_addr: string; 12 + user_agent: string; 13 + events_consumed: number; 14 + connected_at: string; 15 + } 16 + 17 + type ConsumerKey = keyof Consumer; 18 + 19 + export type { Consumer, ConsumerResponse, ConsumerKey };
ts/bgs-dash/src/models/pds.tsx ts/bgs-dash/src/models/pds.ts