this repo has no description
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

initial iteration of slinky tool (untested)

+278 -1
+274
cmd/slinky/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log/slog" 8 + "os" 9 + "sync" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/atproto/client" 13 + "github.com/bluesky-social/indigo/atproto/data" 14 + "github.com/bluesky-social/indigo/atproto/repo" 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + "tangled.sh/bnewbold.net/cobalt/atproto/netclient" 17 + 18 + "github.com/ipfs/go-cid" 19 + "github.com/urfave/cli/v3" 20 + ) 21 + 22 + func userAgent() string { 23 + return "cobalt-slinky" 24 + } 25 + 26 + func main() { 27 + app := cli.Command{ 28 + Name: "slinky", 29 + Usage: "minimal atproto network dump/backfill tool", 30 + Flags: []cli.Flag{ 31 + &cli.StringFlag{ 32 + Name: "relay-host", 33 + Usage: "relay (and collectiondir) host, including URL scheme", 34 + Value: "https://relay1.us-west.bsky.network", 35 + Sources: cli.EnvVars("ATP_RELAY_HOST", "RELAY_HOST"), 36 + }, 37 + &cli.IntFlag{ 38 + Name: "jobs", 39 + Aliases: []string{"j"}, 40 + Usage: "worker concurrency", 41 + Value: 4, 42 + }, 43 + }, 44 + } 45 + app.Commands = []*cli.Command{ 46 + { 47 + Name: "dump-record", 48 + Usage: "enumerates and prints all instances of a specific record (collection+rkey)", 49 + ArgsUsage: "<collection>", 50 + Action: runDumpRecord, 51 + Flags: []cli.Flag{ 52 + &cli.StringFlag{ 53 + Name: "rkey", 54 + Usage: "fixed record key", 55 + Value: "self", 56 + }, 57 + }, 58 + }, 59 + { 60 + Name: "dump-collection", 61 + Usage: "enumerates and prints all records of a given type in the network", 62 + ArgsUsage: "<collection>", 63 + Action: runDumpCollection, 64 + }, 65 + } 66 + 67 + // default logging to stderr 68 + h := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}) 69 + slog.SetDefault(slog.New(h)) 70 + 71 + if err := app.Run(context.Background(), os.Args); err != nil { 72 + fmt.Fprintf(os.Stderr, "error: %v\n", err) 73 + os.Exit(-1) 74 + } 75 + } 76 + 77 + // XXX: error handling (errgroup?) 78 + func enumerateAccounts(cmd *cli.Command, collection syntax.NSID, tasks chan syntax.DID) error { 79 + ctx := context.Background() 80 + defer close(tasks) 81 + 82 + c := client.NewAPIClient(cmd.String("relay-host")) 83 + c.Headers.Set("User-Agent", userAgent()) 84 + // TODO: robust HTTP client (retries, longer timeout) 85 + 86 + cursor := "" 87 + for { 88 + page, err := comatproto.SyncListReposByCollection(ctx, c, collection.String(), cursor, 0) 89 + if err != nil { 90 + slog.Error("enumerating accounts", "host", c.Host, "cursor", cursor, "err", err) 91 + return err 92 + } 93 + if page.Cursor == nil || *page.Cursor == "" || *page.Cursor == cursor { 94 + break 95 + } 96 + cursor = *page.Cursor 97 + 98 + for _, row := range page.Repos { 99 + did, err := syntax.ParseDID(row.Did) 100 + if err != nil { 101 + slog.Warn("invalid DID syntax in enumeration", "did", row.Did, "err", err) 102 + continue 103 + } 104 + tasks <- did 105 + } 106 + } 107 + return nil 108 + } 109 + 110 + type RecordLine struct { 111 + Value json.RawMessage `json:"value"` 112 + DID syntax.DID `json:"did"` 113 + Collection syntax.NSID `json:"collection"` 114 + RecordKey syntax.RecordKey `json:"rkey"` 115 + CID syntax.CID `json:"cid"` 116 + } 117 + 118 + func runDumpRecord(ctx context.Context, cmd *cli.Command) error { 119 + 120 + if cmd.Args().First() == "" { 121 + return fmt.Errorf("need to provide collection as an argument") 122 + } 123 + collection, err := syntax.ParseNSID(cmd.Args().First()) 124 + if err != nil { 125 + return err 126 + } 127 + 128 + rkey, err := syntax.ParseRecordKey(cmd.String("rkey")) 129 + if err != nil { 130 + return err 131 + } 132 + 133 + nc := netclient.NewNetClient() 134 + nc.UserAgent = userAgent() 135 + 136 + tasks := make(chan syntax.DID, 5000) 137 + wg := sync.WaitGroup{} 138 + for range cmd.Int("jobs") { 139 + wg.Add(1) 140 + go func() { 141 + ctx := context.Background() 142 + defer wg.Done() 143 + for did := range tasks { 144 + // do work here 145 + fmt.Println(did) // XXX 146 + 147 + var rec json.RawMessage 148 + recCID, err := nc.GetRecord(ctx, did, collection, rkey, &rec) 149 + if err != nil { 150 + slog.Error("failed fetching record from PDS", "did", did, "collection", collection, "rkey", rkey, "err", err) 151 + continue 152 + } 153 + 154 + line := RecordLine{ 155 + Value: rec, 156 + DID: did, 157 + Collection: collection, 158 + RecordKey: rkey, 159 + CID: recCID, 160 + } 161 + 162 + b, err := json.Marshal(line) 163 + if err != nil { 164 + slog.Error("failed serializing JSON", "err", err) 165 + continue 166 + } 167 + fmt.Println(string(b)) 168 + } 169 + }() 170 + } 171 + 172 + go enumerateAccounts(cmd, collection, tasks) 173 + 174 + wg.Wait() 175 + 176 + return nil 177 + } 178 + 179 + func runDumpCollection(ctx context.Context, cmd *cli.Command) error { 180 + 181 + if cmd.Args().First() == "" { 182 + return fmt.Errorf("need to provide collection as an argument") 183 + } 184 + collection, err := syntax.ParseNSID(cmd.Args().First()) 185 + if err != nil { 186 + return err 187 + } 188 + 189 + nc := netclient.NewNetClient() 190 + nc.UserAgent = userAgent() 191 + 192 + tasks := make(chan syntax.DID, 5000) 193 + wg := sync.WaitGroup{} 194 + for range cmd.Int("jobs") { 195 + wg.Add(1) 196 + go func() { 197 + ctx := context.Background() 198 + defer wg.Done() 199 + for did := range tasks { 200 + 201 + stream, err := nc.GetRepoCAR(ctx, did) 202 + if err != nil { 203 + slog.Error("failed fetching repo CAR", "did", did, "err", err) 204 + continue 205 + } 206 + // NOTE: must Close() stream in all code paths! 207 + 208 + _, repo, err := repo.LoadRepoFromCAR(ctx, stream) 209 + stream.Close() 210 + if err != nil { 211 + slog.Error("failed parsing repo CAR", "did", did, "err", err) 212 + continue 213 + } 214 + 215 + err = repo.MST.Walk(func(key []byte, val cid.Cid) error { 216 + wcoll, rkey, err := syntax.ParseRepoPath(string(key)) 217 + if err != nil { 218 + return err 219 + } 220 + 221 + // only visit records in collection 222 + if wcoll != collection { 223 + return nil 224 + } 225 + 226 + recBytes, recCID, err := repo.GetRecordBytes(ctx, collection, rkey) 227 + if err != nil { 228 + return err 229 + } 230 + 231 + recVal, err := data.UnmarshalCBOR(recBytes) 232 + if err != nil { 233 + return err 234 + } 235 + 236 + recJSON, err := json.Marshal(recVal) 237 + if err != nil { 238 + return err 239 + } 240 + 241 + var recRaw json.RawMessage 242 + if err := json.Unmarshal(recJSON, recRaw); err != nil { 243 + return err 244 + } 245 + 246 + line := RecordLine{ 247 + Value: recRaw, 248 + DID: did, 249 + Collection: collection, 250 + RecordKey: rkey, 251 + CID: syntax.CID(recCID.String()), 252 + } 253 + 254 + b, err := json.Marshal(line) 255 + if err != nil { 256 + return err 257 + } 258 + fmt.Println(string(b)) 259 + return nil 260 + }) 261 + if err != nil { 262 + slog.Error("failed processing record", "did", did, "err", err) 263 + continue 264 + } 265 + } 266 + }() 267 + } 268 + 269 + go enumerateAccounts(cmd, collection, tasks) 270 + 271 + wg.Wait() 272 + 273 + return nil 274 + }
+2 -1
go.mod
··· 9 9 github.com/flosch/pongo2/v6 v6.0.0 10 10 github.com/hashicorp/golang-lru/v2 v2.0.7 11 11 github.com/ipfs/go-cid v0.4.1 12 - github.com/ipfs/go-ipld-cbor v0.1.0 13 12 github.com/joho/godotenv v1.5.1 14 13 github.com/labstack/echo/v4 v4.11.3 15 14 github.com/miekg/dns v1.1.66 ··· 17 16 github.com/samber/slog-echo v1.8.0 18 17 github.com/stretchr/testify v1.10.0 19 18 github.com/urfave/cli/v2 v2.27.7 19 + github.com/urfave/cli/v3 v3.4.1 20 20 golang.org/x/net v0.39.0 21 21 gorm.io/driver/sqlite v1.5.5 22 22 gorm.io/gorm v1.25.9 ··· 46 46 github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect 47 47 github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect 48 48 github.com/ipfs/go-ipfs-util v0.0.3 // indirect 49 + github.com/ipfs/go-ipld-cbor v0.1.0 // indirect 49 50 github.com/ipfs/go-ipld-format v0.6.0 // indirect 50 51 github.com/ipfs/go-ipld-legacy v0.2.1 // indirect 51 52 github.com/ipfs/go-log v1.0.5 // indirect
+2
go.sum
··· 254 254 github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= 255 255 github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU= 256 256 github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4= 257 + github.com/urfave/cli/v3 v3.4.1 h1:1M9UOCy5bLmGnuu1yn3t3CB4rG79Rtoxuv1sPhnm6qM= 258 + github.com/urfave/cli/v3 v3.4.1/go.mod h1:FJSKtM/9AiiTOJL4fJ6TbMUkxBXn7GO9guZqoZtpYpo= 257 259 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= 258 260 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= 259 261 github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=