this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fixing up events after refactor

+800 -311
+10 -10
api/atproto/repobatchWrite.go
··· 15 15 } 16 16 17 17 type RepoBatchWrite_Create struct { 18 - LexiconTypeID string `json:"$type,omitempty"` 19 - Action string `json:"action" cborgen:"action"` 20 - Collection string `json:"collection" cborgen:"collection"` 21 - Rkey *string `json:"rkey,omitempty" cborgen:"rkey"` 22 - Value any `json:"value" cborgen:"value"` 18 + LexiconTypeID string `json:"$type,omitempty"` 19 + Action string `json:"action" cborgen:"action"` 20 + Collection string `json:"collection" cborgen:"collection"` 21 + Rkey *string `json:"rkey,omitempty" cborgen:"rkey"` 22 + Value util.LexiconTypeDecoder `json:"value" cborgen:"value"` 23 23 } 24 24 25 25 type RepoBatchWrite_Delete struct { ··· 80 80 } 81 81 82 82 type RepoBatchWrite_Update struct { 83 - LexiconTypeID string `json:"$type,omitempty"` 84 - Action string `json:"action" cborgen:"action"` 85 - Collection string `json:"collection" cborgen:"collection"` 86 - Rkey string `json:"rkey" cborgen:"rkey"` 87 - Value any `json:"value" cborgen:"value"` 83 + LexiconTypeID string `json:"$type,omitempty"` 84 + Action string `json:"action" cborgen:"action"` 85 + Collection string `json:"collection" cborgen:"collection"` 86 + Rkey string `json:"rkey" cborgen:"rkey"` 87 + Value util.LexiconTypeDecoder `json:"value" cborgen:"value"` 88 88 } 89 89 90 90 func RepoBatchWrite(ctx context.Context, c *xrpc.Client, input *RepoBatchWrite_Input) error {
+6 -5
api/atproto/repocreateRecord.go
··· 3 3 import ( 4 4 "context" 5 5 6 + "github.com/whyrusleeping/gosky/lex/util" 6 7 "github.com/whyrusleeping/gosky/xrpc" 7 8 ) 8 9 ··· 12 13 } 13 14 14 15 type RepoCreateRecord_Input struct { 15 - LexiconTypeID string `json:"$type,omitempty"` 16 - Collection string `json:"collection" cborgen:"collection"` 17 - Did string `json:"did" cborgen:"did"` 18 - Record any `json:"record" cborgen:"record"` 19 - Validate *bool `json:"validate,omitempty" cborgen:"validate"` 16 + LexiconTypeID string `json:"$type,omitempty"` 17 + Collection string `json:"collection" cborgen:"collection"` 18 + Did string `json:"did" cborgen:"did"` 19 + Record util.LexiconTypeDecoder `json:"record" cborgen:"record"` 20 + Validate *bool `json:"validate,omitempty" cborgen:"validate"` 20 21 } 21 22 22 23 type RepoCreateRecord_Output struct {
+7 -6
api/atproto/repodescribe.go
··· 3 3 import ( 4 4 "context" 5 5 6 + "github.com/whyrusleeping/gosky/lex/util" 6 7 "github.com/whyrusleeping/gosky/xrpc" 7 8 ) 8 9 ··· 12 13 } 13 14 14 15 type RepoDescribe_Output struct { 15 - LexiconTypeID string `json:"$type,omitempty"` 16 - Collections []string `json:"collections" cborgen:"collections"` 17 - Did string `json:"did" cborgen:"did"` 18 - DidDoc any `json:"didDoc" cborgen:"didDoc"` 19 - Handle string `json:"handle" cborgen:"handle"` 20 - HandleIsCorrect bool `json:"handleIsCorrect" cborgen:"handleIsCorrect"` 16 + LexiconTypeID string `json:"$type,omitempty"` 17 + Collections []string `json:"collections" cborgen:"collections"` 18 + Did string `json:"did" cborgen:"did"` 19 + DidDoc util.LexiconTypeDecoder `json:"didDoc" cborgen:"didDoc"` 20 + Handle string `json:"handle" cborgen:"handle"` 21 + HandleIsCorrect bool `json:"handleIsCorrect" cborgen:"handleIsCorrect"` 21 22 } 22 23 23 24 func RepoDescribe(ctx context.Context, c *xrpc.Client, user string) (*RepoDescribe_Output, error) {
+5 -4
api/atproto/repogetRecord.go
··· 3 3 import ( 4 4 "context" 5 5 6 + "github.com/whyrusleeping/gosky/lex/util" 6 7 "github.com/whyrusleeping/gosky/xrpc" 7 8 ) 8 9 ··· 12 13 } 13 14 14 15 type RepoGetRecord_Output struct { 15 - LexiconTypeID string `json:"$type,omitempty"` 16 - Cid *string `json:"cid,omitempty" cborgen:"cid"` 17 - Uri string `json:"uri" cborgen:"uri"` 18 - Value any `json:"value" cborgen:"value"` 16 + LexiconTypeID string `json:"$type,omitempty"` 17 + Cid *string `json:"cid,omitempty" cborgen:"cid"` 18 + Uri string `json:"uri" cborgen:"uri"` 19 + Value util.LexiconTypeDecoder `json:"value" cborgen:"value"` 19 20 } 20 21 21 22 func RepoGetRecord(ctx context.Context, c *xrpc.Client, cid string, collection string, rkey string, user string) (*RepoGetRecord_Output, error) {
+5 -4
api/atproto/repolistRecords.go
··· 3 3 import ( 4 4 "context" 5 5 6 + "github.com/whyrusleeping/gosky/lex/util" 6 7 "github.com/whyrusleeping/gosky/xrpc" 7 8 ) 8 9 ··· 18 19 } 19 20 20 21 type RepoListRecords_Record struct { 21 - LexiconTypeID string `json:"$type,omitempty"` 22 - Cid string `json:"cid" cborgen:"cid"` 23 - Uri string `json:"uri" cborgen:"uri"` 24 - Value any `json:"value" cborgen:"value"` 22 + LexiconTypeID string `json:"$type,omitempty"` 23 + Cid string `json:"cid" cborgen:"cid"` 24 + Uri string `json:"uri" cborgen:"uri"` 25 + Value util.LexiconTypeDecoder `json:"value" cborgen:"value"` 25 26 } 26 27 27 28 func RepoListRecords(ctx context.Context, c *xrpc.Client, after string, before string, collection string, limit int64, reverse bool, user string) (*RepoListRecords_Output, error) {
+7 -6
api/atproto/repoputRecord.go
··· 3 3 import ( 4 4 "context" 5 5 6 + "github.com/whyrusleeping/gosky/lex/util" 6 7 "github.com/whyrusleeping/gosky/xrpc" 7 8 ) 8 9 ··· 12 13 } 13 14 14 15 type RepoPutRecord_Input struct { 15 - LexiconTypeID string `json:"$type,omitempty"` 16 - Collection string `json:"collection" cborgen:"collection"` 17 - Did string `json:"did" cborgen:"did"` 18 - Record any `json:"record" cborgen:"record"` 19 - Rkey string `json:"rkey" cborgen:"rkey"` 20 - Validate *bool `json:"validate,omitempty" cborgen:"validate"` 16 + LexiconTypeID string `json:"$type,omitempty"` 17 + Collection string `json:"collection" cborgen:"collection"` 18 + Did string `json:"did" cborgen:"did"` 19 + Record util.LexiconTypeDecoder `json:"record" cborgen:"record"` 20 + Rkey string `json:"rkey" cborgen:"rkey"` 21 + Validate *bool `json:"validate,omitempty" cborgen:"validate"` 21 22 } 22 23 23 24 type RepoPutRecord_Output struct {
+1 -1
api/bsky/actorprofile.go
··· 7 7 // schema: app.bsky.actor.profile 8 8 9 9 func init() { 10 - util.RegisterType("app.bsky.actor.profile", ActorProfile{}) 10 + util.RegisterType("app.bsky.actor.profile", &ActorProfile{}) 11 11 } 12 12 13 13 // RECORDTYPE: ActorProfile
+4 -4
api/bsky/actorupdateProfile.go
··· 22 22 } 23 23 24 24 type ActorUpdateProfile_Output struct { 25 - LexiconTypeID string `json:"$type,omitempty"` 26 - Cid string `json:"cid" cborgen:"cid"` 27 - Record any `json:"record" cborgen:"record"` 28 - Uri string `json:"uri" cborgen:"uri"` 25 + LexiconTypeID string `json:"$type,omitempty"` 26 + Cid string `json:"cid" cborgen:"cid"` 27 + Record util.LexiconTypeDecoder `json:"record" cborgen:"record"` 28 + Uri string `json:"uri" cborgen:"uri"` 29 29 } 30 30 31 31 func ActorUpdateProfile(ctx context.Context, c *xrpc.Client, input *ActorUpdateProfile_Input) (*ActorUpdateProfile_Output, error) {
+392
api/bsky/cbor_gen.go
··· 2952 2952 2953 2953 return nil 2954 2954 } 2955 + func (t *GraphAssertion) MarshalCBOR(w io.Writer) error { 2956 + if t == nil { 2957 + _, err := w.Write(cbg.CborNull) 2958 + return err 2959 + } 2960 + 2961 + cw := cbg.NewCborWriter(w) 2962 + 2963 + if _, err := cw.Write([]byte{164}); err != nil { 2964 + return err 2965 + } 2966 + 2967 + // t.LexiconTypeID (string) (string) 2968 + if len("$type") > cbg.MaxLength { 2969 + return xerrors.Errorf("Value in field \"$type\" was too long") 2970 + } 2971 + 2972 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("$type"))); err != nil { 2973 + return err 2974 + } 2975 + if _, err := io.WriteString(w, string("$type")); err != nil { 2976 + return err 2977 + } 2978 + 2979 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.graph.assertion"))); err != nil { 2980 + return err 2981 + } 2982 + if _, err := io.WriteString(w, string("app.bsky.graph.assertion")); err != nil { 2983 + return err 2984 + } 2985 + 2986 + // t.Subject (schemagen.ActorRef) (struct) 2987 + if len("subject") > cbg.MaxLength { 2988 + return xerrors.Errorf("Value in field \"subject\" was too long") 2989 + } 2990 + 2991 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("subject"))); err != nil { 2992 + return err 2993 + } 2994 + if _, err := io.WriteString(w, string("subject")); err != nil { 2995 + return err 2996 + } 2997 + 2998 + if err := t.Subject.MarshalCBOR(cw); err != nil { 2999 + return err 3000 + } 3001 + 3002 + // t.Assertion (string) (string) 3003 + if len("assertion") > cbg.MaxLength { 3004 + return xerrors.Errorf("Value in field \"assertion\" was too long") 3005 + } 3006 + 3007 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("assertion"))); err != nil { 3008 + return err 3009 + } 3010 + if _, err := io.WriteString(w, string("assertion")); err != nil { 3011 + return err 3012 + } 3013 + 3014 + if len(t.Assertion) > cbg.MaxLength { 3015 + return xerrors.Errorf("Value in field t.Assertion was too long") 3016 + } 3017 + 3018 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Assertion))); err != nil { 3019 + return err 3020 + } 3021 + if _, err := io.WriteString(w, string(t.Assertion)); err != nil { 3022 + return err 3023 + } 3024 + 3025 + // t.CreatedAt (string) (string) 3026 + if len("createdAt") > cbg.MaxLength { 3027 + return xerrors.Errorf("Value in field \"createdAt\" was too long") 3028 + } 3029 + 3030 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("createdAt"))); err != nil { 3031 + return err 3032 + } 3033 + if _, err := io.WriteString(w, string("createdAt")); err != nil { 3034 + return err 3035 + } 3036 + 3037 + if len(t.CreatedAt) > cbg.MaxLength { 3038 + return xerrors.Errorf("Value in field t.CreatedAt was too long") 3039 + } 3040 + 3041 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.CreatedAt))); err != nil { 3042 + return err 3043 + } 3044 + if _, err := io.WriteString(w, string(t.CreatedAt)); err != nil { 3045 + return err 3046 + } 3047 + return nil 3048 + } 3049 + 3050 + func (t *GraphAssertion) UnmarshalCBOR(r io.Reader) (err error) { 3051 + *t = GraphAssertion{} 3052 + 3053 + cr := cbg.NewCborReader(r) 3054 + 3055 + maj, extra, err := cr.ReadHeader() 3056 + if err != nil { 3057 + return err 3058 + } 3059 + defer func() { 3060 + if err == io.EOF { 3061 + err = io.ErrUnexpectedEOF 3062 + } 3063 + }() 3064 + 3065 + if maj != cbg.MajMap { 3066 + return fmt.Errorf("cbor input should be of type map") 3067 + } 3068 + 3069 + if extra > cbg.MaxLength { 3070 + return fmt.Errorf("GraphAssertion: map struct too large (%d)", extra) 3071 + } 3072 + 3073 + var name string 3074 + n := extra 3075 + 3076 + for i := uint64(0); i < n; i++ { 3077 + 3078 + { 3079 + sval, err := cbg.ReadString(cr) 3080 + if err != nil { 3081 + return err 3082 + } 3083 + 3084 + name = string(sval) 3085 + } 3086 + 3087 + switch name { 3088 + // t.LexiconTypeID (string) (string) 3089 + case "$type": 3090 + 3091 + { 3092 + sval, err := cbg.ReadString(cr) 3093 + if err != nil { 3094 + return err 3095 + } 3096 + 3097 + t.LexiconTypeID = string(sval) 3098 + } 3099 + // t.Subject (schemagen.ActorRef) (struct) 3100 + case "subject": 3101 + 3102 + { 3103 + 3104 + b, err := cr.ReadByte() 3105 + if err != nil { 3106 + return err 3107 + } 3108 + if b != cbg.CborNull[0] { 3109 + if err := cr.UnreadByte(); err != nil { 3110 + return err 3111 + } 3112 + t.Subject = new(ActorRef) 3113 + if err := t.Subject.UnmarshalCBOR(cr); err != nil { 3114 + return xerrors.Errorf("unmarshaling t.Subject pointer: %w", err) 3115 + } 3116 + } 3117 + 3118 + } 3119 + // t.Assertion (string) (string) 3120 + case "assertion": 3121 + 3122 + { 3123 + sval, err := cbg.ReadString(cr) 3124 + if err != nil { 3125 + return err 3126 + } 3127 + 3128 + t.Assertion = string(sval) 3129 + } 3130 + // t.CreatedAt (string) (string) 3131 + case "createdAt": 3132 + 3133 + { 3134 + sval, err := cbg.ReadString(cr) 3135 + if err != nil { 3136 + return err 3137 + } 3138 + 3139 + t.CreatedAt = string(sval) 3140 + } 3141 + 3142 + default: 3143 + // Field doesn't exist on this type, so ignore it 3144 + cbg.ScanForLinks(r, func(cid.Cid) {}) 3145 + } 3146 + } 3147 + 3148 + return nil 3149 + } 3150 + func (t *GraphConfirmation) MarshalCBOR(w io.Writer) error { 3151 + if t == nil { 3152 + _, err := w.Write(cbg.CborNull) 3153 + return err 3154 + } 3155 + 3156 + cw := cbg.NewCborWriter(w) 3157 + 3158 + if _, err := cw.Write([]byte{164}); err != nil { 3159 + return err 3160 + } 3161 + 3162 + // t.LexiconTypeID (string) (string) 3163 + if len("$type") > cbg.MaxLength { 3164 + return xerrors.Errorf("Value in field \"$type\" was too long") 3165 + } 3166 + 3167 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("$type"))); err != nil { 3168 + return err 3169 + } 3170 + if _, err := io.WriteString(w, string("$type")); err != nil { 3171 + return err 3172 + } 3173 + 3174 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.graph.confirmation"))); err != nil { 3175 + return err 3176 + } 3177 + if _, err := io.WriteString(w, string("app.bsky.graph.confirmation")); err != nil { 3178 + return err 3179 + } 3180 + 3181 + // t.Assertion (schemagen.RepoStrongRef) (struct) 3182 + if len("assertion") > cbg.MaxLength { 3183 + return xerrors.Errorf("Value in field \"assertion\" was too long") 3184 + } 3185 + 3186 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("assertion"))); err != nil { 3187 + return err 3188 + } 3189 + if _, err := io.WriteString(w, string("assertion")); err != nil { 3190 + return err 3191 + } 3192 + 3193 + if err := t.Assertion.MarshalCBOR(cw); err != nil { 3194 + return err 3195 + } 3196 + 3197 + // t.CreatedAt (string) (string) 3198 + if len("createdAt") > cbg.MaxLength { 3199 + return xerrors.Errorf("Value in field \"createdAt\" was too long") 3200 + } 3201 + 3202 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("createdAt"))); err != nil { 3203 + return err 3204 + } 3205 + if _, err := io.WriteString(w, string("createdAt")); err != nil { 3206 + return err 3207 + } 3208 + 3209 + if len(t.CreatedAt) > cbg.MaxLength { 3210 + return xerrors.Errorf("Value in field t.CreatedAt was too long") 3211 + } 3212 + 3213 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.CreatedAt))); err != nil { 3214 + return err 3215 + } 3216 + if _, err := io.WriteString(w, string(t.CreatedAt)); err != nil { 3217 + return err 3218 + } 3219 + 3220 + // t.Originator (schemagen.ActorRef) (struct) 3221 + if len("originator") > cbg.MaxLength { 3222 + return xerrors.Errorf("Value in field \"originator\" was too long") 3223 + } 3224 + 3225 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("originator"))); err != nil { 3226 + return err 3227 + } 3228 + if _, err := io.WriteString(w, string("originator")); err != nil { 3229 + return err 3230 + } 3231 + 3232 + if err := t.Originator.MarshalCBOR(cw); err != nil { 3233 + return err 3234 + } 3235 + return nil 3236 + } 3237 + 3238 + func (t *GraphConfirmation) UnmarshalCBOR(r io.Reader) (err error) { 3239 + *t = GraphConfirmation{} 3240 + 3241 + cr := cbg.NewCborReader(r) 3242 + 3243 + maj, extra, err := cr.ReadHeader() 3244 + if err != nil { 3245 + return err 3246 + } 3247 + defer func() { 3248 + if err == io.EOF { 3249 + err = io.ErrUnexpectedEOF 3250 + } 3251 + }() 3252 + 3253 + if maj != cbg.MajMap { 3254 + return fmt.Errorf("cbor input should be of type map") 3255 + } 3256 + 3257 + if extra > cbg.MaxLength { 3258 + return fmt.Errorf("GraphConfirmation: map struct too large (%d)", extra) 3259 + } 3260 + 3261 + var name string 3262 + n := extra 3263 + 3264 + for i := uint64(0); i < n; i++ { 3265 + 3266 + { 3267 + sval, err := cbg.ReadString(cr) 3268 + if err != nil { 3269 + return err 3270 + } 3271 + 3272 + name = string(sval) 3273 + } 3274 + 3275 + switch name { 3276 + // t.LexiconTypeID (string) (string) 3277 + case "$type": 3278 + 3279 + { 3280 + sval, err := cbg.ReadString(cr) 3281 + if err != nil { 3282 + return err 3283 + } 3284 + 3285 + t.LexiconTypeID = string(sval) 3286 + } 3287 + // t.Assertion (schemagen.RepoStrongRef) (struct) 3288 + case "assertion": 3289 + 3290 + { 3291 + 3292 + b, err := cr.ReadByte() 3293 + if err != nil { 3294 + return err 3295 + } 3296 + if b != cbg.CborNull[0] { 3297 + if err := cr.UnreadByte(); err != nil { 3298 + return err 3299 + } 3300 + t.Assertion = new(schemagen.RepoStrongRef) 3301 + if err := t.Assertion.UnmarshalCBOR(cr); err != nil { 3302 + return xerrors.Errorf("unmarshaling t.Assertion pointer: %w", err) 3303 + } 3304 + } 3305 + 3306 + } 3307 + // t.CreatedAt (string) (string) 3308 + case "createdAt": 3309 + 3310 + { 3311 + sval, err := cbg.ReadString(cr) 3312 + if err != nil { 3313 + return err 3314 + } 3315 + 3316 + t.CreatedAt = string(sval) 3317 + } 3318 + // t.Originator (schemagen.ActorRef) (struct) 3319 + case "originator": 3320 + 3321 + { 3322 + 3323 + b, err := cr.ReadByte() 3324 + if err != nil { 3325 + return err 3326 + } 3327 + if b != cbg.CborNull[0] { 3328 + if err := cr.UnreadByte(); err != nil { 3329 + return err 3330 + } 3331 + t.Originator = new(ActorRef) 3332 + if err := t.Originator.UnmarshalCBOR(cr); err != nil { 3333 + return xerrors.Errorf("unmarshaling t.Originator pointer: %w", err) 3334 + } 3335 + } 3336 + 3337 + } 3338 + 3339 + default: 3340 + // Field doesn't exist on this type, so ignore it 3341 + cbg.ScanForLinks(r, func(cid.Cid) {}) 3342 + } 3343 + } 3344 + 3345 + return nil 3346 + }
+13 -13
api/bsky/feedpost.go
··· 14 14 // schema: app.bsky.feed.post 15 15 16 16 func init() { 17 - util.RegisterType("app.bsky.feed.post", FeedPost{}) 17 + util.RegisterType("app.bsky.feed.post", &FeedPost{}) 18 18 } 19 19 20 20 // RECORDTYPE: FeedPost ··· 115 115 } 116 116 117 117 type FeedPost_View struct { 118 - LexiconTypeID string `json:"$type,omitempty"` 119 - Author *ActorRef_WithInfo `json:"author" cborgen:"author"` 120 - Cid string `json:"cid" cborgen:"cid"` 121 - DownvoteCount int64 `json:"downvoteCount" cborgen:"downvoteCount"` 122 - Embed *FeedPost_View_Embed `json:"embed,omitempty" cborgen:"embed"` 123 - IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 124 - Record any `json:"record" cborgen:"record"` 125 - ReplyCount int64 `json:"replyCount" cborgen:"replyCount"` 126 - RepostCount int64 `json:"repostCount" cborgen:"repostCount"` 127 - UpvoteCount int64 `json:"upvoteCount" cborgen:"upvoteCount"` 128 - Uri string `json:"uri" cborgen:"uri"` 129 - Viewer *FeedPost_ViewerState `json:"viewer" cborgen:"viewer"` 118 + LexiconTypeID string `json:"$type,omitempty"` 119 + Author *ActorRef_WithInfo `json:"author" cborgen:"author"` 120 + Cid string `json:"cid" cborgen:"cid"` 121 + DownvoteCount int64 `json:"downvoteCount" cborgen:"downvoteCount"` 122 + Embed *FeedPost_View_Embed `json:"embed,omitempty" cborgen:"embed"` 123 + IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 124 + Record util.LexiconTypeDecoder `json:"record" cborgen:"record"` 125 + ReplyCount int64 `json:"replyCount" cborgen:"replyCount"` 126 + RepostCount int64 `json:"repostCount" cborgen:"repostCount"` 127 + UpvoteCount int64 `json:"upvoteCount" cborgen:"upvoteCount"` 128 + Uri string `json:"uri" cborgen:"uri"` 129 + Viewer *FeedPost_ViewerState `json:"viewer" cborgen:"viewer"` 130 130 } 131 131 132 132 type FeedPost_View_Embed struct {
+1 -1
api/bsky/feedrepost.go
··· 8 8 // schema: app.bsky.feed.repost 9 9 10 10 func init() { 11 - util.RegisterType("app.bsky.feed.repost", FeedRepost{}) 11 + util.RegisterType("app.bsky.feed.repost", &FeedRepost{}) 12 12 } 13 13 14 14 // RECORDTYPE: FeedRepost
+1 -1
api/bsky/feedtrend.go
··· 8 8 // schema: app.bsky.feed.trend 9 9 10 10 func init() { 11 - util.RegisterType("app.bsky.feed.trend", FeedTrend{}) 11 + util.RegisterType("app.bsky.feed.trend", &FeedTrend{}) 12 12 } 13 13 14 14 // RECORDTYPE: FeedTrend
+1 -1
api/bsky/feedvote.go
··· 8 8 // schema: app.bsky.feed.vote 9 9 10 10 func init() { 11 - util.RegisterType("app.bsky.feed.vote", FeedVote{}) 11 + util.RegisterType("app.bsky.feed.vote", &FeedVote{}) 12 12 } 13 13 14 14 // RECORDTYPE: FeedVote
+1 -1
api/bsky/graphassertion.go
··· 7 7 // schema: app.bsky.graph.assertion 8 8 9 9 func init() { 10 - util.RegisterType("app.bsky.graph.assertion", GraphAssertion{}) 10 + util.RegisterType("app.bsky.graph.assertion", &GraphAssertion{}) 11 11 } 12 12 13 13 // RECORDTYPE: GraphAssertion
+1 -1
api/bsky/graphconfirmation.go
··· 8 8 // schema: app.bsky.graph.confirmation 9 9 10 10 func init() { 11 - util.RegisterType("app.bsky.graph.confirmation", GraphConfirmation{}) 11 + util.RegisterType("app.bsky.graph.confirmation", &GraphConfirmation{}) 12 12 } 13 13 14 14 // RECORDTYPE: GraphConfirmation
+1 -1
api/bsky/graphfollow.go
··· 7 7 // schema: app.bsky.graph.follow 8 8 9 9 func init() { 10 - util.RegisterType("app.bsky.graph.follow", GraphFollow{}) 10 + util.RegisterType("app.bsky.graph.follow", &GraphFollow{}) 11 11 } 12 12 13 13 // RECORDTYPE: GraphFollow
+10 -9
api/bsky/notificationlist.go
··· 3 3 import ( 4 4 "context" 5 5 6 + "github.com/whyrusleeping/gosky/lex/util" 6 7 "github.com/whyrusleeping/gosky/xrpc" 7 8 ) 8 9 ··· 12 13 } 13 14 14 15 type NotificationList_Notification struct { 15 - LexiconTypeID string `json:"$type,omitempty"` 16 - Author *ActorRef_WithInfo `json:"author" cborgen:"author"` 17 - Cid string `json:"cid" cborgen:"cid"` 18 - IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 19 - IsRead bool `json:"isRead" cborgen:"isRead"` 20 - Reason string `json:"reason" cborgen:"reason"` 21 - ReasonSubject *string `json:"reasonSubject,omitempty" cborgen:"reasonSubject"` 22 - Record any `json:"record" cborgen:"record"` 23 - Uri string `json:"uri" cborgen:"uri"` 16 + LexiconTypeID string `json:"$type,omitempty"` 17 + Author *ActorRef_WithInfo `json:"author" cborgen:"author"` 18 + Cid string `json:"cid" cborgen:"cid"` 19 + IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 20 + IsRead bool `json:"isRead" cborgen:"isRead"` 21 + Reason string `json:"reason" cborgen:"reason"` 22 + ReasonSubject *string `json:"reasonSubject,omitempty" cborgen:"reasonSubject"` 23 + Record util.LexiconTypeDecoder `json:"record" cborgen:"record"` 24 + Uri string `json:"uri" cborgen:"uri"` 24 25 } 25 26 26 27 type NotificationList_Output struct {
+1 -1
api/bsky/systemdeclaration.go
··· 7 7 // schema: app.bsky.system.declaration 8 8 9 9 func init() { 10 - util.RegisterType("app.bsky.system.declaration", SystemDeclaration{}) 10 + util.RegisterType("app.bsky.system.declaration", &SystemDeclaration{}) 11 11 } 12 12 13 13 // RECORDTYPE: SystemDeclaration
+15 -9
events/events.go
··· 68 68 } 69 69 70 70 const ( 71 - EvtKindCreateRecord = "createRecord" 72 - EvtKindUpdateRecord = "updateRecord" 73 - EvtKindDeleteRecord = "deleteRecord" 71 + EvtKindRepoChange = "repoChange" 74 72 ) 75 73 76 74 type Event struct { 77 - Kind string 78 75 79 76 // User is the DID of the user this event is about 80 77 User string 81 78 82 - Collection string 83 - Rkey string 84 - DID string 85 - CarSlice []byte 79 + Kind string 86 80 87 - // some private fields for processing metadata 81 + RepoOps []*RepoOp 82 + RepoReset bool 83 + CarSlice []byte 84 + 85 + // some private fields for internal routing perf 88 86 PrivUid uint `json:"-"` 89 87 PrivPdsId uint `json:"-"` 88 + PrivRelevantPds []uint `json:"-"` 89 + } 90 + 91 + type RepoOp struct { 92 + Kind string 93 + Collection string 94 + Rkey string 95 + 90 96 PrivRelevantPds []uint `json:"-"` 91 97 } 92 98
+1 -1
gen/main.go
··· 33 33 // RECORDTYPE: FeedRepost 34 34 // RECORDTYPE: FeedPost 35 35 36 - if err := cbg.WriteMapEncodersToFile("api/bsky/cbor_gen.go", "schemagen", bsky.FeedPost{}, bsky.FeedRepost{}, bsky.FeedTrend{}, bsky.FeedVote{}, bsky.FeedPost_Entity{}, bsky.FeedPost_ReplyRef{}, bsky.FeedPost_TextSlice{}, bsky.EmbedImages{}, bsky.EmbedImages_PresentedImage{}, bsky.EmbedExternal{}, bsky.EmbedExternal_External{}, bsky.EmbedImages_Image{}, bsky.GraphFollow{}, bsky.ActorRef{}, bsky.ActorProfile{}, bsky.SystemDeclaration{}); err != nil { 36 + if err := cbg.WriteMapEncodersToFile("api/bsky/cbor_gen.go", "schemagen", bsky.FeedPost{}, bsky.FeedRepost{}, bsky.FeedTrend{}, bsky.FeedVote{}, bsky.FeedPost_Entity{}, bsky.FeedPost_ReplyRef{}, bsky.FeedPost_TextSlice{}, bsky.EmbedImages{}, bsky.EmbedImages_PresentedImage{}, bsky.EmbedExternal{}, bsky.EmbedExternal_External{}, bsky.EmbedImages_Image{}, bsky.GraphFollow{}, bsky.ActorRef{}, bsky.ActorProfile{}, bsky.SystemDeclaration{}, bsky.GraphAssertion{}, bsky.GraphConfirmation{}); err != nil { 37 37 panic(err) 38 38 } 39 39
+87 -71
indexer/indexer.go
··· 48 48 return nil 49 49 } 50 50 51 - func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) { 51 + func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error { 52 52 ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") 53 53 defer span.End() 54 54 55 55 if err := ix.catchup(ctx, evt); err != nil { 56 - log.Println("failed to catch up on user repo changes, processing events off base: ", err) 56 + return fmt.Errorf("failed to catch up on user repo changes, processing events off base: %w", err) 57 57 } 58 58 59 - fmt.Println("Handling Event!", evt.Kind) 59 + fmt.Println("Handling Event!") 60 + var relpds []uint 61 + var repoOps []*events.RepoOp 62 + for _, op := range evt.Ops { 63 + switch op.Kind { 64 + case repomgr.EvtKindCreateRecord: 65 + rop, err := ix.handleRecordCreate(ctx, evt, &op, true) 66 + if err != nil { 67 + return fmt.Errorf("handle recordCreate: %w", err) 68 + } 69 + repoOps = append(repoOps, rop) 70 + relpds = append(relpds, rop.PrivRelevantPds...) 71 + case repomgr.EvtKindInitActor: 72 + rop, err := ix.handleInitActor(ctx, evt, &op) 73 + if err != nil { 74 + log.Println("handle initActor: ", err) 75 + } 60 76 61 - switch evt.Kind { 62 - case repomgr.EvtKindCreateRecord: 63 - if err := ix.handleRecordCreate(ctx, evt, true); err != nil { 64 - log.Println("handle recordCreate: ", err) 77 + repoOps = append(repoOps, rop) 78 + default: 79 + return fmt.Errorf("unrecognized repo event type: %q", op.Kind) 65 80 } 66 - case repomgr.EvtKindInitActor: 67 - if err := ix.handleInitActor(ctx, evt); err != nil { 68 - log.Println("handle initActor: ", err) 69 - } 70 - default: 71 - log.Println("unrecognized repo event type: ", evt.Kind) 81 + } 82 + 83 + did, err := ix.DidForUser(ctx, evt.User) 84 + if err != nil { 85 + return err 86 + } 87 + 88 + fmt.Println("Sending event: ", relpds) 89 + if err := ix.events.AddEvent(&events.Event{ 90 + Kind: events.EvtKindRepoChange, 91 + CarSlice: evt.RepoSlice, 92 + PrivUid: evt.User, 93 + RepoOps: repoOps, 94 + User: did, 95 + PrivRelevantPds: relpds, 96 + }); err != nil { 97 + return fmt.Errorf("failed to push event: %s", err) 72 98 } 99 + 100 + return nil 73 101 } 74 102 75 - func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, local bool) error { 76 - fmt.Println("record create event", evt.Collection) 77 - var relevantPds []uint 78 - switch rec := evt.Record.(type) { 103 + func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) (*events.RepoOp, error) { 104 + fmt.Println("record create event", op.Collection) 105 + out := &events.RepoOp{ 106 + Kind: string(repomgr.EvtKindCreateRecord), 107 + Collection: op.Collection, 108 + Rkey: op.Rkey, 109 + } 110 + switch rec := op.Record.(type) { 79 111 case *bsky.FeedPost: 80 112 var replyid uint 81 113 if rec.Reply != nil { 82 114 replyto, err := ix.GetPost(ctx, rec.Reply.Parent.Uri) 83 115 if err != nil { 84 - return err 116 + return nil, err 85 117 } 86 118 87 119 replyid = replyto.ID 88 120 } 89 121 90 122 fp := types.FeedPost{ 91 - Rkey: evt.Rkey, 92 - Cid: evt.RecCid.String(), 123 + Rkey: op.Rkey, 124 + Cid: op.RecCid.String(), 93 125 Author: evt.User, 94 126 ReplyTo: replyid, 95 127 } 96 128 if err := ix.db.Create(&fp).Error; err != nil { 97 - return err 129 + return nil, err 98 130 } 99 131 100 132 if err := ix.addNewPostNotification(ctx, rec, &fp); err != nil { 101 - return err 133 + return nil, err 102 134 } 103 135 104 136 case *bsky.FeedRepost: 105 137 fp, err := ix.GetPost(ctx, rec.Subject.Uri) 106 138 if err != nil { 107 - return err 139 + return nil, err 108 140 } 109 141 110 142 author, err := ix.lookupUser(ctx, fp.Author) 111 143 if err != nil { 112 - return err 144 + return nil, err 113 145 } 114 146 115 - relevantPds = append(relevantPds, author.PDS) 147 + out.PrivRelevantPds = append(out.PrivRelevantPds, author.PDS) 116 148 117 149 rr := types.RepostRecord{ 118 150 RecCreated: rec.CreatedAt, 119 151 Post: fp.ID, 120 152 Reposter: evt.User, 121 153 Author: fp.Author, 122 - RecCid: evt.RecCid.String(), 123 - Rkey: evt.Rkey, 154 + RecCid: op.RecCid.String(), 155 + Rkey: op.Rkey, 124 156 } 125 157 if err := ix.db.Create(&rr).Error; err != nil { 126 - return err 158 + return nil, err 127 159 } 128 160 129 161 if err := ix.notifman.AddRepost(ctx, fp.Author, rr.ID, evt.User); err != nil { 130 - return err 162 + return nil, err 131 163 } 132 164 133 165 case *bsky.FeedVote: ··· 141 173 val = -1 142 174 dbdir = types.VoteDirDown 143 175 default: 144 - return fmt.Errorf("invalid vote direction: %q", rec.Direction) 176 + return nil, fmt.Errorf("invalid vote direction: %q", rec.Direction) 145 177 } 146 178 147 179 puri, err := parseAtUri(rec.Subject.Uri) 148 180 if err != nil { 149 - return err 181 + return nil, err 150 182 } 151 183 152 184 act, err := ix.lookupUserByDid(ctx, puri.Did) 153 185 if err != nil { 154 - return err 186 + return nil, err 155 187 } 156 188 157 - relevantPds = append(relevantPds, act.PDS) 189 + out.PrivRelevantPds = append(out.PrivRelevantPds, act.PDS) 158 190 159 191 var post types.FeedPost 160 192 if err := ix.db.First(&post, "rkey = ? AND author = ?", puri.Rkey, act.Uid).Error; err != nil { 161 - return err 193 + return nil, err 162 194 } 163 195 164 196 vr := types.VoteRecord{ ··· 166 198 Voter: evt.User, 167 199 Post: post.ID, 168 200 Created: rec.CreatedAt, 169 - Rkey: evt.Rkey, 170 - Cid: evt.RecCid.String(), 201 + Rkey: op.Rkey, 202 + Cid: op.RecCid.String(), 171 203 } 172 204 if err := ix.db.Create(&vr).Error; err != nil { 173 - return err 205 + return nil, err 174 206 } 175 207 176 208 if err := ix.db.Model(types.FeedPost{}).Where("id = ?", post.ID).Update("up_count", gorm.Expr("up_count + ?", val)).Error; err != nil { 177 - return err 209 + return nil, err 178 210 } 179 211 180 212 if rec.Direction == "up" { 181 213 if err := ix.addNewVoteNotification(ctx, act.ID, &vr); err != nil { 182 - return err 214 + return nil, err 183 215 } 184 216 } 185 217 ··· 187 219 subj, err := ix.lookupUserByDid(ctx, rec.Subject.Did) 188 220 if err != nil { 189 221 if !errors.Is(err, gorm.ErrRecordNotFound) { 190 - return err 222 + return nil, fmt.Errorf("failed to lookup user: %w", err) 191 223 } 192 224 nu, err := ix.CreateExternalUser(ctx, rec.Subject.Did) 193 225 if err != nil { 194 - return err 226 + return nil, fmt.Errorf("create external user: %w", err) 195 227 } 196 228 197 229 subj = nu 198 230 } 199 231 200 232 if subj.PDS != 0 { 201 - relevantPds = append(relevantPds, subj.PDS) 233 + out.PrivRelevantPds = append(out.PrivRelevantPds, subj.PDS) 202 234 } 203 235 204 236 // 'follower' followed 'target' 205 237 fr := types.FollowRecord{ 206 238 Follower: evt.User, 207 239 Target: subj.ID, 208 - Rkey: evt.Rkey, 209 - Cid: evt.RecCid.String(), 240 + Rkey: op.Rkey, 241 + Cid: op.RecCid.String(), 210 242 } 211 243 if err := ix.db.Create(&fr).Error; err != nil { 212 - return err 244 + return nil, err 213 245 } 214 246 215 247 if err := ix.notifman.AddFollow(ctx, fr.Follower, fr.Target, fr.ID); err != nil { 216 - return err 248 + return nil, err 217 249 } 218 250 219 251 if local && subj.PDS != 0 { ··· 223 255 } 224 256 225 257 default: 226 - return fmt.Errorf("unrecognized record type: %T", rec) 227 - } 228 - 229 - did, err := ix.DidForUser(ctx, evt.User) 230 - if err != nil { 231 - return err 258 + return nil, fmt.Errorf("unrecognized record type: %T", rec) 232 259 } 233 260 234 - fmt.Println("Sending event: ", evt.Collection, relevantPds) 235 - if err := ix.events.AddEvent(&events.Event{ 236 - CarSlice: evt.RepoSlice, 237 - Kind: events.EvtKindCreateRecord, 238 - PrivUid: evt.User, 239 - User: did, 240 - Collection: evt.Collection, 241 - Rkey: evt.Rkey, 242 - PrivRelevantPds: relevantPds, 243 - }); err != nil { 244 - log.Println("failed to push event: ", err) 245 - } 246 - 247 - return nil 261 + return out, nil 248 262 } 249 263 250 264 func (ix *Indexer) DidForUser(ctx context.Context, uid uint) (string, error) { ··· 316 330 return ix.notifman.AddUpVote(ctx, vr.Voter, vr.Post, vr.ID, postauthor) 317 331 } 318 332 319 - func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent) error { 320 - ai := evt.ActorInfo 333 + func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) (*events.RepoOp, error) { 334 + ai := op.ActorInfo 321 335 if err := ix.db.Create(&types.ActorInfo{ 322 336 Uid: evt.User, 323 337 Handle: ai.Handle, ··· 326 340 DeclRefCid: ai.DeclRefCid, 327 341 Type: ai.Type, 328 342 }).Error; err != nil { 329 - return err 343 + return nil, err 330 344 } 331 345 332 346 if err := ix.db.Create(&types.FollowRecord{ 333 347 Follower: evt.User, 334 348 Target: evt.User, 335 349 }).Error; err != nil { 336 - return err 350 + return nil, err 337 351 } 338 352 339 - return nil 353 + return &events.RepoOp{ 354 + Kind: string(repomgr.EvtKindInitActor), 355 + }, nil 340 356 } 341 357 342 358 func (ix *Indexer) GetPost(ctx context.Context, uri string) (*types.FeedPost, error) {
+2 -2
lex/gen.go
··· 290 290 if t.Type.defName != "" { 291 291 id = id + "#" + t.Type.defName 292 292 } 293 - fmt.Fprintf(w, "util.RegisterType(%q, %s{})\n", id, t.Name) 293 + fmt.Fprintf(w, "util.RegisterType(%q, &%s{})\n", id, t.Name) 294 294 } 295 295 } 296 296 fmt.Fprintln(w, "}") ··· 967 967 // TODO: maybe do a native type? 968 968 return "string", nil 969 969 case "unknown": 970 - return "util.LexconTypeDecoder", nil 970 + return "util.LexiconTypeDecoder", nil 971 971 case "union": 972 972 return "*" + name + "_" + strings.Title(k), nil 973 973 case "image":
+44 -7
lex/util/decoder.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "reflect" 8 + "strings" 8 9 9 10 cbg "github.com/whyrusleeping/cbor-gen" 10 11 ) ··· 15 16 lexTypesMap = make(map[string]reflect.Type) 16 17 } 17 18 18 - func RegisterType(id string, val any) { 19 + func RegisterType(id string, val cbg.CBORMarshaler) { 19 20 t := reflect.TypeOf(val) 20 21 21 22 if t.Kind() == reflect.Pointer { ··· 59 60 return ival, nil 60 61 } 61 62 62 - func CborDecodeValue(b []byte) (any, error) { 63 + type CBOR interface { 64 + cbg.CBORUnmarshaler 65 + cbg.CBORMarshaler 66 + } 67 + 68 + func CborDecodeValue(b []byte) (CBOR, error) { 63 69 tstr, err := CborTypeExtract(b) 64 70 if err != nil { 65 71 return nil, fmt.Errorf("cbor type extract: %w", err) ··· 72 78 73 79 val := reflect.New(t) 74 80 75 - ival, ok := val.Interface().(cbg.CBORUnmarshaler) 81 + ival, ok := val.Interface().(CBOR) 76 82 if !ok { 77 83 return nil, fmt.Errorf("registered type did not have proper cbor hooks") 78 84 } ··· 84 90 return ival, nil 85 91 } 86 92 87 - type LexconTypeDecoder struct { 88 - Val any 93 + type LexiconTypeDecoder struct { 94 + Val cbg.CBORMarshaler 89 95 } 90 96 91 - func (ltd *LexconTypeDecoder) UnmarshalJSON(b []byte) error { 97 + func (ltd *LexiconTypeDecoder) UnmarshalJSON(b []byte) error { 92 98 val, err := JsonDecodeValue(b) 93 99 if err != nil { 94 100 return err 95 101 } 96 102 97 - ltd.Val = val 103 + ltd.Val = val.(cbg.CBORMarshaler) 98 104 99 105 return nil 100 106 } 107 + 108 + func (ltd *LexiconTypeDecoder) MarshalJSON() ([]byte, error) { 109 + v := reflect.ValueOf(ltd.Val) 110 + t := v.Type() 111 + sf, ok := t.Elem().FieldByName("LexiconTypeID") 112 + if !ok { 113 + return nil, fmt.Errorf("lexicon type decoder can only handle record fields") 114 + } 115 + 116 + tag, ok := sf.Tag.Lookup("cborgen") 117 + if !ok { 118 + return nil, fmt.Errorf("lexicon type decoder can only handle record fields with const $type") 119 + } 120 + 121 + parts := strings.Split(tag, ",") 122 + 123 + var cval string 124 + for _, p := range parts { 125 + if strings.HasPrefix(p, "const=") { 126 + cval = strings.TrimPrefix(p, "const=") 127 + break 128 + } 129 + } 130 + if cval == "" { 131 + return nil, fmt.Errorf("must have const $type field") 132 + } 133 + 134 + v.Elem().FieldByName("LexiconTypeID").SetString(cval) 135 + 136 + return json.Marshal(ltd.Val) 137 + }
+5 -4
notifs/notifs.go
··· 7 7 8 8 "github.com/ipfs/go-cid" 9 9 appbskytypes "github.com/whyrusleeping/gosky/api/bsky" 10 + "github.com/whyrusleeping/gosky/lex/util" 10 11 "github.com/whyrusleeping/gosky/repomgr" 11 12 "github.com/whyrusleeping/gosky/types" 12 13 "gorm.io/gorm" ··· 158 159 rsub := "at://" + postAuthor.Did + "/app.bsky.feed.post/" + votedOn.Rkey 159 160 160 161 return &appbskytypes.NotificationList_Notification{ 161 - Record: rec, 162 + Record: util.LexiconTypeDecoder{Val: rec}, 162 163 IsRead: nrec.CreatedAt.Before(lastSeen), 163 164 IndexedAt: nrec.CreatedAt.Format(time.RFC3339), 164 165 Uri: "at://" + voter.Did + "/app.bsky.feed.vote/" + vote.Rkey, ··· 198 199 rsub := "at://" + postAuthor.Did + "/app.bsky.feed.post/" + reposted.Rkey 199 200 200 201 return &appbskytypes.NotificationList_Notification{ 201 - Record: rec, 202 + Record: util.LexiconTypeDecoder{rec}, 202 203 IsRead: nrec.CreatedAt.Before(lastSeen), 203 204 IndexedAt: nrec.CreatedAt.Format(time.RFC3339), 204 205 Uri: "at://" + reposter.Did + "/app.bsky.feed.repost/" + repost.Rkey, ··· 238 239 rsub := "at://" + opAuthor.Did + "/app.bsky.feed.post/" + replyTo.Rkey 239 240 240 241 return &appbskytypes.NotificationList_Notification{ 241 - Record: rec, 242 + Record: util.LexiconTypeDecoder{rec}, 242 243 IsRead: nrec.CreatedAt.Before(lastSeen), 243 244 IndexedAt: nrec.CreatedAt.Format(time.RFC3339), 244 245 Uri: "at://" + author.Did + "/app.bsky.feed.post/" + fp.Rkey, ··· 266 267 } 267 268 268 269 return &appbskytypes.NotificationList_Notification{ 269 - Record: rec, 270 + Record: util.LexiconTypeDecoder{rec}, 270 271 IsRead: nrec.CreatedAt.Before(lastSeen), 271 272 IndexedAt: nrec.CreatedAt.Format(time.RFC3339), 272 273 Uri: "at://" + follower.Did + "/app.bsky.graph.follow/" + frec.Rkey,
+2 -1
repo/repo.go
··· 10 10 blockstore "github.com/ipfs/go-ipfs-blockstore" 11 11 cbor "github.com/ipfs/go-ipld-cbor" 12 12 "github.com/ipld/go-car/v2" 13 + cbg "github.com/whyrusleeping/cbor-gen" 13 14 "github.com/whyrusleeping/gosky/lex/util" 14 15 "github.com/whyrusleeping/gosky/mst" 15 16 "go.opentelemetry.io/otel" ··· 277 278 return nil 278 279 } 279 280 280 - func (r *Repo) GetRecord(ctx context.Context, rpath string) (cid.Cid, any, error) { 281 + func (r *Repo) GetRecord(ctx context.Context, rpath string) (cid.Cid, cbg.CBORMarshaler, error) { 281 282 ctx, span := otel.Tracer("repo").Start(ctx, "GetRecord") 282 283 defer span.End() 283 284
+125 -86
repomgr/repomgr.go
··· 11 11 atproto "github.com/whyrusleeping/gosky/api/atproto" 12 12 apibsky "github.com/whyrusleeping/gosky/api/bsky" 13 13 "github.com/whyrusleeping/gosky/carstore" 14 + "github.com/whyrusleeping/gosky/events" 14 15 "github.com/whyrusleeping/gosky/repo" 15 16 "go.opentelemetry.io/otel" 16 17 "gorm.io/gorm" ··· 50 51 } 51 52 52 53 type RepoEvent struct { 54 + User uint 55 + OldRoot cid.Cid 56 + NewRoot cid.Cid 57 + RepoSlice []byte 58 + PDS uint 59 + Ops []RepoOp 60 + } 61 + 62 + type RepoOp struct { 53 63 Kind EventKind 54 - User uint 55 - OldRoot cid.Cid 56 - NewRoot cid.Cid 57 64 Collection string 58 65 Rkey string 59 66 RecCid cid.Cid 60 67 Record any 61 68 ActorInfo *ActorInfo 62 - RepoSlice []byte 63 - PDS uint 64 69 } 65 70 66 71 type EventKind string ··· 189 194 190 195 if rm.events != nil { 191 196 rm.events(ctx, &RepoEvent{ 192 - Kind: EvtKindCreateRecord, 193 - User: user, 194 - OldRoot: head, 195 - NewRoot: nroot, 196 - Collection: collection, 197 - Rkey: tid, 198 - Record: rec, 199 - RecCid: cc, 200 - RepoSlice: rslice, 197 + User: user, 198 + OldRoot: head, 199 + NewRoot: nroot, 200 + Ops: []RepoOp{{ 201 + Kind: EvtKindCreateRecord, 202 + Collection: collection, 203 + Rkey: tid, 204 + Record: rec, 205 + RecCid: cc, 206 + }}, 207 + RepoSlice: rslice, 201 208 }) 202 209 } 203 210 ··· 249 256 250 257 if rm.events != nil { 251 258 rm.events(ctx, &RepoEvent{ 252 - Kind: EvtKindUpdateRecord, 253 - User: user, 254 - OldRoot: head, 255 - NewRoot: nroot, 256 - Collection: collection, 257 - Rkey: rkey, 258 - Record: rec, 259 - RecCid: cc, 260 - RepoSlice: rslice, 259 + User: user, 260 + OldRoot: head, 261 + NewRoot: nroot, 262 + Ops: []RepoOp{{ 263 + Kind: EvtKindUpdateRecord, 264 + Collection: collection, 265 + Rkey: rkey, 266 + Record: rec, 267 + RecCid: cc, 268 + }}, 269 + RepoSlice: rslice, 261 270 }) 262 271 } 263 272 ··· 308 317 309 318 if rm.events != nil { 310 319 rm.events(ctx, &RepoEvent{ 311 - Kind: EvtKindDeleteRecord, 312 - User: user, 313 - OldRoot: head, 314 - NewRoot: nroot, 315 - Collection: collection, 316 - Rkey: rkey, 317 - RepoSlice: rslice, 320 + User: user, 321 + OldRoot: head, 322 + NewRoot: nroot, 323 + Ops: []RepoOp{{ 324 + Kind: EvtKindDeleteRecord, 325 + Collection: collection, 326 + Rkey: rkey, 327 + }}, 328 + RepoSlice: rslice, 318 329 }) 319 330 } 320 331 ··· 383 394 384 395 if rm.events != nil { 385 396 rm.events(ctx, &RepoEvent{ 386 - Kind: EvtKindInitActor, 387 397 User: user, 388 398 NewRoot: root, 389 - ActorInfo: &ActorInfo{ 390 - Did: did, 391 - Handle: handle, 392 - DisplayName: displayname, 393 - DeclRefCid: declcid, 394 - Type: actortype, 395 - }, 399 + Ops: []RepoOp{{ 400 + Kind: EvtKindInitActor, 401 + ActorInfo: &ActorInfo{ 402 + Did: did, 403 + Handle: handle, 404 + DisplayName: displayname, 405 + DeclRefCid: declcid, 406 + Type: actortype, 407 + }, 408 + }}, 396 409 RepoSlice: rslice, 397 410 }) 398 411 } ··· 411 424 return rm.cs.ReadUserCar(ctx, user, fromcid, true, w) 412 425 } 413 426 414 - func (rm *RepoManager) GetRecord(ctx context.Context, user uint, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, any, error) { 427 + func (rm *RepoManager) GetRecord(ctx context.Context, user uint, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) { 415 428 bs, err := rm.cs.ReadOnlySession(user) 416 429 if err != nil { 417 430 return cid.Undef, nil, err ··· 468 481 return ap, nil 469 482 } 470 483 471 - func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, kind EventKind, uid uint, collection string, rkey string, carslice []byte) error { 484 + func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, kind EventKind, uid uint, ops []*events.RepoOp, carslice []byte) error { 472 485 root, ds, err := rm.cs.ImportSlice(ctx, uid, carslice) 473 486 if err != nil { 474 487 return fmt.Errorf("importing external carslice: %w", err) ··· 479 492 return fmt.Errorf("opening external user repo: %w", err) 480 493 } 481 494 482 - switch kind { 483 - case EvtKindCreateRecord: 484 - recid, rec, err := r.GetRecord(ctx, collection+"/"+rkey) 485 - if err != nil { 486 - return fmt.Errorf("reading changed record from car slice: %w", err) 487 - } 495 + var evtops []RepoOp 496 + for _, op := range ops { 497 + switch op.Kind { 498 + case string(EvtKindCreateRecord): 499 + recid, rec, err := r.GetRecord(ctx, op.Collection+"/"+op.Rkey) 500 + if err != nil { 501 + return fmt.Errorf("reading changed record from car slice: %w", err) 502 + } 488 503 489 - rslice, err := ds.CloseWithRoot(ctx, root) 490 - if err != nil { 491 - return fmt.Errorf("close with root: %w", err) 492 - } 493 - 494 - // TODO: what happens if this update fails? 495 - if err := rm.updateUserRepoHead(ctx, uid, root); err != nil { 496 - return fmt.Errorf("updating user head: %w", err) 497 - } 498 - 499 - if rm.events != nil { 500 - rm.events(ctx, &RepoEvent{ 501 - Kind: EvtKindCreateRecord, 502 - User: uid, 503 - //OldRoot: head, 504 - NewRoot: root, 505 - Collection: collection, 506 - Rkey: rkey, 504 + evtops = append(evtops, RepoOp{ 505 + Kind: EvtKindCreateRecord, 506 + Collection: op.Collection, 507 + Rkey: op.Rkey, 507 508 Record: rec, 508 509 RecCid: recid, 509 - RepoSlice: rslice, 510 - PDS: pdsid, 511 510 }) 511 + return nil 512 + default: 513 + return fmt.Errorf("unrecognized external user event kind: %q", kind) 512 514 } 513 - return nil 514 - default: 515 - return fmt.Errorf("unrecognized external user event kind: %q", kind) 515 + } 516 + 517 + rslice, err := ds.CloseWithRoot(ctx, root) 518 + if err != nil { 519 + return fmt.Errorf("close with root: %w", err) 520 + } 521 + 522 + // TODO: what happens if this update fails? 523 + if err := rm.updateUserRepoHead(ctx, uid, root); err != nil { 524 + return fmt.Errorf("updating user head: %w", err) 525 + } 526 + 527 + if rm.events != nil { 528 + rm.events(ctx, &RepoEvent{ 529 + User: uid, 530 + //OldRoot: head, 531 + NewRoot: root, 532 + Ops: evtops, 533 + RepoSlice: rslice, 534 + PDS: pdsid, 535 + }) 516 536 } 517 537 518 538 return nil 519 539 } 520 540 521 - func nsidForCollection(collection string) string { 522 - return collection + "/" + repo.NextTID() 541 + func rkeyForCollection(collection string) string { 542 + return repo.NextTID() 523 543 } 524 544 545 + /* 525 546 func anyRecordParse(rec any) (cbg.CBORMarshaler, error) { 526 547 // TODO: really should just have a fancy type that auto-things upon json unmarshal 527 548 rmap, ok := rec.(map[string]any) ··· 534 555 return nil, fmt.Errorf("records must have string $type field") 535 556 } 536 557 } 558 + */ 537 559 538 560 func (rm *RepoManager) BatchWrite(ctx context.Context, user uint, writes []*atproto.RepoBatchWrite_Input_Writes_Elem) error { 539 561 ctx, span := otel.Tracer("repoman").Start(ctx, "BatchWrite") ··· 557 579 return err 558 580 } 559 581 582 + var ops []RepoOp 560 583 for _, w := range writes { 561 584 switch { 562 585 case w.RepoBatchWrite_Create != nil: 563 586 c := w.RepoBatchWrite_Create 564 - var nsid string 587 + var rkey string 565 588 if c.Rkey != nil { 566 - nsid = c.Collection + "/" + *c.Rkey 589 + rkey = *c.Rkey 567 590 } else { 568 - nsid = nsidForCollection(c.Collection) 591 + rkey = rkeyForCollection(c.Collection) 569 592 } 570 593 571 - cc, rpath, err := r.CreateRecord(ctx, nsid, c.Value) 594 + nsid := c.Collection + "/" + rkey 595 + cc, err := r.PutRecord(ctx, nsid, c.Value.Val) 572 596 if err != nil { 573 597 return err 574 598 } 575 599 576 - _ = rpath 577 - _ = cc // do we do something about this? 600 + ops = append(ops, RepoOp{ 601 + Kind: EvtKindCreateRecord, 602 + Collection: c.Collection, 603 + Rkey: rkey, 604 + RecCid: cc, 605 + Record: c.Value.Val, 606 + }) 578 607 case w.RepoBatchWrite_Update != nil: 579 608 u := w.RepoBatchWrite_Update 580 609 581 - cc, err := r.PutRecord(ctx, u.Collection+"/"+u.Rkey, u.Value) 610 + cc, err := r.PutRecord(ctx, u.Collection+"/"+u.Rkey, u.Value.Val) 582 611 if err != nil { 583 612 return err 584 613 } 585 614 586 - _ = cc 615 + ops = append(ops, RepoOp{ 616 + Kind: EvtKindUpdateRecord, 617 + Collection: u.Collection, 618 + Rkey: u.Rkey, 619 + RecCid: cc, 620 + Record: u.Value.Val, 621 + }) 587 622 case w.RepoBatchWrite_Delete != nil: 588 623 d := w.RepoBatchWrite_Delete 589 624 590 625 if err := r.DeleteRecord(ctx, d.Collection+"/"+d.Rkey); err != nil { 591 626 return err 592 627 } 628 + 629 + ops = append(ops, RepoOp{ 630 + Kind: EvtKindDeleteRecord, 631 + Collection: d.Collection, 632 + Rkey: d.Rkey, 633 + }) 593 634 default: 594 635 return fmt.Errorf("no operation set in write enum") 595 636 } ··· 612 653 613 654 if rm.events != nil { 614 655 rm.events(ctx, &RepoEvent{ 615 - Kind: EvtKindDeleteRecord, 616 - User: user, 617 - OldRoot: head, 618 - NewRoot: nroot, 619 - Collection: collection, 620 - Rkey: rkey, 621 - RepoSlice: rslice, 656 + User: user, 657 + OldRoot: head, 658 + NewRoot: nroot, 659 + RepoSlice: rslice, 660 + Ops: ops, 622 661 }) 623 662 } 624 663
+9 -6
server/federation_test.go
··· 19 19 atproto "github.com/whyrusleeping/gosky/api/atproto" 20 20 bsky "github.com/whyrusleeping/gosky/api/bsky" 21 21 "github.com/whyrusleeping/gosky/carstore" 22 + "github.com/whyrusleeping/gosky/lex/util" 22 23 "github.com/whyrusleeping/gosky/plc" 23 24 "github.com/whyrusleeping/gosky/xrpc" 24 25 "gorm.io/driver/sqlite" ··· 183 184 resp, err := atproto.RepoCreateRecord(ctx, u.client, &atproto.RepoCreateRecord_Input{ 184 185 Collection: "app.bsky.feed.post", 185 186 Did: u.did, 186 - Record: &bsky.FeedPost{ 187 + Record: util.LexiconTypeDecoder{&bsky.FeedPost{ 187 188 CreatedAt: time.Now().Format(time.RFC3339), 188 189 Text: body, 189 190 Reply: &bsky.FeedPost_ReplyRef{ ··· 191 192 Cid: pcid, 192 193 Uri: post, 193 194 }, 194 - }, 195 + }}, 195 196 }, 196 197 }) 197 198 ··· 208 209 resp, err := atproto.RepoCreateRecord(ctx, u.client, &atproto.RepoCreateRecord_Input{ 209 210 Collection: "app.bsky.feed.post", 210 211 Did: u.did, 211 - Record: &bsky.FeedPost{ 212 + Record: util.LexiconTypeDecoder{&bsky.FeedPost{ 212 213 CreatedAt: time.Now().Format(time.RFC3339), 213 214 Text: body, 214 - }, 215 + }}, 215 216 }) 216 217 217 218 if err != nil { ··· 231 232 resp, err := atproto.RepoCreateRecord(ctx, u.client, &atproto.RepoCreateRecord_Input{ 232 233 Collection: "app.bsky.graph.follow", 233 234 Did: u.did, 234 - Record: &bsky.GraphFollow{ 235 + Record: util.LexiconTypeDecoder{&bsky.GraphFollow{ 235 236 CreatedAt: time.Now().Format(time.RFC3339), 236 237 Subject: &bsky.ActorRef{ 237 238 DeclarationCid: "bafyreid27zk7lbis4zw5fz4podbvbs4fc5ivwji3dmrwa6zggnj4bnd57u", 238 239 Did: did, 239 240 }, 240 - }, 241 + }}, 241 242 }) 242 243 243 244 if err != nil { ··· 307 308 lp1 := laura.Post(t, "hello bob") 308 309 time.Sleep(time.Millisecond * 50) 309 310 311 + select {} 310 312 f := bob.GetFeed(t) 311 313 assert.Equal(f[0].Post.Uri, bp1.Uri) 312 314 assert.Equal(f[1].Post.Uri, lp1.Uri) ··· 324 326 if len(lnot) != 1 { 325 327 t.Fatal("wrong number of notifications") 326 328 } 329 + 327 330 }
+1 -1
server/fedmgr.go
··· 77 77 return fmt.Errorf("failed to unmarshal event: %w", err) 78 78 } 79 79 80 - fmt.Println("got event: ", host.Host, ev.Kind, ev.Collection) 80 + fmt.Println("got event: ", host.Host, ev.Kind) 81 81 if err := s.handleFedEvent(context.TODO(), host, &ev); err != nil { 82 82 log.Printf("failed to index event from %q: %s", host.Host, err) 83 83 }
+4 -3
server/feedgen.go
··· 11 11 "github.com/ipfs/go-cid" 12 12 bsky "github.com/whyrusleeping/gosky/api/bsky" 13 13 "github.com/whyrusleeping/gosky/indexer" 14 + "github.com/whyrusleeping/gosky/lex/util" 14 15 "github.com/whyrusleeping/gosky/types" 15 16 "go.opentelemetry.io/otel" 16 17 "gorm.io/gorm" ··· 31 32 }, nil 32 33 } 33 34 34 - type ReadRecordFunc func(context.Context, uint, cid.Cid) (any, error) 35 + type ReadRecordFunc func(context.Context, uint, cid.Cid) (util.CBOR, error) 35 36 36 37 /* 37 38 type HydratedFeedItem struct { ··· 148 149 return nil, err 149 150 } 150 151 151 - out.Post.Record = rec 152 + out.Post.Record = util.LexiconTypeDecoder{rec} 152 153 153 154 return out, nil 154 155 } ··· 311 312 return nil, err 312 313 } 313 314 314 - p, ok := hi.Post.Record.(*bsky.FeedPost) 315 + p, ok := hi.Post.Record.Val.(*bsky.FeedPost) 315 316 if !ok { 316 317 return nil, fmt.Errorf("getPostThread can only operate on app.bsky.feed.post records") 317 318 }
+6 -27
server/handlers.go
··· 10 10 11 11 "github.com/ipfs/go-cid" 12 12 jwt "github.com/lestrrat-go/jwx/jwt" 13 - cbg "github.com/whyrusleeping/cbor-gen" 14 13 comatprototypes "github.com/whyrusleeping/gosky/api/atproto" 15 14 appbskytypes "github.com/whyrusleeping/gosky/api/bsky" 15 + "github.com/whyrusleeping/gosky/lex/util" 16 16 ) 17 17 18 18 func (s *Server) handleAppBskyActorCreateScene(ctx context.Context, input *appbskytypes.ActorCreateScene_Input) (*appbskytypes.ActorCreateScene_Output, error) { ··· 107 107 return &appbskytypes.ActorUpdateProfile_Output{ 108 108 Cid: ncid.String(), 109 109 Uri: "at://" + u.Did + "/app.bsky.actor.profile/self", 110 - Record: profile, 110 + Record: util.LexiconTypeDecoder{profile}, 111 111 }, nil 112 112 } 113 113 ··· 502 502 func (s *Server) handleComAtprotoRepoCreateRecord(ctx context.Context, input *comatprototypes.RepoCreateRecord_Input) (*comatprototypes.RepoCreateRecord_Output, error) { 503 503 u, err := s.getUser(ctx) 504 504 if err != nil { 505 - return nil, err 505 + return nil, fmt.Errorf("get user: %w", err) 506 506 } 507 507 508 - var rec cbg.CBORMarshaler 509 - switch input.Collection { 510 - case "app.bsky.feed.post": 511 - rec = new(appbskytypes.FeedPost) 512 - case "app.bsky.graph.follow": 513 - rec = new(appbskytypes.GraphFollow) 514 - case "app.bsky.feed.repost": 515 - rec = new(appbskytypes.FeedRepost) 516 - default: 517 - return nil, fmt.Errorf("unsupported collection: %q", input.Collection) 518 - } 519 - 520 - // TODO: if we had a 'record' type receiver declaration in lexicon i could 521 - // codegen in a special handler for things that are supposed to be records 522 - // like this 523 - if err := convertRecordTo(input.Record, rec); err != nil { 524 - return nil, err 525 - } 526 - 527 - rpath, recid, err := s.repoman.CreateRecord(ctx, u.ID, input.Collection, rec) 508 + rpath, recid, err := s.repoman.CreateRecord(ctx, u.ID, input.Collection, input.Record.Val) 528 509 if err != nil { 529 - return nil, err 510 + return nil, fmt.Errorf("record create: %w", err) 530 511 } 531 512 532 513 return &comatprototypes.RepoCreateRecord_Output{ ··· 558 539 return nil, err 559 540 } 560 541 561 - fmt.Println("USER: ", user, targetUser.Handle, targetUser.Did) 562 - 563 542 var maybeCid cid.Cid 564 543 if c != "" { 565 544 cc, err := cid.Decode(c) ··· 578 557 return &comatprototypes.RepoGetRecord_Output{ 579 558 Cid: &ccstr, 580 559 Uri: "at://" + targetUser.Did + "/" + collection + "/" + rkey, 581 - Value: rec, 560 + Value: util.LexiconTypeDecoder{rec}, 582 561 }, nil 583 562 } 584 563
+32 -24
server/server.go
··· 126 126 func (s *Server) handleFedEvent(ctx context.Context, host *Peering, evt *events.Event) error { 127 127 fmt.Printf("[%s] got fed event from %q: %s\n", s.serviceUrl, host.Host, evt.Kind) 128 128 switch evt.Kind { 129 - case events.EvtKindCreateRecord: 129 + case events.EvtKindRepoChange: 130 130 u, err := s.lookupUserByDid(ctx, evt.User) 131 131 if err != nil { 132 132 if !errors.Is(err, gorm.ErrRecordNotFound) { ··· 142 142 u.ID = subj.Uid 143 143 } 144 144 145 - return s.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.Collection, evt.Rkey, evt.CarSlice) 146 - case events.EvtKindUpdateRecord: 145 + return s.repoman.HandleExternalUserEvent(ctx, host.ID, repomgr.EvtKindCreateRecord, u.ID, evt.RepoOps, evt.CarSlice) 147 146 default: 148 147 return fmt.Errorf("unrecognized fed event kind: %q", evt.Kind) 149 148 } ··· 222 221 } 223 222 224 223 func (s *Server) repoEventToFedEvent(ctx context.Context, evt *repomgr.RepoEvent) (*events.Event, error) { 225 - out := &events.Event{ 226 - CarSlice: evt.RepoSlice, 227 - } 228 - 229 - switch evt.Kind { 230 - case repomgr.EvtKindCreateRecord: 231 - out.Kind = events.EvtKindCreateRecord 232 - case repomgr.EvtKindUpdateRecord: 233 - out.Kind = events.EvtKindUpdateRecord 234 - case repomgr.EvtKindInitActor: 235 - return nil, nil 236 - default: 237 - return nil, fmt.Errorf("unrecognized repo event kind: %q", evt.Kind) 238 - } 239 - 240 224 did, err := s.indexer.DidForUser(ctx, evt.User) 241 225 if err != nil { 242 226 return nil, err 243 227 } 244 228 245 - out.PrivUid = evt.User 246 - out.User = did 247 - out.Collection = evt.Collection 248 - out.Rkey = evt.Rkey 229 + out := &events.Event{ 230 + Kind: events.EvtKindRepoChange, 231 + CarSlice: evt.RepoSlice, 232 + PrivUid: evt.User, 233 + User: did, 234 + } 235 + 236 + for _, op := range evt.Ops { 237 + switch op.Kind { 238 + case repomgr.EvtKindCreateRecord: 239 + out.RepoOps = append(out.RepoOps, &events.RepoOp{ 240 + Kind: events.EvtKindRepoChange, 241 + Collection: op.Collection, 242 + Rkey: op.Rkey, 243 + }) 244 + case repomgr.EvtKindUpdateRecord: 245 + out.RepoOps = append(out.RepoOps, &events.RepoOp{ 246 + Kind: events.EvtKindRepoChange, 247 + Collection: op.Collection, 248 + Rkey: op.Rkey, 249 + }) 250 + case repomgr.EvtKindInitActor: 251 + return nil, nil 252 + default: 253 + return nil, fmt.Errorf("unrecognized repo event kind: %q", op.Kind) 254 + } 255 + } 249 256 250 257 return out, nil 251 258 } 252 259 253 - func (s *Server) readRecordFunc(ctx context.Context, user uint, c cid.Cid) (any, error) { 260 + func (s *Server) readRecordFunc(ctx context.Context, user uint, c cid.Cid) (util.CBOR, error) { 254 261 bs, err := s.cs.ReadOnlySession(user) 255 262 if err != nil { 256 263 return nil, err ··· 326 333 } 327 334 328 335 e.HTTPErrorHandler = func(err error, ctx echo.Context) { 329 - fmt.Println("HANDLER ERROR: ", err) 336 + fmt.Printf("HANDLER ERROR: (%s) %s\n", ctx.Path(), err) 337 + ctx.Response().WriteHeader(500) 330 338 } 331 339 332 340 e.Use(middleware.JWTWithConfig(cfg), s.userCheckMiddleware)