this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

allow readStream to unpack records (#195)

authored by

Whyrusleeping and committed by
GitHub
31095afa 2d17f95a

+73 -1
+73 -1
cmd/gosky/main.go
··· 6 6 "context" 7 7 "encoding/json" 8 8 "fmt" 9 + "io" 9 10 "net/http" 10 11 "os" 11 12 "os/signal" ··· 27 28 28 29 "github.com/gorilla/websocket" 29 30 "github.com/ipfs/go-cid" 31 + "github.com/ipfs/go-datastore" 32 + blockstore "github.com/ipfs/go-ipfs-blockstore" 33 + "github.com/ipld/go-car" 30 34 31 35 _ "github.com/joho/godotenv/autoload" 32 36 ··· 929 933 &cli.BoolFlag{ 930 934 Name: "json", 931 935 }, 936 + &cli.BoolFlag{ 937 + Name: "unpack", 938 + }, 932 939 }, 933 940 ArgsUsage: `[<repo> [cursor]]`, 934 941 Action: func(cctx *cli.Context) error { ··· 951 958 } 952 959 953 960 jsonfmt := cctx.Bool("json") 961 + unpack := cctx.Bool("unpack") 954 962 955 963 fmt.Println("Stream Started", time.Now().Format(time.RFC3339)) 956 964 defer func() { ··· 975 983 } 976 984 out["blocks"] = fmt.Sprintf("[%d bytes]", len(evt.Blocks)) 977 985 986 + if unpack { 987 + recs, err := unpackRecords(evt.Blocks, evt.Ops) 988 + if err != nil { 989 + fmt.Println("Failed to unpack records: ", err) 990 + } 991 + out["records"] = recs 992 + } 993 + 978 994 b, err = json.Marshal(out) 979 995 if err != nil { 980 996 return err 981 997 } 982 998 fmt.Println(string(b)) 983 - 984 999 } else { 985 1000 pstr := "<nil>" 986 1001 if evt.Prev != nil && evt.Prev.Defined() { 987 1002 pstr = evt.Prev.String() 988 1003 } 989 1004 fmt.Printf("(%d) RepoAppend: %s (%s -> %s)\n", evt.Seq, evt.Repo, pstr, evt.Commit.String()) 1005 + 1006 + if unpack { 1007 + recs, err := unpackRecords(evt.Blocks, evt.Ops) 1008 + if err != nil { 1009 + fmt.Println("failed to unpack records: ", err) 1010 + } 1011 + 1012 + for _, rec := range recs { 1013 + switch rec := rec.(type) { 1014 + case *bsky.FeedPost: 1015 + fmt.Printf("\tPost: %q\n", strings.Replace(rec.Text, "\n", " ", -1)) 1016 + } 1017 + } 1018 + 1019 + } 990 1020 } 991 1021 992 1022 return nil ··· 1011 1041 } 1012 1042 return events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}) 1013 1043 }, 1044 + } 1045 + 1046 + func unpackRecords(blks []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) ([]any, error) { 1047 + ctx := context.TODO() 1048 + 1049 + bstore := blockstore.NewBlockstore(datastore.NewMapDatastore()) 1050 + carr, err := car.NewCarReader(bytes.NewReader(blks)) 1051 + if err != nil { 1052 + return nil, err 1053 + } 1054 + 1055 + for { 1056 + blk, err := carr.Next() 1057 + if err != nil { 1058 + if err == io.EOF { 1059 + break 1060 + } 1061 + return nil, err 1062 + } 1063 + if err := bstore.Put(ctx, blk); err != nil { 1064 + return nil, err 1065 + } 1066 + } 1067 + 1068 + r, err := repo.OpenRepo(ctx, bstore, carr.Header.Roots[0], false) 1069 + if err != nil { 1070 + return nil, err 1071 + } 1072 + 1073 + var out []any 1074 + for _, op := range ops { 1075 + if op.Action == "create" { 1076 + _, rec, err := r.GetRecord(ctx, op.Path) 1077 + if err != nil { 1078 + return nil, err 1079 + } 1080 + 1081 + out = append(out, rec) 1082 + } 1083 + } 1084 + 1085 + return out, nil 1014 1086 } 1015 1087 1016 1088 var getRecordCmd = &cli.Command{