this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

export command; dau logging; comments

authored by

Brian Olson and committed by
Brian Olson
f0433c99 29bad144

+80 -13
+60 -2
cmd/collectiondir/collectiondir.go
··· 11 11 "io" 12 12 "log/slog" 13 13 "os" 14 + "strconv" 14 15 "strings" 15 16 ) 16 17 ··· 29 30 crawlCmd, 30 31 buildCmd, 31 32 statsCmd, 33 + exportCmd, 32 34 }, 33 35 } 34 36 err := app.Run(os.Args) ··· 39 41 } 40 42 41 43 var statsCmd = &cli.Command{ 42 - Name: "stats", 44 + Name: "stats", 45 + Usage: "read stats from a pebble db", 43 46 Flags: []cli.Flag{ 44 47 &cli.StringFlag{ 45 48 Name: "pebble", 46 - Usage: "path to store pebble db", 49 + Usage: "path to pebble db", 47 50 Required: true, 48 51 }, 49 52 }, ··· 70 73 blob, err := json.MarshalIndent(stats, "", " ") 71 74 os.Stdout.Write(blob) 72 75 os.Stdout.Write([]byte{'\n'}) 76 + return nil 77 + }, 78 + } 79 + 80 + var exportCmd = &cli.Command{ 81 + Name: "export", 82 + Usage: "export a pebble db to CSV on stdout", 83 + Flags: []cli.Flag{ 84 + &cli.StringFlag{ 85 + Name: "pebble", 86 + Usage: "path to pebble db", 87 + Required: true, 88 + }, 89 + }, 90 + Action: func(cctx *cli.Context) error { 91 + logLevel := slog.LevelInfo 92 + if cctx.Bool("verbose") { 93 + logLevel = slog.LevelDebug 94 + } 95 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 96 + slog.SetDefault(log) 97 + pebblePath := cctx.String("pebble") 98 + var db PebbleCollectionDirectory 99 + db.log = log 100 + err := db.Open(pebblePath) 101 + if err != nil { 102 + return err 103 + } 104 + defer db.Close() 105 + 106 + rows := make(chan CollectionDidTime, 100) 107 + go func() { 108 + err := db.ReadAllPrimary(cctx.Context, rows) 109 + if err != nil { 110 + log.Error("db read", "path", pebblePath, "err", err) 111 + } 112 + }() 113 + 114 + writer := csv.NewWriter(os.Stdout) 115 + defer writer.Flush() 116 + err = writer.Write([]string{"did", "collection", "millis"}) 117 + if err != nil { 118 + log.Error("csv write header", "err", err) 119 + } 120 + var row [3]string 121 + for rowi := range rows { 122 + row[0] = rowi.Did 123 + row[1] = rowi.Collection 124 + row[2] = strconv.FormatInt(rowi.UnixMillis, 10) 125 + err = writer.Write(row[:]) 126 + if err != nil { 127 + log.Error("csv write row", "err", err) 128 + } 129 + } 130 + 73 131 return nil 74 132 }, 75 133 }
+20 -11
cmd/collectiondir/serve.go
··· 225 225 cs.dauDirectoryPath = fpath 226 226 cs.dauDay = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 227 227 cs.dauTomorrow = now.AddDate(0, 0, 1) 228 + cs.log.Info("DAU db opened", "path", fpath) 228 229 return nil 229 230 } 230 231 ··· 406 407 return lv 407 408 } 408 409 409 - // /v1/getDidsForCollection?collection={}&cursor={} 410 + // /v1/getDidsForCollection?collection={}&cursor={}&limit={50<=N<=1000} 410 411 // 411 412 // returns 412 413 // {"dids":["did:A", "..."], "cursor":"opaque text"} ··· 653 654 654 655 // write {dauDirectoryDir}/d{YYYY-MM-DD}.pebble stats summary to {dauDirectoryDir}/d{YYYY-MM-DD}.csv.gz 655 656 func dauStats(oldDau *PebbleCollectionDirectory, dauDay time.Time, dauDir string, log *slog.Logger) { 657 + fname := fmt.Sprintf("d%s.csv.gz", dauDay.Format("2006-01-02")) 658 + outstatsPath := filepath.Join(dauDir, fname) 659 + log = log.With("path", outstatsPath) 660 + log.Info("DAU stats summarize") 656 661 stats, err := oldDau.GetCollectionStats() 657 662 e2 := oldDau.Close() 658 663 if e2 != nil { 659 - log.Error("old dau close", "err", e2) 664 + log.Error("old DAU close", "err", e2) 660 665 } 661 666 if err != nil { 662 - log.Error("old dau stats", "err", err) 667 + log.Error("old DAU stats", "err", err) 663 668 } else { 664 - fname := fmt.Sprintf("d%s.csv.gz", dauDay.Format("2006-01-02")) 665 - outstatsPath := filepath.Join(dauDir, fname) 669 + log.Info("DAU stats summarized", "rows", len(stats.CollectionCounts)) 666 670 pcdStatsToCsvGz(stats, outstatsPath, log) 667 671 } 668 672 } ··· 670 674 func pcdStatsToCsvGz(stats CollectionStats, outpath string, log *slog.Logger) { 671 675 fout, err := os.Create(outpath) 672 676 if err != nil { 673 - log.Error("dau stats open", "err", err) 677 + log.Error("DAU stats open", "err", err) 674 678 return 675 679 } 676 680 defer fout.Close() ··· 680 684 defer gzout.Close() 681 685 err = csvout.Write([]string{"collection", "count"}) 682 686 if err != nil { 683 - log.Error("dau stats header", "err", err) 687 + log.Error("DAU stats header", "err", err) 684 688 return 685 689 } 686 690 var row [2]string 691 + rowcount := 0 687 692 for collection, count := range stats.CollectionCounts { 688 693 row[0] = collection 689 694 row[1] = strconv.FormatUint(count, 10) 690 695 err = csvout.Write(row[:]) 691 696 if err != nil { 692 - log.Error("dau stats row", "err", err) 697 + log.Error("DAU stats row", "err", err) 693 698 return 694 699 } 700 + rowcount++ 695 701 } 702 + log.Info("DAU stats ok", "rows", rowcount) 696 703 } 697 704 698 705 func (cs *collectionServer) maybeDauWrite(didc DidCollection) error { ··· 744 751 return false 745 752 } 746 753 747 - // /v1/crawlRequest 754 + // /admin/pds/requestCrawl 755 + // same API signature as relay admin requestCrawl 756 + // starts a crawl and returns. See /v1/crawlStatus 748 757 // requires header `Authorization: Bearer {admin token}` 749 758 // 750 - // POST {"host":"one hostname or URL", "hosts":["up to 1000 hosts", "..."]} 759 + // POST {"hostname":"one hostname or URL", "hosts":["up to 1000 hosts", "..."]} 751 760 // OR 752 - // POST /v1/crawlRequest?host={one host} 761 + // POST /admin/pds/requestCrawl?hostname={one host} 753 762 func (cs *collectionServer) crawlPds(c echo.Context) error { 754 763 isAdmin := cs.isAdmin(c) 755 764 if !isAdmin {