this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

admin crawl command (start PDS crawls from text or csv list of hosts) dau fix

authored by

Brian Olson and committed by
Brian Olson
2a3ff8e1 e2fb048a

+166 -20
+140 -1
cmd/collectiondir/collectiondir.go
··· 1 1 package main 2 2 3 3 import ( 4 + "bufio" 5 + "bytes" 4 6 "compress/gzip" 5 7 "encoding/csv" 6 8 "encoding/json" ··· 10 12 "github.com/urfave/cli/v2" 11 13 "io" 12 14 "log/slog" 15 + "net/http" 16 + "net/url" 13 17 "os" 14 18 "strconv" 15 19 "strings" 20 + "time" 16 21 ) 17 22 18 23 func main() { ··· 27 32 }, 28 33 Commands: []*cli.Command{ 29 34 serveCmd, 30 - crawlCmd, 35 + offlineCrawlCmd, 31 36 buildCmd, 32 37 statsCmd, 33 38 exportCmd, 39 + adminCrawlCmd, 34 40 }, 35 41 } 36 42 err := app.Run(os.Args) ··· 207 213 return nil 208 214 }, 209 215 } 216 + 217 + var adminCrawlCmd = &cli.Command{ 218 + Name: "crawl", 219 + Usage: "admin service to crawl one or more PDSes", 220 + Flags: []cli.Flag{ 221 + &cli.StringFlag{ 222 + Name: "csv", 223 + Usage: "path to load csv from, use column 'host' or 'hostname'", 224 + }, 225 + &cli.StringFlag{ 226 + Name: "list", 227 + Usage: "path to load hostname list from, one per line", 228 + }, 229 + &cli.StringFlag{ 230 + Name: "url", 231 + Usage: "host:port of collectiondir server", 232 + Required: true, 233 + EnvVars: []string{"COLLECTIONDIR_URL"}, 234 + }, 235 + &cli.StringFlag{ 236 + Name: "auth", 237 + Usage: "Auth token for admin api", 238 + EnvVars: []string{"ADMIN_AUTH"}, 239 + }, 240 + }, 241 + Action: func(cctx *cli.Context) error { 242 + logLevel := slog.LevelInfo 243 + if cctx.Bool("verbose") { 244 + logLevel = slog.LevelDebug 245 + } 246 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 247 + slog.SetDefault(log) 248 + 249 + var hostList []string 250 + 251 + serverUrl, err := url.Parse(cctx.String("url")) 252 + if err != nil { 253 + var e2 error 254 + // try to fixup a bare host:port which can confuse url.Parse 255 + serverUrl, e2 = url.Parse("http://" + cctx.String("url")) 256 + if e2 != nil { 257 + return fmt.Errorf("could not parse url, %w", err) 258 + } 259 + } 260 + requestCrawlUrl := serverUrl.JoinPath("/admin/pds/requestCrawl") 261 + 262 + if cctx.IsSet("list") { 263 + fin, err := os.Open(cctx.String("list")) 264 + if err != nil { 265 + return fmt.Errorf("%s: could not open, %w", cctx.String("list"), err) 266 + } 267 + defer fin.Close() 268 + bufin := bufio.NewScanner(fin) 269 + for bufin.Scan() { 270 + hostList = append(hostList, bufin.Text()) 271 + } 272 + err = bufin.Err() 273 + if err != nil { 274 + return fmt.Errorf("%s: error reading, %w", cctx.String("list"), err) 275 + } 276 + } else if cctx.IsSet("csv") { 277 + fin, err := os.Open(cctx.String("csv")) 278 + if err != nil { 279 + return fmt.Errorf("%s: could not open, %w", cctx.String("csv"), err) 280 + } 281 + defer fin.Close() 282 + data, err := csv.NewReader(fin).ReadAll() 283 + if err != nil { 284 + return fmt.Errorf("%s: could not read, %w", cctx.String("csv"), err) 285 + } 286 + if len(data) < 2 { 287 + return fmt.Errorf("%s: empty CSV file", cctx.String("csv")) 288 + } 289 + headerRow := data[0] 290 + hostCol := -1 291 + for i, v := range headerRow { 292 + v = strings.ToLower(v) 293 + if v == "host" || v == "hostname" { 294 + hostCol = i 295 + break 296 + } 297 + } 298 + if hostCol < 0 { 299 + return fmt.Errorf("%s: header missing 'host' or 'hostname'", cctx.String("csv")) 300 + } 301 + for _, row := range data[1:] { 302 + hostList = append(hostList, row[hostCol]) 303 + } 304 + } 305 + 306 + if len(hostList) == 0 { 307 + fmt.Println("no hosts") 308 + } 309 + 310 + client := http.Client{Timeout: 1 * time.Second} 311 + var headers http.Header = make(http.Header) 312 + if cctx.IsSet("auth") { 313 + headers.Add("Authorization", "Bearer "+cctx.String("auth")) 314 + } 315 + headers.Add("Content-Type", "application/json") 316 + var response *http.Response 317 + postReqeust := CrawlRequest{ 318 + Hosts: hostList, 319 + } 320 + reqBlob, err := json.Marshal(postReqeust) 321 + reqReader := bytes.NewReader(reqBlob) 322 + 323 + for try := 0; try < 3; try++ { 324 + req, err := http.NewRequest("POST", requestCrawlUrl.String(), reqReader) 325 + if err != nil { 326 + return fmt.Errorf("could not create request, %w", err) 327 + } 328 + req.Header = headers 329 + response, err = client.Do(req) 330 + if err == nil && response.StatusCode == 200 { 331 + break 332 + } else { 333 + log.Info("http err", "err", err, "status", response.StatusCode) 334 + if try < 2 { 335 + time.Sleep(time.Duration(try+1) * 2 * time.Second) 336 + } 337 + } 338 + } 339 + if err != nil { 340 + return fmt.Errorf("POST %s err %w", requestCrawlUrl.String(), err) 341 + } 342 + if response.StatusCode != http.StatusOK { 343 + return fmt.Errorf("POST %s err %s", requestCrawlUrl.String(), response.Status) 344 + } 345 + 346 + return nil 347 + }, 348 + }
+10 -4
cmd/collectiondir/crawl.go
··· 10 10 "net/http" 11 11 "net/url" 12 12 "os" 13 + "strings" 13 14 "sync/atomic" 14 15 15 16 "github.com/urfave/cli/v2" ··· 34 35 } 35 36 } 36 37 37 - var crawlCmd = &cli.Command{ 38 - Name: "crawl", 39 - Usage: "crawl a PDS", 38 + var offlineCrawlCmd = &cli.Command{ 39 + Name: "offline_crawl", 40 + Usage: "crawl a PDS to csv out", 40 41 Flags: []cli.Flag{ 41 42 &cli.StringFlag{ 42 43 Name: "host", ··· 141 142 limiter.Wait(cr.Ctx) 142 143 desc, err := atproto.RepoDescribeRepo(cr.Ctx, cr.RpcClient, xr.Did) 143 144 if err != nil { 144 - slog.Error("repo desc", "host", cr.RpcClient.Host, "did", xr.Did, "err", err) 145 + erst := err.Error() 146 + if strings.Contains(erst, "RepoDeactivated") || strings.Contains(erst, "RepoTakendown") { 147 + slog.Info("repo unavail", "host", cr.RpcClient.Host, "did", xr.Did, "err", err) 148 + } else { 149 + slog.Warn("repo desc", "host", cr.RpcClient.Host, "did", xr.Did, "err", err) 150 + } 145 151 continue 146 152 } 147 153 for _, collection := range desc.Collections {
+16 -15
cmd/collectiondir/serve.go
··· 276 276 cs.dauDirectory = daud 277 277 cs.dauDirectoryPath = fpath 278 278 cs.dauDay = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 279 - cs.dauTomorrow = now.AddDate(0, 0, 1) 279 + cs.dauTomorrow = cs.dauDay.AddDate(0, 0, 1) 280 280 cs.log.Info("DAU db opened", "path", fpath) 281 281 return nil 282 282 } ··· 738 738 return nil 739 739 } 740 740 741 + func (cs *collectionServer) maybeDauWrite(didc DidCollection) error { 742 + now := time.Now() 743 + if now.After(cs.dauTomorrow) { 744 + go dauStats(cs.dauDirectory, cs.dauDay, cs.dauDirectoryDir, cs.log) 745 + cs.dauDirectory = nil 746 + err := cs.openDau() 747 + if err != nil { 748 + return fmt.Errorf("dau reopen, %w", err) 749 + } 750 + } 751 + return cs.dauDirectory.MaybeSetCollection(didc.Did, didc.Collection) 752 + } 753 + 741 754 // write {dauDirectoryDir}/d{YYYY-MM-DD}.pebble stats summary to {dauDirectoryDir}/d{YYYY-MM-DD}.csv.gz 742 755 func dauStats(oldDau *PebbleCollectionDirectory, dauDay time.Time, dauDir string, log *slog.Logger) { 743 756 fname := fmt.Sprintf("d%s.csv.gz", dauDay.Format("2006-01-02")) ··· 765 778 } 766 779 defer fout.Close() 767 780 gzout := gzip.NewWriter(fout) 781 + defer gzout.Close() 768 782 csvout := csv.NewWriter(gzout) 769 783 defer csvout.Flush() 770 - defer gzout.Close() 771 784 err = csvout.Write([]string{"collection", "count"}) 772 785 if err != nil { 773 786 log.Error("DAU stats header", "err", err) ··· 788 801 log.Info("DAU stats ok", "rows", rowcount) 789 802 } 790 803 791 - func (cs *collectionServer) maybeDauWrite(didc DidCollection) error { 792 - now := time.Now() 793 - if now.After(cs.dauTomorrow) { 794 - go dauStats(cs.dauDirectory, cs.dauDay, cs.dauDirectoryDir, cs.log) 795 - cs.dauDirectory = nil 796 - err := cs.openDau() 797 - if err != nil { 798 - return fmt.Errorf("dau reopen, %w", err) 799 - } 800 - } 801 - return cs.dauDirectory.MaybeSetCollection(didc.Did, didc.Collection) 802 - } 803 - 804 804 type CrawlRequest struct { 805 805 Host string `json:"hostname,omitempty"` 806 806 Hosts []string `json:"hosts,omitempty"` ··· 859 859 var req CrawlRequest 860 860 err := c.Bind(&req) 861 861 if err != nil { 862 + cs.log.Info("bad crawl bind", "err", err) 862 863 return c.String(http.StatusBadRequest, err.Error()) 863 864 } 864 865 if req.Host != "" {