this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add sync event (#983)

Goal with this is a very small/minimal PR which won't conflict with
either Relay v1.1 branch, or
https://github.com/bluesky-social/indigo/pull/967

This doesn't deprecate/remove anything, just adds cborgen for existing
lexicon types, and callbacks for event handlers.

authored by

bnewbold and committed by
GitHub
388bbc90 ce3c9547

+306
+271
api/atproto/cbor_gen.go
··· 935 935 936 936 return nil 937 937 } 938 + func (t *SyncSubscribeRepos_Sync) MarshalCBOR(w io.Writer) error { 939 + if t == nil { 940 + _, err := w.Write(cbg.CborNull) 941 + return err 942 + } 943 + 944 + cw := cbg.NewCborWriter(w) 945 + fieldCount := 5 946 + 947 + if t.Blocks == nil { 948 + fieldCount-- 949 + } 950 + 951 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 952 + return err 953 + } 954 + 955 + // t.Did (string) (string) 956 + if len("did") > 1000000 { 957 + return xerrors.Errorf("Value in field \"did\" was too long") 958 + } 959 + 960 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil { 961 + return err 962 + } 963 + if _, err := cw.WriteString(string("did")); err != nil { 964 + return err 965 + } 966 + 967 + if len(t.Did) > 1000000 { 968 + return xerrors.Errorf("Value in field t.Did was too long") 969 + } 970 + 971 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil { 972 + return err 973 + } 974 + if _, err := cw.WriteString(string(t.Did)); err != nil { 975 + return err 976 + } 977 + 978 + // t.Rev (string) (string) 979 + if len("rev") > 1000000 { 980 + return xerrors.Errorf("Value in field \"rev\" was too long") 981 + } 982 + 983 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil { 984 + return err 985 + } 986 + if _, err := cw.WriteString(string("rev")); err != nil { 987 + return err 988 + } 989 + 990 + if len(t.Rev) > 1000000 { 991 + return xerrors.Errorf("Value in field t.Rev was too long") 992 + } 993 + 994 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil { 995 + return err 996 + } 997 + if _, err := cw.WriteString(string(t.Rev)); err != nil { 998 + return err 999 + } 1000 + 1001 + // t.Seq (int64) (int64) 1002 + if len("seq") > 1000000 { 1003 + return xerrors.Errorf("Value in field \"seq\" was too long") 1004 + } 1005 + 1006 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 1007 + return err 1008 + } 1009 + if _, err := cw.WriteString(string("seq")); err != nil { 1010 + return err 1011 + } 1012 + 1013 + if t.Seq >= 0 { 1014 + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 1015 + return err 1016 + } 1017 + } else { 1018 + if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 1019 + return err 1020 + } 1021 + } 1022 + 1023 + // t.Time (string) (string) 1024 + if len("time") > 1000000 { 1025 + return xerrors.Errorf("Value in field \"time\" was too long") 1026 + } 1027 + 1028 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil { 1029 + return err 1030 + } 1031 + if _, err := cw.WriteString(string("time")); err != nil { 1032 + return err 1033 + } 1034 + 1035 + if len(t.Time) > 1000000 { 1036 + return xerrors.Errorf("Value in field t.Time was too long") 1037 + } 1038 + 1039 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil { 1040 + return err 1041 + } 1042 + if _, err := cw.WriteString(string(t.Time)); err != nil { 1043 + return err 1044 + } 1045 + 1046 + // t.Blocks (util.LexBytes) (slice) 1047 + if t.Blocks != nil { 1048 + 1049 + if len("blocks") > 1000000 { 1050 + return xerrors.Errorf("Value in field \"blocks\" was too long") 1051 + } 1052 + 1053 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("blocks"))); err != nil { 1054 + return err 1055 + } 1056 + if _, err := cw.WriteString(string("blocks")); err != nil { 1057 + return err 1058 + } 1059 + 1060 + if len(t.Blocks) > 2097152 { 1061 + return xerrors.Errorf("Byte array in field t.Blocks was too long") 1062 + } 1063 + 1064 + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Blocks))); err != nil { 1065 + return err 1066 + } 1067 + 1068 + if _, err := cw.Write(t.Blocks); err != nil { 1069 + return err 1070 + } 1071 + 1072 + } 1073 + return nil 1074 + } 1075 + 1076 + func (t *SyncSubscribeRepos_Sync) UnmarshalCBOR(r io.Reader) (err error) { 1077 + *t = SyncSubscribeRepos_Sync{} 1078 + 1079 + cr := cbg.NewCborReader(r) 1080 + 1081 + maj, extra, err := cr.ReadHeader() 1082 + if err != nil { 1083 + return err 1084 + } 1085 + defer func() { 1086 + if err == io.EOF { 1087 + err = io.ErrUnexpectedEOF 1088 + } 1089 + }() 1090 + 1091 + if maj != cbg.MajMap { 1092 + return fmt.Errorf("cbor input should be of type map") 1093 + } 1094 + 1095 + if extra > cbg.MaxLength { 1096 + return fmt.Errorf("SyncSubscribeRepos_Sync: map struct too large (%d)", extra) 1097 + } 1098 + 1099 + n := extra 1100 + 1101 + nameBuf := make([]byte, 6) 1102 + for i := uint64(0); i < n; i++ { 1103 + nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 1104 + if err != nil { 1105 + return err 1106 + } 1107 + 1108 + if !ok { 1109 + // Field doesn't exist on this type, so ignore it 1110 + if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 1111 + return err 1112 + } 1113 + continue 1114 + } 1115 + 1116 + switch string(nameBuf[:nameLen]) { 1117 + // t.Did (string) (string) 1118 + case "did": 1119 + 1120 + { 1121 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1122 + if err != nil { 1123 + return err 1124 + } 1125 + 1126 + t.Did = string(sval) 1127 + } 1128 + // t.Rev (string) (string) 1129 + case "rev": 1130 + 1131 + { 1132 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1133 + if err != nil { 1134 + return err 1135 + } 1136 + 1137 + t.Rev = string(sval) 1138 + } 1139 + // t.Seq (int64) (int64) 1140 + case "seq": 1141 + { 1142 + maj, extra, err := cr.ReadHeader() 1143 + if err != nil { 1144 + return err 1145 + } 1146 + var extraI int64 1147 + switch maj { 1148 + case cbg.MajUnsignedInt: 1149 + extraI = int64(extra) 1150 + if extraI < 0 { 1151 + return fmt.Errorf("int64 positive overflow") 1152 + } 1153 + case cbg.MajNegativeInt: 1154 + extraI = int64(extra) 1155 + if extraI < 0 { 1156 + return fmt.Errorf("int64 negative overflow") 1157 + } 1158 + extraI = -1 - extraI 1159 + default: 1160 + return fmt.Errorf("wrong type for int64 field: %d", maj) 1161 + } 1162 + 1163 + t.Seq = int64(extraI) 1164 + } 1165 + // t.Time (string) (string) 1166 + case "time": 1167 + 1168 + { 1169 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1170 + if err != nil { 1171 + return err 1172 + } 1173 + 1174 + t.Time = string(sval) 1175 + } 1176 + // t.Blocks (util.LexBytes) (slice) 1177 + case "blocks": 1178 + 1179 + maj, extra, err = cr.ReadHeader() 1180 + if err != nil { 1181 + return err 1182 + } 1183 + 1184 + if extra > 2097152 { 1185 + return fmt.Errorf("t.Blocks: byte array too large (%d)", extra) 1186 + } 1187 + if maj != cbg.MajByteString { 1188 + return fmt.Errorf("expected byte array") 1189 + } 1190 + 1191 + if extra > 0 { 1192 + t.Blocks = make([]uint8, extra) 1193 + } 1194 + 1195 + if _, err := io.ReadFull(cr, t.Blocks); err != nil { 1196 + return err 1197 + } 1198 + 1199 + default: 1200 + // Field doesn't exist on this type, so ignore it 1201 + if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 1202 + return err 1203 + } 1204 + } 1205 + } 1206 + 1207 + return nil 1208 + } 938 1209 func (t *SyncSubscribeRepos_Handle) MarshalCBOR(w io.Writer) error { 939 1210 if t == nil { 940 1211 _, err := w.Write(cbg.CborNull)
+20
events/consumer.go
··· 17 17 18 18 type RepoStreamCallbacks struct { 19 19 RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 20 + RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error 20 21 RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error 21 22 RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 22 23 RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error ··· 32 33 switch { 33 34 case xev.RepoCommit != nil && rsc.RepoCommit != nil: 34 35 return rsc.RepoCommit(xev.RepoCommit) 36 + case xev.RepoSync != nil && rsc.RepoCommit != nil: 37 + return rsc.RepoSync(xev.RepoSync) 35 38 case xev.RepoHandle != nil && rsc.RepoHandle != nil: 36 39 return rsc.RepoHandle(xev.RepoHandle) 37 40 case xev.RepoInfo != nil && rsc.RepoInfo != nil: ··· 209 212 210 213 if err := sched.AddWork(ctx, evt.Repo, &XRPCStreamEvent{ 211 214 RepoCommit: &evt, 215 + }); err != nil { 216 + return err 217 + } 218 + case "#sync": 219 + var evt comatproto.SyncSubscribeRepos_Sync 220 + if err := evt.UnmarshalCBOR(r); err != nil { 221 + return fmt.Errorf("reading repoSync event: %w", err) 222 + } 223 + 224 + if evt.Seq < lastSeq { 225 + log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 226 + } 227 + 228 + lastSeq = evt.Seq 229 + 230 + if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 231 + RepoSync: &evt, 212 232 }); err != nil { 213 233 return err 214 234 }
+14
events/events.go
··· 166 166 type XRPCStreamEvent struct { 167 167 Error *ErrorFrame 168 168 RepoCommit *comatproto.SyncSubscribeRepos_Commit 169 + RepoSync *comatproto.SyncSubscribeRepos_Sync 169 170 RepoHandle *comatproto.SyncSubscribeRepos_Handle 170 171 RepoIdentity *comatproto.SyncSubscribeRepos_Identity 171 172 RepoInfo *comatproto.SyncSubscribeRepos_Info ··· 193 194 case evt.RepoCommit != nil: 194 195 header.MsgType = "#commit" 195 196 obj = evt.RepoCommit 197 + case evt.RepoSync != nil: 198 + header.MsgType = "#sync" 199 + obj = evt.RepoSync 196 200 case evt.RepoHandle != nil: 197 201 header.MsgType = "#handle" 198 202 obj = evt.RepoHandle ··· 236 240 return fmt.Errorf("reading repoCommit event: %w", err) 237 241 } 238 242 xevt.RepoCommit = &evt 243 + case "#sync": 244 + var evt comatproto.SyncSubscribeRepos_Sync 245 + if err := evt.UnmarshalCBOR(r); err != nil { 246 + return fmt.Errorf("reading repoSync event: %w", err) 247 + } 248 + xevt.RepoSync = &evt 239 249 case "#handle": 240 250 var evt comatproto.SyncSubscribeRepos_Handle 241 251 if err := evt.UnmarshalCBOR(r); err != nil { ··· 436 446 return -1 437 447 case evt.RepoCommit != nil: 438 448 return evt.RepoCommit.Seq 449 + case evt.RepoSync != nil: 450 + return evt.RepoSync.Seq 439 451 case evt.RepoHandle != nil: 440 452 return evt.RepoHandle.Seq 441 453 case evt.RepoMigrate != nil: ··· 461 473 return -1, false 462 474 case evt.RepoCommit != nil: 463 475 return evt.RepoCommit.Seq, true 476 + case evt.RepoSync != nil: 477 + return evt.RepoSync.Seq, true 464 478 case evt.RepoHandle != nil: 465 479 return evt.RepoHandle.Seq, true 466 480 case evt.RepoMigrate != nil:
+1
gen/main.go
··· 95 95 atproto.LexiconSchema{}, 96 96 atproto.RepoStrongRef{}, 97 97 atproto.SyncSubscribeRepos_Commit{}, 98 + atproto.SyncSubscribeRepos_Sync{}, 98 99 atproto.SyncSubscribeRepos_Handle{}, 99 100 atproto.SyncSubscribeRepos_Identity{}, 100 101 atproto.SyncSubscribeRepos_Account{},