this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

labelmaker: initial pass at XRPC proxying

+74 -26
+15 -1
cmd/labelmaker/main.go
··· 103 103 EnvVars: []string{"LABELMAKER_BIND"}, 104 104 }, 105 105 &cli.StringFlag{ 106 + Name: "xrpc-proxy-url", 107 + Usage: "backend URL to proxy (some) XRPC requests to", 108 + Value: "http://localhost:2583", 109 + EnvVars: []string{"ATP_XRPC_PROXY_URL"}, 110 + }, 111 + &cli.StringFlag{ 112 + Name: "xrpc-proxy-admin-password", 113 + Usage: "admin auth password for XRPC proxy requests", 114 + Value: "admin", 115 + EnvVars: []string{"ATP_XRPC_PROXY_ADMIN_PASSWORD"}, 116 + }, 117 + &cli.StringFlag{ 106 118 Name: "keyword-file", 107 119 Usage: "keyword filter config, as JSON file", 108 120 EnvVars: []string{"LABELMAKER_KEYWORD_FILE"}, ··· 181 193 repoHandle := cctx.String("repo-handle") 182 194 signingSecretKeyJwk := cctx.String("signing-secret-key-jwk") 183 195 bind := cctx.String("bind") 196 + xrpcProxyURL := cctx.String("xrpc-proxy-url") 197 + xrpcProxyAdminPassword := cctx.String("xrpc-proxy-admin-password") 184 198 microNSFWImgURL := cctx.String("micro-nsfw-img-url") 185 199 hiveAIToken := cctx.String("hiveai-api-token") 186 200 sqrlURL := cctx.String("sqrl-url") ··· 205 219 UserId: 1, 206 220 } 207 221 208 - srv, err := labeling.NewServer(db, cstore, repoUser, plcURL, blobPdsURL, useWss) 222 + srv, err := labeling.NewServer(db, cstore, repoUser, plcURL, blobPdsURL, xrpcProxyURL, xrpcProxyAdminPassword, useWss) 209 223 if err != nil { 210 224 return err 211 225 }
+37 -20
labeling/service.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/base64" 6 7 "fmt" 7 8 "io" 8 9 "net/http" 10 + "net/url" 9 11 "strings" 10 12 "time" 11 13 ··· 37 39 var log = logging.Logger("labelmaker") 38 40 39 41 type Server struct { 40 - db *gorm.DB 41 - cs *carstore.CarStore 42 - repoman *repomgr.RepoManager 43 - bgsSlurper *bgs.Slurper 44 - evtmgr *events.EventManager 45 - echo *echo.Echo 46 - user *RepoConfig 47 - blobPdsURL string 48 - kwLabelers []KeywordLabeler 49 - muNSFWImgLabeler *MicroNSFWImgLabeler 50 - hiveAILabeler *HiveAILabeler 51 - sqrlLabeler *SQRLLabeler 42 + db *gorm.DB 43 + cs *carstore.CarStore 44 + repoman *repomgr.RepoManager 45 + bgsSlurper *bgs.Slurper 46 + evtmgr *events.EventManager 47 + echo *echo.Echo 48 + user *RepoConfig 49 + blobPdsURL string 50 + xrpcProxyURL *url.URL 51 + xrpcProxyAuthHeader string 52 + kwLabelers []KeywordLabeler 53 + muNSFWImgLabeler *MicroNSFWImgLabeler 54 + hiveAILabeler *HiveAILabeler 55 + sqrlLabeler *SQRLLabeler 52 56 } 53 57 54 58 type RepoConfig struct { ··· 60 64 61 65 // In addition to configuring the service, will connect to upstream BGS and start processing events. Won't handle HTTP or WebSocket endpoints until RunAPI() is called. 62 66 // 'useWss' is a flag to use SSL for outbound WebSocket connections 63 - func NewServer(db *gorm.DB, cs *carstore.CarStore, repoUser RepoConfig, plcURL, blobPdsURL string, useWss bool) (*Server, error) { 67 + func NewServer(db *gorm.DB, cs *carstore.CarStore, repoUser RepoConfig, plcURL, blobPdsURL, xrpcProxyURL, xrpcProxyAdminPassword string, useWss bool) (*Server, error) { 64 68 65 69 db.AutoMigrate(models.PDS{}) 66 70 db.AutoMigrate(models.Label{}) ··· 72 76 kmgr := indexer.NewKeyManager(didr, repoUser.SigningKey) 73 77 evtmgr := events.NewEventManager(events.NewMemPersister()) 74 78 repoman := repomgr.NewRepoManager(db, cs, kmgr) 79 + 80 + proxyURL, err := url.ParseRequestURI(xrpcProxyURL) 81 + if err != nil { 82 + return nil, fmt.Errorf("could not parse XRPC proxy URL (%v): %v", xrpcProxyURL, err) 83 + } 84 + xrpcProxyAuthHeader := "Basic " + base64.StdEncoding.EncodeToString([]byte("admin:"+xrpcProxyAdminPassword)) 75 85 76 86 s := &Server{ 77 - db: db, 78 - repoman: repoman, 79 - evtmgr: evtmgr, 80 - user: &repoUser, 81 - blobPdsURL: blobPdsURL, 87 + db: db, 88 + repoman: repoman, 89 + evtmgr: evtmgr, 90 + user: &repoUser, 91 + blobPdsURL: blobPdsURL, 92 + xrpcProxyURL: proxyURL, 93 + xrpcProxyAuthHeader: xrpcProxyAuthHeader, 82 94 // sluper configured below 83 95 } 84 96 ··· 404 416 ctx.Response().WriteHeader(500) 405 417 } 406 418 407 - s.RegisterHandlersComAtproto(e) 408 - // TODO(bnewbold): this is a speculative endpoint name 419 + if err := s.RegisterHandlersComAtproto(e); err != nil { 420 + return err 421 + } 422 + if err := s.RegisterProxyHandlers(e); err != nil { 423 + return err 424 + } 425 + // single websocket endpoint 409 426 e.GET("/xrpc/com.atproto.label.subscribeLabels", s.EventsLabelsWebsocket) 410 427 411 428 log.Infof("starting labelmaker XRPC and WebSocket daemon at: %s", listen)
+3 -1
labeling/service_test.go
··· 39 39 40 40 plcURL := "http://did-plc-test.dummy" 41 41 blobPdsURL := "http://pds-test.dummy" 42 + xrpcProxyURL := "http://pds-test.dummy" 43 + xrpcProxyAdminPassword := "dummy-password" 42 44 repoUser := RepoConfig{ 43 45 Handle: "test.handle.dummy", 44 46 Did: "did:plc:testdummy", ··· 46 48 UserId: 1, 47 49 } 48 50 49 - lm, err := NewServer(db, cs, repoUser, plcURL, blobPdsURL, false) 51 + lm, err := NewServer(db, cs, repoUser, plcURL, blobPdsURL, xrpcProxyURL, xrpcProxyAdminPassword, false) 50 52 if err != nil { 51 53 t.Fatal(err) 52 54 }
+19 -4
labeling/xrpc_endpoints.go
··· 2 2 3 3 import ( 4 4 "io" 5 + "net/http/httputil" 5 6 "strconv" 6 7 7 8 atproto "github.com/bluesky-social/indigo/api/atproto" ··· 30 31 e.POST("/xrpc/com.atproto.admin.reverseModerationAction", s.HandleComAtprotoAdminReverseModerationAction) 31 32 e.POST("/xrpc/com.atproto.admin.takeModerationAction", s.HandleComAtprotoAdminTakeModerationAction) 32 33 e.POST("/xrpc/com.atproto.report.create", s.HandleComAtprotoReportCreate) 33 - // TODO: proxy the rest to BGS? 34 - //e.GET("/xrpc/com.atproto.admin.getRecord", s.HandleComAtprotoAdminGetRecord) 35 - //e.GET("/xrpc/com.atproto.admin.getRepo", s.HandleComAtprotoAdminGetRepo) 36 - //e.GET("/xrpc/com.atproto.admin.searchRepos", s.HandleComAtprotoAdminSearchRepos) 34 + 35 + return nil 36 + } 37 + 38 + func (s *Server) rewriteProxyRequestAdmin(r *httputil.ProxyRequest) { 39 + r.SetXForwarded() 40 + r.SetURL(s.xrpcProxyURL) 41 + r.Out.Header.Set("Authorization", s.xrpcProxyAuthHeader) 42 + } 43 + 44 + func (s *Server) RegisterProxyHandlers(e *echo.Echo) error { 45 + 46 + rp := &httputil.ReverseProxy{Rewrite: s.rewriteProxyRequestAdmin} 47 + 48 + // Proxy some admin requests 49 + e.GET("/xrpc/com.atproto.admin.getRecord", echo.WrapHandler(rp)) 50 + e.GET("/xrpc/com.atproto.admin.getRepo", echo.WrapHandler(rp)) 51 + e.GET("/xrpc/com.atproto.admin.searchRepos", echo.WrapHandler(rp)) 37 52 38 53 return nil 39 54 }