this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

requestCrawl forwarding overhaul: each forwarding recipient has a processing thread and a buffering chan; adds API: POST /v1/requestRequestCrawl body={"url":""}

+201 -37
+13 -5
cmd/rainbow/main.go
··· 93 93 Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list", 94 94 EnvVars: []string{"RELAY_NEXT_CRAWLER"}, 95 95 }, 96 + &cli.IntFlag{ 97 + Name: "max-request-crawl-errors", 98 + Usage: "maximum number of errors a requestCrawl forwarding target may fail before we give up", 99 + EnvVars: []string{"RAINBOW_RC_ERR_LIMIT"}, 100 + Value: 10, 101 + }, 96 102 } 97 103 98 104 // TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env ··· 161 167 MaxBytes: uint64(cctx.Int64("persist-bytes")), 162 168 } 163 169 conf := splitter.SplitterConfig{ 164 - UpstreamHost: upstreamHost, 165 - CursorFile: cctx.String("cursor-file"), 166 - PebbleOptions: &ppopts, 170 + UpstreamHost: upstreamHost, 171 + CursorFile: cctx.String("cursor-file"), 172 + PebbleOptions: &ppopts, 173 + MaxRequestCrawlForwardErrors: cctx.Int("max-request-crawl-errors"), 167 174 } 168 175 spl, err = splitter.NewSplitter(conf, nextCrawlers) 169 176 } else { 170 177 log.Info("building in-memory splitter") 171 178 conf := splitter.SplitterConfig{ 172 - UpstreamHost: upstreamHost, 173 - CursorFile: cctx.String("cursor-file"), 179 + UpstreamHost: upstreamHost, 180 + CursorFile: cctx.String("cursor-file"), 181 + MaxRequestCrawlForwardErrors: cctx.Int("max-request-crawl-errors"), 174 182 } 175 183 spl, err = splitter.NewSplitter(conf, nextCrawlers) 176 184 }
+188 -32
splitter/splitter.go
··· 40 40 pp *pebblepersist.PebblePersist 41 41 events *events.EventManager 42 42 43 + ctx context.Context 44 + ctxCancel context.CancelFunc 45 + 43 46 // Management of Socket Consumers 44 47 consumersLk sync.RWMutex 45 48 nextConsumerID uint64 ··· 49 52 50 53 log *slog.Logger 51 54 52 - httpC *http.Client 53 - nextCrawlers []*url.URL 55 + // for nextCrawlers and requestCrawlClients 56 + forwardingLock sync.Mutex 57 + 58 + clientWg sync.WaitGroup 59 + 60 + httpC *http.Client 61 + 62 + // requestCrawlClients subscribed by /v1/requestRequestCrawl 63 + // they are subject to backoff, or being forgotten if they sufficiently fail forwarded messages and health checks. 64 + // they can, like requestCrawl, always be renewed by a call to /v1/requestRequestCrawl 65 + requestCrawlClients []*rcClient 54 66 } 55 67 56 68 type SplitterConfig struct { 57 69 UpstreamHost string 58 70 CursorFile string 59 71 PebbleOptions *pebblepersist.PebblePersistOptions 72 + 73 + MaxRequestCrawlForwardErrors int 60 74 } 61 75 62 76 func (sc *SplitterConfig) XrpcRootUrl() string { ··· 111 125 return nil, fmt.Errorf("failed to parse upstream url %#v: %w", conf.UpstreamUrl(), err) 112 126 } 113 127 128 + ctx, cancel := context.WithCancel(context.Background()) 129 + 114 130 s := &Splitter{ 115 - conf: conf, 116 - consumers: make(map[uint64]*SocketConsumer), 117 - log: log, 118 - httpC: util.RobustHTTPClient(), 119 - nextCrawlers: nextCrawlerURLs, 131 + conf: conf, 132 + consumers: make(map[uint64]*SocketConsumer), 133 + log: log, 134 + httpC: util.RobustHTTPClient(), 135 + ctx: ctx, 136 + ctxCancel: cancel, 137 + } 138 + 139 + for _, nextCrawlerURL := range nextCrawlers { 140 + err = s.newRCClient(nextCrawlerURL, true) 141 + if err != nil { 142 + return nil, err 143 + } 120 144 } 121 145 122 146 if conf.PebbleOptions == nil { ··· 189 213 } 190 214 191 215 func (s *Splitter) Shutdown() error { 216 + s.ctxCancel() 192 217 return nil 193 218 } 194 219 ··· 251 276 e.GET("/xrpc/_health", 
s.HandleHealthCheck) 252 277 e.GET("/_health", s.HandleHealthCheck) 253 278 e.GET("/", s.HandleHomeMessage) 279 + e.POST("/v1/requestRequestCrawl", s.RequestRequestCrawlHandler) 254 280 255 281 // In order to support booting on random ports in tests, we need to tell the 256 282 // Echo instance it's already got a port, and then use its StartServer ··· 288 314 Message string `json:"message"` 289 315 } 290 316 317 + // POST /xrpc/com.atproto.sync.requestCrawl 318 + // and then forward that on to other services that want to receive requestCrawl data 291 319 func (s *Splitter) RequestCrawlHandler(c echo.Context) error { 292 320 ctx := c.Request().Context() 293 321 var body comatproto.SyncRequestCrawl_Input ··· 338 366 // Maybe we could do something with this response later 339 367 _ = desc 340 368 341 - if len(s.nextCrawlers) != 0 { 342 - blob, err := json.Marshal(body) 343 - if err != nil { 344 - s.log.Warn("could not forward requestCrawl, json err", "err", err) 345 - } else { 346 - go func(bodyBlob []byte) { 347 - for _, remote := range s.nextCrawlers { 348 - if remote == nil { 349 - continue 350 - } 369 + bodyBlob, err := json.Marshal(body) 370 + if err != nil { 371 + s.log.Warn("could not forward requestCrawl, json err", "err", err) 372 + bodyBlob = nil 373 + } 351 374 352 - pu := remote.JoinPath("/xrpc/com.atproto.sync.requestCrawl") 353 - response, err := s.httpC.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) 354 - if response != nil && response.Body != nil { 355 - response.Body.Close() 356 - } 357 - if err != nil || response == nil { 358 - s.log.Warn("requestCrawl forward failed", "host", remote, "err", err) 359 - } else if response.StatusCode != http.StatusOK { 360 - s.log.Warn("requestCrawl forward failed", "host", remote, "status", response.Status) 361 - } else { 362 - s.log.Info("requestCrawl forward successful", "host", remote) 363 - } 364 - } 365 - }(blob) 375 + s.forwardingLock.Lock() 376 + defer s.forwardingLock.Unlock() 377 + if 
bodyBlob != nil && len(s.requestCrawlClients) != 0 { 378 + for _, dest := range s.requestCrawlClients { 379 + select { 380 + case dest.postBodies <- bodyBlob: 381 + default: 382 + s.log.Warn("requestCrawl fell behind, giving up", "url", dest.requestCrawlUrl) 383 + dest.cancel() 384 + } 366 385 } 367 386 } 368 387 369 388 return c.JSON(200, HealthStatus{Status: "ok"}) 389 + } 390 + 391 + type MessageResponse struct { 392 + Message string `json:"message"` 393 + } 394 + 395 + type RequestRequestCrawlRequest struct { 396 + Url string `json:"url"` 397 + } 398 + 399 + // POST /v1/requestRequestCrawl body={"url":""} 400 + // URL path will be replaced with /xrpc/com.atproto.sync.requestCrawl to receive forwarded requestCrawl messages 401 + func (s *Splitter) RequestRequestCrawlHandler(c echo.Context) error { 402 + var requestBody RequestRequestCrawlRequest 403 + err := c.Bind(&requestBody) 404 + if err != nil { 405 + s.log.Info("rrc bad body", "err", err) 406 + return c.JSON(http.StatusBadRequest, MessageResponse{Message: "failed to unpack json struct"}) 407 + } 408 + err = s.newRCClient(requestBody.Url, false) 409 + if err != nil { 410 + s.log.Info("rrc bad url", "err", err) 411 + return c.JSON(http.StatusBadRequest, MessageResponse{Message: "bad url, " + err.Error()}) 412 + } 413 + return c.JSON(http.StatusOK, MessageResponse{Message: "ok"}) 370 414 } 371 415 372 416 func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error { ··· 411 455 ctx, cancel := context.WithCancel(c.Request().Context()) 412 456 defer cancel() 413 457 414 - // TODO: authhhh 415 458 conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10) 416 459 if err != nil { 417 460 return fmt.Errorf("upgrading websocket: %w", err) ··· 703 746 func (s *Splitter) writeCursor(curs int64) error { 704 747 return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664) 705 748 } 749 + 750 + func (s *Splitter) unregisterRCClient(rcc *rcClient) { 751 + 
s.forwardingLock.Lock() 752 + defer s.forwardingLock.Unlock() 753 + 754 + for i, fwd := range s.requestCrawlClients { 755 + if fwd == rcc { 756 + lasti := len(s.requestCrawlClients) - 1 757 + if i < lasti { 758 + s.requestCrawlClients[i] = s.requestCrawlClients[lasti] 759 + } 760 + s.requestCrawlClients = s.requestCrawlClients[:lasti] 761 + } 762 + } 763 + } 764 + 765 + var ErrDuplicateClient = errors.New("duplicate client") 766 + 767 + func (s *Splitter) newRCClient(baseUrl string, permanent bool) error { 768 + xu, err := url.Parse(baseUrl) 769 + if err != nil { 770 + return fmt.Errorf("bad base url, %w", err) 771 + } 772 + ctx, cancel := context.WithCancel(s.ctx) 773 + rcc := &rcClient{ 774 + requestCrawlUrl: xu.JoinPath("/xrpc/com.atproto.sync.requestCrawl").String(), 775 + postBodies: make(chan []byte, 100), 776 + log: s.log, 777 + splitter: s, 778 + permanent: permanent, 779 + ctx: ctx, 780 + cancel: cancel, 781 + } 782 + s.forwardingLock.Lock() 783 + defer s.forwardingLock.Unlock() 784 + for _, prevc := range s.requestCrawlClients { 785 + if prevc.requestCrawlUrl == rcc.requestCrawlUrl { 786 + return ErrDuplicateClient 787 + } 788 + } 789 + s.requestCrawlClients = append(s.requestCrawlClients, rcc) 790 + go rcc.forwarderThread() 791 + return nil 792 + } 793 + 794 + // a recipient which subscribed by /v1/requestRequestCrawl 795 + type rcClient struct { 796 + firstSeen time.Time 797 + lastGood time.Time 798 + errCount int 799 + 800 + requestCrawlUrl string // e.g. 
http://host:port/xrpc/com.atproto.sync.requestCrawl 801 + 802 + postBodies chan []byte 803 + 804 + client http.Client 805 + 806 + log *slog.Logger 807 + 808 + splitter *Splitter 809 + 810 + ctx context.Context 811 + cancel context.CancelFunc 812 + 813 + // never stop retrying (configured at startup) 814 + permanent bool 815 + } 816 + 817 + func (rcc *rcClient) forwarderThread() { 818 + defer rcc.splitter.unregisterRCClient(rcc) 819 + defer rcc.splitter.clientWg.Done() 820 + done := rcc.ctx.Done() 821 + 822 + maxErrCount := rcc.splitter.conf.MaxRequestCrawlForwardErrors 823 + 824 + for { 825 + var nextBody []byte 826 + var sourceOk bool 827 + select { 828 + case <-done: 829 + return 830 + case nextBody, sourceOk = <-rcc.postBodies: 831 + if !sourceOk { 832 + return 833 + } 834 + } 835 + 836 + response, err := rcc.client.Post(rcc.requestCrawlUrl, "application/json", bytes.NewReader(nextBody)) 837 + if response != nil && response.Body != nil { 838 + _ = response.Body.Close() 839 + } 840 + if err != nil || response == nil { 841 + rcc.log.Warn("requestCrawl forward failed", "url", rcc.requestCrawlUrl, "err", err) 842 + // TODO: metric 843 + rcc.errCount++ 844 + } else if response.StatusCode != http.StatusOK { 845 + rcc.log.Warn("requestCrawl forward failed", "url", rcc.requestCrawlUrl, "status", response.Status) 846 + // TODO: metric 847 + rcc.errCount++ 848 + } else { 849 + rcc.log.Debug("requestCrawl forward successful", "url", rcc.requestCrawlUrl) 850 + // TODO: metric 851 + rcc.lastGood = time.Now() 852 + rcc.errCount = 0 853 + } 854 + if rcc.permanent { 855 + // don't check errCount, always keep trying 856 + } else if rcc.errCount > maxErrCount { 857 + rcc.log.Error("too many errors, giving up on destination", "url", rcc.requestCrawlUrl) 858 + return 859 + } 860 + } 861 + }