this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update codegen (#1082)

authored by

bnewbold and committed by
GitHub
0b69efcd 1c879a77

+44 -1422
-627
api/atproto/cbor_gen.go
··· 1206 1206 1207 1207 return nil 1208 1208 } 1209 - func (t *SyncSubscribeRepos_Handle) MarshalCBOR(w io.Writer) error { 1210 - if t == nil { 1211 - _, err := w.Write(cbg.CborNull) 1212 - return err 1213 - } 1214 1209 1215 - cw := cbg.NewCborWriter(w) 1216 - 1217 - if _, err := cw.Write([]byte{164}); err != nil { 1218 - return err 1219 - } 1220 - 1221 - // t.Did (string) (string) 1222 - if len("did") > 1000000 { 1223 - return xerrors.Errorf("Value in field \"did\" was too long") 1224 - } 1225 - 1226 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil { 1227 - return err 1228 - } 1229 - if _, err := cw.WriteString(string("did")); err != nil { 1230 - return err 1231 - } 1232 - 1233 - if len(t.Did) > 1000000 { 1234 - return xerrors.Errorf("Value in field t.Did was too long") 1235 - } 1236 - 1237 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil { 1238 - return err 1239 - } 1240 - if _, err := cw.WriteString(string(t.Did)); err != nil { 1241 - return err 1242 - } 1243 - 1244 - // t.Seq (int64) (int64) 1245 - if len("seq") > 1000000 { 1246 - return xerrors.Errorf("Value in field \"seq\" was too long") 1247 - } 1248 - 1249 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 1250 - return err 1251 - } 1252 - if _, err := cw.WriteString(string("seq")); err != nil { 1253 - return err 1254 - } 1255 - 1256 - if t.Seq >= 0 { 1257 - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 1258 - return err 1259 - } 1260 - } else { 1261 - if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 1262 - return err 1263 - } 1264 - } 1265 - 1266 - // t.Time (string) (string) 1267 - if len("time") > 1000000 { 1268 - return xerrors.Errorf("Value in field \"time\" was too long") 1269 - } 1270 - 1271 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil { 1272 - return err 1273 - } 1274 - if _, err := cw.WriteString(string("time")); err != nil { 1275 - return err 1276 - } 1277 - 1278 - if len(t.Time) > 1000000 { 1279 - return xerrors.Errorf("Value in field t.Time was too long") 1280 - } 1281 - 1282 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil { 1283 - return err 1284 - } 1285 - if _, err := cw.WriteString(string(t.Time)); err != nil { 1286 - return err 1287 - } 1288 - 1289 - // t.Handle (string) (string) 1290 - if len("handle") > 1000000 { 1291 - return xerrors.Errorf("Value in field \"handle\" was too long") 1292 - } 1293 - 1294 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("handle"))); err != nil { 1295 - return err 1296 - } 1297 - if _, err := cw.WriteString(string("handle")); err != nil { 1298 - return err 1299 - } 1300 - 1301 - if len(t.Handle) > 1000000 { 1302 - return xerrors.Errorf("Value in field t.Handle was too long") 1303 - } 1304 - 1305 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Handle))); err != nil { 1306 - return err 1307 - } 1308 - if _, err := cw.WriteString(string(t.Handle)); err != nil { 1309 - return err 1310 - } 1311 - return nil 1312 - } 1313 - 1314 - func (t *SyncSubscribeRepos_Handle) UnmarshalCBOR(r io.Reader) (err error) { 1315 - *t = SyncSubscribeRepos_Handle{} 1316 - 1317 - cr := cbg.NewCborReader(r) 1318 - 1319 - maj, extra, err := cr.ReadHeader() 1320 - if err != nil { 1321 - return err 1322 - } 1323 - defer func() { 1324 - if err == io.EOF { 1325 - err = io.ErrUnexpectedEOF 1326 - } 1327 - }() 1328 - 1329 - if maj != cbg.MajMap { 1330 - return fmt.Errorf("cbor input should be of type map") 1331 - } 1332 - 1333 - if extra > cbg.MaxLength { 1334 - return fmt.Errorf("SyncSubscribeRepos_Handle: map struct too large (%d)", extra) 1335 - } 1336 - 1337 - n := extra 1338 - 1339 - nameBuf := make([]byte, 6) 1340 - for i := uint64(0); i < n; i++ { 1341 - nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 1342 - if err != nil { 1343 - return err 1344 - } 1345 - 1346 - if !ok { 1347 - // Field doesn't exist on this type, so ignore it 1348 - if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 1349 - return err 1350 - } 1351 - continue 1352 - } 1353 - 1354 - switch string(nameBuf[:nameLen]) { 1355 - // t.Did (string) (string) 1356 - case "did": 1357 - 1358 - { 1359 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 1360 - if err != nil { 1361 - return err 1362 - } 1363 - 1364 - t.Did = string(sval) 1365 - } 1366 - // t.Seq (int64) (int64) 1367 - case "seq": 1368 - { 1369 - maj, extra, err := cr.ReadHeader() 1370 - if err != nil { 1371 - return err 1372 - } 1373 - var extraI int64 1374 - switch maj { 1375 - case cbg.MajUnsignedInt: 1376 - extraI = int64(extra) 1377 - if extraI < 0 { 1378 - return fmt.Errorf("int64 positive overflow") 1379 - } 1380 - case cbg.MajNegativeInt: 1381 - extraI = int64(extra) 1382 - if extraI < 0 { 1383 - return fmt.Errorf("int64 negative overflow") 1384 - } 1385 - extraI = -1 - extraI 1386 - default: 1387 - return fmt.Errorf("wrong type for int64 field: %d", maj) 1388 - } 1389 - 1390 - t.Seq = int64(extraI) 1391 - } 1392 - // t.Time (string) (string) 1393 - case "time": 1394 - 1395 - { 1396 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 1397 - if err != nil { 1398 - return err 1399 - } 1400 - 1401 - t.Time = string(sval) 1402 - } 1403 - // t.Handle (string) (string) 1404 - case "handle": 1405 - 1406 - { 1407 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 1408 - if err != nil { 1409 - return err 1410 - } 1411 - 1412 - t.Handle = string(sval) 1413 - } 1414 - 1415 - default: 1416 - // Field doesn't exist on this type, so ignore it 1417 - if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 1418 - return err 1419 - } 1420 - } 1421 - } 1422 - 1423 - return nil 1424 - } 1425 1210 func (t *SyncSubscribeRepos_Identity) MarshalCBOR(w io.Writer) error { 1426 1211 if t == nil { 1427 1212 _, err := w.Write(cbg.CborNull) ··· 2094 1879 2095 1880 return nil 2096 1881 } 2097 - func (t *SyncSubscribeRepos_Migrate) MarshalCBOR(w io.Writer) error { 2098 - if t == nil { 2099 - _, err := w.Write(cbg.CborNull) 2100 - return err 2101 - } 2102 1882 2103 - cw := cbg.NewCborWriter(w) 2104 - 2105 - if _, err := cw.Write([]byte{164}); err != nil { 2106 - return err 2107 - } 2108 - 2109 - // t.Did (string) (string) 2110 - if len("did") > 1000000 { 2111 - return xerrors.Errorf("Value in field \"did\" was too long") 2112 - } 2113 - 2114 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil { 2115 - return err 2116 - } 2117 - if _, err := cw.WriteString(string("did")); err != nil { 2118 - return err 2119 - } 2120 - 2121 - if len(t.Did) > 1000000 { 2122 - return xerrors.Errorf("Value in field t.Did was too long") 2123 - } 2124 - 2125 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil { 2126 - return err 2127 - } 2128 - if _, err := cw.WriteString(string(t.Did)); err != nil { 2129 - return err 2130 - } 2131 - 2132 - // t.Seq (int64) (int64) 2133 - if len("seq") > 1000000 { 2134 - return xerrors.Errorf("Value in field \"seq\" was too long") 2135 - } 2136 - 2137 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 2138 - return err 2139 - } 2140 - if _, err := cw.WriteString(string("seq")); err != nil { 2141 - return err 2142 - } 2143 - 2144 - if t.Seq >= 0 { 2145 - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 2146 - return err 2147 - } 2148 - } else { 2149 - if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 2150 - return err 2151 - } 2152 - } 2153 - 2154 - // t.Time (string) (string) 2155 - if len("time") > 1000000 { 2156 - return xerrors.Errorf("Value in field \"time\" was too long") 2157 - } 2158 - 2159 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil { 2160 - return err 2161 - } 2162 - if _, err := cw.WriteString(string("time")); err != nil { 2163 - return err 2164 - } 2165 - 2166 - if len(t.Time) > 1000000 { 2167 - return xerrors.Errorf("Value in field t.Time was too long") 2168 - } 2169 - 2170 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil { 2171 - return err 2172 - } 2173 - if _, err := cw.WriteString(string(t.Time)); err != nil { 2174 - return err 2175 - } 2176 - 2177 - // t.MigrateTo (string) (string) 2178 - if len("migrateTo") > 1000000 { 2179 - return xerrors.Errorf("Value in field \"migrateTo\" was too long") 2180 - } 2181 - 2182 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("migrateTo"))); err != nil { 2183 - return err 2184 - } 2185 - if _, err := cw.WriteString(string("migrateTo")); err != nil { 2186 - return err 2187 - } 2188 - 2189 - if t.MigrateTo == nil { 2190 - if _, err := cw.Write(cbg.CborNull); err != nil { 2191 - return err 2192 - } 2193 - } else { 2194 - if len(*t.MigrateTo) > 1000000 { 2195 - return xerrors.Errorf("Value in field t.MigrateTo was too long") 2196 - } 2197 - 2198 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.MigrateTo))); err != nil { 2199 - return err 2200 - } 2201 - if _, err := cw.WriteString(string(*t.MigrateTo)); err != nil { 2202 - return err 2203 - } 2204 - } 2205 - return nil 2206 - } 2207 - 2208 - func (t *SyncSubscribeRepos_Migrate) UnmarshalCBOR(r io.Reader) (err error) { 2209 - *t = SyncSubscribeRepos_Migrate{} 2210 - 2211 - cr := cbg.NewCborReader(r) 2212 - 2213 - maj, extra, err := cr.ReadHeader() 2214 - if err != nil { 2215 - return err 2216 - } 2217 - defer func() { 2218 - if err == io.EOF { 2219 - err = io.ErrUnexpectedEOF 2220 - } 2221 - }() 2222 - 2223 - if maj != cbg.MajMap { 2224 - return fmt.Errorf("cbor input should be of type map") 2225 - } 2226 - 2227 - if extra > cbg.MaxLength { 2228 - return fmt.Errorf("SyncSubscribeRepos_Migrate: map struct too large (%d)", extra) 2229 - } 2230 - 2231 - n := extra 2232 - 2233 - nameBuf := make([]byte, 9) 2234 - for i := uint64(0); i < n; i++ { 2235 - nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 2236 - if err != nil { 2237 - return err 2238 - } 2239 - 2240 - if !ok { 2241 - // Field doesn't exist on this type, so ignore it 2242 - if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 2243 - return err 2244 - } 2245 - continue 2246 - } 2247 - 2248 - switch string(nameBuf[:nameLen]) { 2249 - // t.Did (string) (string) 2250 - case "did": 2251 - 2252 - { 2253 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 2254 - if err != nil { 2255 - return err 2256 - } 2257 - 2258 - t.Did = string(sval) 2259 - } 2260 - // t.Seq (int64) (int64) 2261 - case "seq": 2262 - { 2263 - maj, extra, err := cr.ReadHeader() 2264 - if err != nil { 2265 - return err 2266 - } 2267 - var extraI int64 2268 - switch maj { 2269 - case cbg.MajUnsignedInt: 2270 - extraI = int64(extra) 2271 - if extraI < 0 { 2272 - return fmt.Errorf("int64 positive overflow") 2273 - } 2274 - case cbg.MajNegativeInt: 2275 - extraI = int64(extra) 2276 - if extraI < 0 { 2277 - return fmt.Errorf("int64 negative overflow") 2278 - } 2279 - extraI = -1 - extraI 2280 - default: 2281 - return fmt.Errorf("wrong type for int64 field: %d", maj) 2282 - } 2283 - 2284 - t.Seq = int64(extraI) 2285 - } 2286 - // t.Time (string) (string) 2287 - case "time": 2288 - 2289 - { 2290 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 2291 - if err != nil { 2292 - return err 2293 - } 2294 - 2295 - t.Time = string(sval) 2296 - } 2297 - // t.MigrateTo (string) (string) 2298 - case "migrateTo": 2299 - 2300 - { 2301 - b, err := cr.ReadByte() 2302 - if err != nil { 2303 - return err 2304 - } 2305 - if b != cbg.CborNull[0] { 2306 - if err := cr.UnreadByte(); err != nil { 2307 - return err 2308 - } 2309 - 2310 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 2311 - if err != nil { 2312 - return err 2313 - } 2314 - 2315 - t.MigrateTo = (*string)(&sval) 2316 - } 2317 - } 2318 - 2319 - default: 2320 - // Field doesn't exist on this type, so ignore it 2321 - if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 2322 - return err 2323 - } 2324 - } 2325 - } 2326 - 2327 - return nil 2328 - } 2329 1883 func (t *SyncSubscribeRepos_RepoOp) MarshalCBOR(w io.Writer) error { 2330 1884 if t == nil { 2331 1885 _, err := w.Write(cbg.CborNull) ··· 2540 2094 2541 2095 return nil 2542 2096 } 2543 - func (t *SyncSubscribeRepos_Tombstone) MarshalCBOR(w io.Writer) error { 2544 - if t == nil { 2545 - _, err := w.Write(cbg.CborNull) 2546 - return err 2547 - } 2548 2097 2549 - cw := cbg.NewCborWriter(w) 2550 - 2551 - if _, err := cw.Write([]byte{163}); err != nil { 2552 - return err 2553 - } 2554 - 2555 - // t.Did (string) (string) 2556 - if len("did") > 1000000 { 2557 - return xerrors.Errorf("Value in field \"did\" was too long") 2558 - } 2559 - 2560 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil { 2561 - return err 2562 - } 2563 - if _, err := cw.WriteString(string("did")); err != nil { 2564 - return err 2565 - } 2566 - 2567 - if len(t.Did) > 1000000 { 2568 - return xerrors.Errorf("Value in field t.Did was too long") 2569 - } 2570 - 2571 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil { 2572 - return err 2573 - } 2574 - if _, err := cw.WriteString(string(t.Did)); err != nil { 2575 - return err 2576 - } 2577 - 2578 - // t.Seq (int64) (int64) 2579 - if len("seq") > 1000000 { 2580 - return xerrors.Errorf("Value in field \"seq\" was too long") 2581 - } 2582 - 2583 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 2584 - return err 2585 - } 2586 - if _, err := cw.WriteString(string("seq")); err != nil { 2587 - return err 2588 - } 2589 - 2590 - if t.Seq >= 0 { 2591 - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 2592 - return err 2593 - } 2594 - } else { 2595 - if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 2596 - return err 2597 - } 2598 - } 2599 - 2600 - // t.Time (string) (string) 2601 - if len("time") > 1000000 { 2602 - return xerrors.Errorf("Value in field \"time\" was too long") 2603 - } 2604 - 2605 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil { 2606 - return err 2607 - } 2608 - if _, err := cw.WriteString(string("time")); err != nil { 2609 - return err 2610 - } 2611 - 2612 - if len(t.Time) > 1000000 { 2613 - return xerrors.Errorf("Value in field t.Time was too long") 2614 - } 2615 - 2616 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil { 2617 - return err 2618 - } 2619 - if _, err := cw.WriteString(string(t.Time)); err != nil { 2620 - return err 2621 - } 2622 - return nil 2623 - } 2624 - 2625 - func (t *SyncSubscribeRepos_Tombstone) UnmarshalCBOR(r io.Reader) (err error) { 2626 - *t = SyncSubscribeRepos_Tombstone{} 2627 - 2628 - cr := cbg.NewCborReader(r) 2629 - 2630 - maj, extra, err := cr.ReadHeader() 2631 - if err != nil { 2632 - return err 2633 - } 2634 - defer func() { 2635 - if err == io.EOF { 2636 - err = io.ErrUnexpectedEOF 2637 - } 2638 - }() 2639 - 2640 - if maj != cbg.MajMap { 2641 - return fmt.Errorf("cbor input should be of type map") 2642 - } 2643 - 2644 - if extra > cbg.MaxLength { 2645 - return fmt.Errorf("SyncSubscribeRepos_Tombstone: map struct too large (%d)", extra) 2646 - } 2647 - 2648 - n := extra 2649 - 2650 - nameBuf := make([]byte, 4) 2651 - for i := uint64(0); i < n; i++ { 2652 - nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 2653 - if err != nil { 2654 - return err 2655 - } 2656 - 2657 - if !ok { 2658 - // Field doesn't exist on this type, so ignore it 2659 - if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 2660 - return err 2661 - } 2662 - continue 2663 - } 2664 - 2665 - switch string(nameBuf[:nameLen]) { 2666 - // t.Did (string) (string) 2667 - case "did": 2668 - 2669 - { 2670 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 2671 - if err != nil { 2672 - return err 2673 - } 2674 - 2675 - t.Did = string(sval) 2676 - } 2677 - // t.Seq (int64) (int64) 2678 - case "seq": 2679 - { 2680 - maj, extra, err := cr.ReadHeader() 2681 - if err != nil { 2682 - return err 2683 - } 2684 - var extraI int64 2685 - switch maj { 2686 - case cbg.MajUnsignedInt: 2687 - extraI = int64(extra) 2688 - if extraI < 0 { 2689 - return fmt.Errorf("int64 positive overflow") 2690 - } 2691 - case cbg.MajNegativeInt: 2692 - extraI = int64(extra) 2693 - if extraI < 0 { 2694 - return fmt.Errorf("int64 negative overflow") 2695 - } 2696 - extraI = -1 - extraI 2697 - default: 2698 - return fmt.Errorf("wrong type for int64 field: %d", maj) 2699 - } 2700 - 2701 - t.Seq = int64(extraI) 2702 - } 2703 - // t.Time (string) (string) 2704 - case "time": 2705 - 2706 - { 2707 - sval, err := cbg.ReadStringWithMax(cr, 1000000) 2708 - if err != nil { 2709 - return err 2710 - } 2711 - 2712 - t.Time = string(sval) 2713 - } 2714 - 2715 - default: 2716 - // Field doesn't exist on this type, so ignore it 2717 - if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 2718 - return err 2719 - } 2720 - } 2721 - } 2722 - 2723 - return nil 2724 - } 2725 2098 func (t *LabelDefs_SelfLabels) MarshalCBOR(w io.Writer) error { 2726 2099 if t == nil { 2727 2100 _, err := w.Write(cbg.CborNull)
-29
api/atproto/syncsubscribeRepos.go
··· 49 49 TooBig bool `json:"tooBig" cborgen:"tooBig"` 50 50 } 51 51 52 - // SyncSubscribeRepos_Handle is a "handle" in the com.atproto.sync.subscribeRepos schema. 53 - // 54 - // DEPRECATED -- Use #identity event instead 55 - type SyncSubscribeRepos_Handle struct { 56 - Did string `json:"did" cborgen:"did"` 57 - Handle string `json:"handle" cborgen:"handle"` 58 - Seq int64 `json:"seq" cborgen:"seq"` 59 - Time string `json:"time" cborgen:"time"` 60 - } 61 - 62 52 // SyncSubscribeRepos_Identity is a "identity" in the com.atproto.sync.subscribeRepos schema. 63 53 // 64 54 // Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. ··· 76 66 Name string `json:"name" cborgen:"name"` 77 67 } 78 68 79 - // SyncSubscribeRepos_Migrate is a "migrate" in the com.atproto.sync.subscribeRepos schema. 80 - // 81 - // DEPRECATED -- Use #account event instead 82 - type SyncSubscribeRepos_Migrate struct { 83 - Did string `json:"did" cborgen:"did"` 84 - MigrateTo *string `json:"migrateTo" cborgen:"migrateTo"` 85 - Seq int64 `json:"seq" cborgen:"seq"` 86 - Time string `json:"time" cborgen:"time"` 87 - } 88 - 89 69 // SyncSubscribeRepos_RepoOp is a "repoOp" in the com.atproto.sync.subscribeRepos schema. 90 70 // 91 71 // A repo operation, ie a mutation of a single record. ··· 113 93 // time: Timestamp of when this message was originally broadcast. 114 94 Time string `json:"time" cborgen:"time"` 115 95 } 116 - 117 - // SyncSubscribeRepos_Tombstone is a "tombstone" in the com.atproto.sync.subscribeRepos schema. 118 - // 119 - // DEPRECATED -- Use #account event instead 120 - type SyncSubscribeRepos_Tombstone struct { 121 - Did string `json:"did" cborgen:"did"` 122 - Seq int64 `json:"seq" cborgen:"seq"` 123 - Time string `json:"time" cborgen:"time"` 124 - }
+1 -1
api/bsky/actorstatus.go
··· 21 21 type ActorStatus struct { 22 22 LexiconTypeID string `json:"$type,const=app.bsky.actor.status" cborgen:"$type,const=app.bsky.actor.status"` 23 23 CreatedAt string `json:"createdAt" cborgen:"createdAt"` 24 - // durationMinutes: The duration of the status in minutes. Applications can choose to limit the duration. 24 + // durationMinutes: The duration of the status in minutes. Applications can choose to impose minimum and maximum limits. 25 25 DurationMinutes *int64 `json:"durationMinutes,omitempty" cborgen:"durationMinutes,omitempty"` 26 26 // embed: An optional embed associated with the status. 27 27 Embed *ActorStatus_Embed `json:"embed,omitempty" cborgen:"embed,omitempty"`
+8 -1
api/bsky/unspeccedgetConfig.go
··· 10 10 "github.com/bluesky-social/indigo/xrpc" 11 11 ) 12 12 13 + // UnspeccedGetConfig_LiveNowConfig is a "liveNowConfig" in the app.bsky.unspecced.getConfig schema. 14 + type UnspeccedGetConfig_LiveNowConfig struct { 15 + Did string `json:"did" cborgen:"did"` 16 + Domains []string `json:"domains" cborgen:"domains"` 17 + } 18 + 13 19 // UnspeccedGetConfig_Output is the output of a app.bsky.unspecced.getConfig call. 14 20 type UnspeccedGetConfig_Output struct { 15 - CheckEmailConfirmed *bool `json:"checkEmailConfirmed,omitempty" cborgen:"checkEmailConfirmed,omitempty"` 21 + CheckEmailConfirmed *bool `json:"checkEmailConfirmed,omitempty" cborgen:"checkEmailConfirmed,omitempty"` 22 + LiveNow []*UnspeccedGetConfig_LiveNowConfig `json:"liveNow,omitempty" cborgen:"liveNow,omitempty"` 16 23 } 17 24 18 25 // UnspeccedGetConfig calls the XRPC method "app.bsky.unspecced.getConfig".
-1
api/ozone/moderationdefs.go
··· 1043 1043 // 1044 1044 // Detailed view of a subject. For record subjects, the author's repo and profile will be returned. 1045 1045 type ModerationDefs_SubjectView struct { 1046 - //Profile *ModerationDefs_SubjectView_Profile `json:"profile,omitempty" cborgen:"profile,omitempty"` 1047 1046 Record *ModerationDefs_RecordViewDetail `json:"record,omitempty" cborgen:"record,omitempty"` 1048 1047 Repo *ModerationDefs_RepoViewDetail `json:"repo,omitempty" cborgen:"repo,omitempty"` 1049 1048 Status *ModerationDefs_SubjectStatusView `json:"status,omitempty" cborgen:"status,omitempty"`
+3 -5
api/ozone/verificationdefs.go
··· 22 22 // handle: Handle of the subject the verification applies to at the moment of verifying, which might not be the same at the time of viewing. The verification is only valid if the current handle matches the one at the time of verifying. 23 23 Handle string `json:"handle" cborgen:"handle"` 24 24 // issuer: The user who issued this verification. 25 - Issuer string `json:"issuer" cborgen:"issuer"` 26 - //IssuerProfile *VerificationDefs_VerificationView_IssuerProfile `json:"issuerProfile,omitempty" cborgen:"issuerProfile,omitempty"` 27 - //IssuerRepo *VerificationDefs_VerificationView_IssuerRepo `json:"issuerRepo,omitempty" cborgen:"issuerRepo,omitempty"` 25 + Issuer string `json:"issuer" cborgen:"issuer"` 26 + IssuerRepo *VerificationDefs_VerificationView_IssuerRepo `json:"issuerRepo,omitempty" cborgen:"issuerRepo,omitempty"` 28 27 // revokeReason: Describes the reason for revocation, also indicating that the verification is no longer valid. 29 28 RevokeReason *string `json:"revokeReason,omitempty" cborgen:"revokeReason,omitempty"` 30 29 // revokedAt: Timestamp when the verification was revoked. ··· 32 31 // revokedBy: The user who revoked this verification. 33 32 RevokedBy *string `json:"revokedBy,omitempty" cborgen:"revokedBy,omitempty"` 34 33 // subject: The subject of the verification. 35 - Subject string `json:"subject" cborgen:"subject"` 36 - //SubjectProfile *VerificationDefs_VerificationView_SubjectProfile `json:"subjectProfile,omitempty" cborgen:"subjectProfile,omitempty"` 34 + Subject string `json:"subject" cborgen:"subject"` 37 35 SubjectRepo *VerificationDefs_VerificationView_SubjectRepo `json:"subjectRepo,omitempty" cborgen:"subjectRepo,omitempty"` 38 36 // uri: The AT-URI of the verification record. 39 37 Uri string `json:"uri" cborgen:"uri"`
-78
bgs/bgs.go
··· 903 903 904 904 repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc() 905 905 return nil 906 - case env.RepoHandle != nil: 907 - bgs.log.Info("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle) 908 - // Flush any cached DID documents for this user 909 - bgs.didr.FlushCacheFor(env.RepoHandle.Did) 910 - 911 - // TODO: ignoring the data in the message and just going out to the DID doc 912 - act, err := bgs.createExternalUser(ctx, env.RepoHandle.Did) 913 - if err != nil { 914 - return err 915 - } 916 - 917 - if act.Handle.String != env.RepoHandle.Handle { 918 - bgs.log.Warn("handle update did not update handle to asserted value", "did", env.RepoHandle.Did, "expected", env.RepoHandle.Handle, "actual", act.Handle) 919 - } 920 - 921 - // TODO: Update the ReposHandle event type to include "verified" or something 922 - 923 - // Broadcast the handle update to all consumers 924 - err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ 925 - RepoHandle: &comatproto.SyncSubscribeRepos_Handle{ 926 - Did: env.RepoHandle.Did, 927 - Handle: env.RepoHandle.Handle, 928 - Time: env.RepoHandle.Time, 929 - }, 930 - }) 931 - if err != nil { 932 - bgs.log.Error("failed to broadcast RepoHandle event", "error", err, "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle) 933 - return fmt.Errorf("failed to broadcast RepoHandle event: %w", err) 934 - } 935 - 936 - return nil 937 906 case env.RepoIdentity != nil: 938 907 bgs.log.Info("bgs got identity event", "did", env.RepoIdentity.Did) 939 908 // Flush any cached DID documents for this user ··· 1035 1004 } 1036 1005 1037 1006 return nil 1038 - case env.RepoMigrate != nil: 1039 - if _, err := bgs.createExternalUser(ctx, env.RepoMigrate.Did); err != nil { 1040 - return err 1041 - } 1042 - 1043 - return nil 1044 - case env.RepoTombstone != nil: 1045 - if err := bgs.handleRepoTombstone(ctx, host, env.RepoTombstone); err != nil { 1046 - return err 1047 - } 1048 - 1049 - return nil 1050 1007 default: 1051 1008 return fmt.Errorf("invalid fed event") 1052 1009 } 1053 - } 1054 - 1055 - func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *atproto.SyncSubscribeRepos_Tombstone) error { 1056 - u, err := bgs.lookupUserByDid(ctx, evt.Did) 1057 - if err != nil { 1058 - return err 1059 - } 1060 - 1061 - if u.PDS != pds.ID { 1062 - return fmt.Errorf("unauthoritative tombstone event from %s for %s", pds.Host, evt.Did) 1063 - } 1064 - 1065 - if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{ 1066 - "tombstoned": true, 1067 - "handle": nil, 1068 - }).Error; err != nil { 1069 - return err 1070 - } 1071 - u.SetTombstoned(true) 1072 - 1073 - if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ 1074 - "handle": nil, 1075 - }).Error; err != nil { 1076 - return err 1077 - } 1078 - 1079 - // delete data from carstore 1080 - if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { 1081 - // don't let a failure here prevent us from propagating this event 1082 - bgs.log.Error("failed to delete user data from carstore", "err", err) 1083 - } 1084 - 1085 - return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ 1086 - RepoTombstone: evt, 1087 - }) 1088 1010 } 1089 1011 1090 1012 // TODO: rename? This also updates users, and 'external' is an old phrasing
-45
bgs/fedmgr.go
··· 569 569 570 570 return nil 571 571 }, 572 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 573 - log.Info("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle) 574 - if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 575 - RepoHandle: evt, 576 - }); err != nil { 577 - log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err) 578 - } 579 - *lastCursor = evt.Seq 580 - 581 - if err := s.updateCursor(sub, *lastCursor); err != nil { 582 - return fmt.Errorf("updating cursor: %w", err) 583 - } 584 - 585 - return nil 586 - }, 587 - RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { 588 - log.Info("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo) 589 - if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 590 - RepoMigrate: evt, 591 - }); err != nil { 592 - log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err) 593 - } 594 - *lastCursor = evt.Seq 595 - 596 - if err := s.updateCursor(sub, *lastCursor); err != nil { 597 - return fmt.Errorf("updating cursor: %w", err) 598 - } 599 - 600 - return nil 601 - }, 602 - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 603 - log.Info("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did) 604 - if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 605 - RepoTombstone: evt, 606 - }); err != nil { 607 - log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err) 608 - } 609 - *lastCursor = evt.Seq 610 - 611 - if err := s.updateCursor(sub, *lastCursor); err != nil { 612 - return fmt.Errorf("updating cursor: %w", err) 613 - } 614 - 615 - return nil 616 - }, 617 572 RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error { 618 573 log.Info("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host) 619 574 return nil
-18
cmd/goat/firehose.go
··· 189 189 } 190 190 return nil 191 191 }, 192 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 193 - if gfc.VerifyBasic { 194 - slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq) 195 - } 196 - return nil 197 - }, 198 - RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { 199 - if gfc.VerifyBasic { 200 - slog.Info("deprecated event type", "eventType", "migrate", "did", evt.Did, "seq", evt.Seq) 201 - } 202 - return nil 203 - }, 204 - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 205 - if gfc.VerifyBasic { 206 - slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq) 207 - } 208 - return nil 209 - }, 210 192 } 211 193 212 194 scheduler := parallel.NewScheduler(
-16
cmd/gosky/debug.go
··· 262 262 263 263 return nil 264 264 }, 265 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 266 - fmt.Printf("\rChecking seq: %d ", evt.Seq) 267 - if lastSeq > 0 && evt.Seq != lastSeq+1 { 268 - fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 269 - } 270 - lastSeq = evt.Seq 271 - return nil 272 - }, 273 - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 274 - fmt.Printf("\rChecking seq: %d ", evt.Seq) 275 - if lastSeq > 0 && evt.Seq != lastSeq+1 { 276 - fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 277 - } 278 - lastSeq = evt.Seq 279 - return nil 280 - }, 281 265 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 282 266 return nil 283 267 },
-41
cmd/gosky/main.go
··· 284 284 285 285 return nil 286 286 }, 287 - RepoMigrate: func(migrate *comatproto.SyncSubscribeRepos_Migrate) error { 288 - if jsonfmt { 289 - b, err := json.Marshal(migrate) 290 - if err != nil { 291 - return err 292 - } 293 - fmt.Println(string(b)) 294 - } else { 295 - fmt.Printf("(%d) RepoMigrate: %s moving to: %s\n", migrate.Seq, migrate.Did, *migrate.MigrateTo) 296 - } 297 - 298 - return nil 299 - }, 300 - RepoHandle: func(handle *comatproto.SyncSubscribeRepos_Handle) error { 301 - if jsonfmt { 302 - b, err := json.Marshal(handle) 303 - if err != nil { 304 - return err 305 - } 306 - fmt.Println(string(b)) 307 - } else { 308 - fmt.Printf("(%d) RepoHandle: %s (changed to: %s)\n", handle.Seq, handle.Did, handle.Handle) 309 - } 310 - 311 - return nil 312 - 313 - }, 314 287 RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error { 315 288 if jsonfmt { 316 289 b, err := json.Marshal(info) ··· 323 296 } 324 297 325 298 return nil 326 - }, 327 - RepoTombstone: func(tomb *comatproto.SyncSubscribeRepos_Tombstone) error { 328 - if jsonfmt { 329 - b, err := json.Marshal(tomb) 330 - if err != nil { 331 - return err 332 - } 333 - fmt.Println(string(b)) 334 - } else { 335 - fmt.Printf("(%d) Tombstone: %s\n", tomb.Seq, tomb.Did) 336 - } 337 - 338 - return nil 339 - 340 299 }, 341 300 // TODO: all the other event types 342 301 Error: func(errf *events.ErrorFrame) error {
-10
cmd/gosky/streamdiff.go
··· 127 127 return "ERROR" 128 128 case evt.RepoCommit != nil: 129 129 return "#commit" 130 - case evt.RepoHandle != nil: 131 - return "#handle" 132 130 case evt.RepoInfo != nil: 133 131 return "#info" 134 - case evt.RepoMigrate != nil: 135 - return "#migrate" 136 - case evt.RepoTombstone != nil: 137 - return "#tombstone" 138 132 default: 139 133 return "unknown" 140 134 } ··· 157 151 if sameCommit(evt.RepoCommit, oe.RepoCommit) { 158 152 return i 159 153 } 160 - case evt.RepoHandle != nil: 161 - panic("not handling handle updates yet") 162 - case evt.RepoMigrate != nil: 163 - panic("not handling repo migrates yet") 164 154 default: 165 155 panic("unhandled event type: " + evtop) 166 156 }
-9
cmd/relay/relay/ingest.go
··· 43 43 case evt.RepoAccount != nil: 44 44 //repoAccountReceivedCounter.WithLabelValues(hostname).Add(1) 45 45 return r.processAccountEvent(ctx, evt.RepoAccount, hostname, hostID) 46 - case evt.RepoHandle != nil: // DEPRECATED 47 - eventsWarningsCounter.WithLabelValues(hostname, "handle").Add(1) 48 - return nil 49 - case evt.RepoMigrate != nil: // DEPRECATED 50 - eventsWarningsCounter.WithLabelValues(hostname, "migrate").Add(1) 51 - return nil 52 - case evt.RepoTombstone != nil: // DEPRECATED 53 - eventsWarningsCounter.WithLabelValues(hostname, "tombstone").Add(1) 54 - return nil 55 46 default: 56 47 return fmt.Errorf("unhandled repo stream event type") 57 48 }
-27
cmd/relay/relay/slurper.go
··· 454 454 s.logger.Debug("info event", "name", info.Name, "message", info.Message, "host", sub.Hostname) 455 455 return nil 456 456 }, 457 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { // DEPRECATED 458 - logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "handle") 459 - logger.Debug("got remote handle update event", "handle", evt.Handle) 460 - if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil { 461 - logger.Error("failed handling event", "err", err) 462 - } 463 - sub.UpdateSeq() 464 - return nil 465 - }, 466 - RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { // DEPRECATED 467 - logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "migrate") 468 - logger.Debug("got remote repo migrate event", "migrateTo", evt.MigrateTo) 469 - if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil { 470 - logger.Error("failed handling event", "err", err) 471 - } 472 - sub.UpdateSeq() 473 - return nil 474 - }, 475 - RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { // DEPRECATED 476 - logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "tombstone") 477 - logger.Debug("got remote repo tombstone event") 478 - if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil { 479 - logger.Error("failed handling event", "err", err) 480 - } 481 - sub.UpdateSeq() 482 - return nil 483 - }, 484 457 } 485 458 486 459 limiters := []*slidingwindow.Limiter{
+8 -71
cmd/relay/stream/consumer.go
··· 18 18 const MaxMessageBytes = 5_000_000 19 19 20 20 type RepoStreamCallbacks struct { 21 - RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 22 - RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error 23 - RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error 24 - RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 25 - RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error 26 - RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error 27 - RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error 28 - RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error 29 - LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error 30 - LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error 31 - Error func(evt *ErrorFrame) error 21 + RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 22 + RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error 23 + RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 24 + RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error 25 + RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error 26 + LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error 27 + LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error 28 + Error func(evt *ErrorFrame) error 32 29 } 33 30 34 31 func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error { ··· 37 34 return rsc.RepoCommit(xev.RepoCommit) 38 35 case xev.RepoSync != nil && rsc.RepoSync != nil: 39 36 return rsc.RepoSync(xev.RepoSync) 40 - case xev.RepoHandle != nil && rsc.RepoHandle != nil: 41 - return rsc.RepoHandle(xev.RepoHandle) 42 37 case xev.RepoInfo != nil && rsc.RepoInfo != nil: 43 38 return rsc.RepoInfo(xev.RepoInfo) 44 - case xev.RepoMigrate != nil && rsc.RepoMigrate != nil: 45 - return rsc.RepoMigrate(xev.RepoMigrate) 46 39 case xev.RepoIdentity != nil && rsc.RepoIdentity != nil: 47 40 return rsc.RepoIdentity(xev.RepoIdentity) 48 41 case xev.RepoAccount != nil && rsc.RepoAccount != nil: 49 42 return rsc.RepoAccount(xev.RepoAccount) 50 - case xev.RepoTombstone != nil && rsc.RepoTombstone != nil: 51 - return rsc.RepoTombstone(xev.RepoTombstone) 52 43 case xev.LabelLabels != nil && rsc.LabelLabels != nil: 53 44 return rsc.LabelLabels(xev.LabelLabels) 54 45 case xev.LabelInfo != nil && rsc.LabelInfo != nil: ··· 248 239 }); err != nil { 249 240 return err 250 241 } 251 - case "#handle": 252 - // TODO: DEPRECATED message; warning/counter; drop message 253 - var evt comatproto.SyncSubscribeRepos_Handle 254 - if err := evt.UnmarshalCBOR(r); err != nil { 255 - return err 256 - } 257 - 258 - if evt.Seq <= lastSeq { 259 - logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 260 - continue 261 - } 262 - lastSeq = evt.Seq 263 - 264 - if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 265 - RepoHandle: &evt, 266 - }); err != nil { 267 - return err 268 - } 269 242 case "#identity": 270 243 var evt comatproto.SyncSubscribeRepos_Identity 271 244 if err := evt.UnmarshalCBOR(r); err != nil { ··· 309 282 310 283 if err := sched.AddWork(ctx, "", &XRPCStreamEvent{ 311 284 RepoInfo: &evt, 312 - }); err != nil { 313 - return err 314 - } 315 - case "#migrate": 316 - // TODO: DEPRECATED message; warning/counter; drop message 317 - var evt comatproto.SyncSubscribeRepos_Migrate 318 - if err := evt.UnmarshalCBOR(r); err != nil { 319 - return err 320 - } 321 - 322 - if evt.Seq <= lastSeq { 323 - logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 324 - continue 325 - } 326 - lastSeq = evt.Seq 327 - 328 - if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 329 - RepoMigrate: &evt, 330 - }); err != nil { 331 - return err 332 - } 333 - case "#tombstone": 334 - // TODO: DEPRECATED message; warning/counter; drop message 335 - var evt comatproto.SyncSubscribeRepos_Tombstone 336 - if err := evt.UnmarshalCBOR(r); err != nil { 337 - return err 338 - } 339 - 340 - if evt.Seq <= lastSeq { 341 - logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 342 - continue 343 - } 344 - lastSeq = evt.Seq 345 - 346 - if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 347 - RepoTombstone: &evt, 348 285 }); err != nil { 349 286 return err 350 287 }
+8 -53
cmd/relay/stream/events.go
··· 23 23 } 24 24 25 25 type XRPCStreamEvent struct { 26 - Error *ErrorFrame 27 - RepoCommit *comatproto.SyncSubscribeRepos_Commit 28 - RepoSync *comatproto.SyncSubscribeRepos_Sync 29 - RepoHandle *comatproto.SyncSubscribeRepos_Handle // DEPRECATED 30 - RepoIdentity *comatproto.SyncSubscribeRepos_Identity 31 - RepoInfo *comatproto.SyncSubscribeRepos_Info 32 - RepoMigrate *comatproto.SyncSubscribeRepos_Migrate // DEPRECATED 33 - RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone // DEPRECATED 34 - RepoAccount *comatproto.SyncSubscribeRepos_Account 35 - LabelLabels *comatproto.LabelSubscribeLabels_Labels 36 - LabelInfo *comatproto.LabelSubscribeLabels_Info 26 + Error *ErrorFrame 27 + RepoCommit *comatproto.SyncSubscribeRepos_Commit 28 + RepoSync *comatproto.SyncSubscribeRepos_Sync 29 + RepoIdentity *comatproto.SyncSubscribeRepos_Identity 30 + RepoInfo *comatproto.SyncSubscribeRepos_Info 31 + RepoAccount *comatproto.SyncSubscribeRepos_Account 32 + LabelLabels *comatproto.LabelSubscribeLabels_Labels 33 + LabelInfo *comatproto.LabelSubscribeLabels_Info 37 34 38 35 // some private fields for internal routing perf 39 36 PrivUid uint64 `json:"-" cborgen:"-"` ··· 56 53 case evt.RepoSync != nil: 57 54 header.MsgType = "#sync" 58 55 obj = evt.RepoSync 59 - case evt.RepoHandle != nil: 60 - header.MsgType = "#handle" 61 - obj = evt.RepoHandle 62 56 case evt.RepoIdentity != nil: 63 57 header.MsgType = "#identity" 64 58 obj = evt.RepoIdentity ··· 68 62 case evt.RepoInfo != nil: 69 63 header.MsgType = "#info" 70 64 obj = evt.RepoInfo 71 - case evt.RepoMigrate != nil: 72 - header.MsgType = "#migrate" 73 - obj = evt.RepoMigrate 74 - case evt.RepoTombstone != nil: 75 - header.MsgType = "#tombstone" 76 - obj = evt.RepoTombstone 77 65 default: 78 66 return fmt.Errorf("unrecognized event kind") 79 67 } ··· 105 93 return fmt.Errorf("reading repoSync event: %w", err) 106 94 } 107 95 xevt.RepoSync = &evt 108 - case "#handle": 109 - // TODO: DEPRECATED message; warning/counter; drop message 110 - var evt comatproto.SyncSubscribeRepos_Handle 111 - if err := evt.UnmarshalCBOR(r); err != nil { 112 - return err 113 - } 114 - xevt.RepoHandle = &evt 115 96 case "#identity": 116 97 var evt comatproto.SyncSubscribeRepos_Identity 117 98 if err := evt.UnmarshalCBOR(r); err != nil { ··· 131 112 return err 132 113 } 133 114 xevt.RepoInfo = &evt 134 - case "#migrate": 135 - // TODO: DEPRECATED message; warning/counter; drop message 136 - var evt comatproto.SyncSubscribeRepos_Migrate 137 - if err := evt.UnmarshalCBOR(r); err != nil { 138 - return err 139 - } 140 - xevt.RepoMigrate = &evt 141 - case "#tombstone": 142 - // TODO: DEPRECATED message; warning/counter; drop message 143 - var evt comatproto.SyncSubscribeRepos_Tombstone 144 - if err := evt.UnmarshalCBOR(r); err != nil { 145 - return err 146 - } 147 - xevt.RepoTombstone = &evt 148 115 case "#labels": 149 116 var evt comatproto.LabelSubscribeLabels_Labels 150 117 if err := evt.UnmarshalCBOR(r); err != nil { ··· 193 160 return evt.RepoCommit.Seq 194 161 case evt.RepoSync != nil: 195 162 return evt.RepoSync.Seq 196 - case evt.RepoHandle != nil: 197 - return evt.RepoHandle.Seq 198 - case evt.RepoMigrate != nil: 199 - return evt.RepoMigrate.Seq 200 - case evt.RepoTombstone != nil: 201 - return evt.RepoTombstone.Seq 202 163 case evt.RepoIdentity != nil: 203 164 return evt.RepoIdentity.Seq 204 165 case evt.RepoAccount != nil: ··· 220 181 return evt.RepoCommit.Seq, true 221 182 case evt.RepoSync != nil: 222 183 return evt.RepoSync.Seq, true 223 - case evt.RepoHandle != nil: 224 - return evt.RepoHandle.Seq, true 225 - case evt.RepoMigrate != nil: 226 - return evt.RepoMigrate.Seq, true 227 - case evt.RepoTombstone != nil: 228 - return evt.RepoTombstone.Seq, true 229 184 case evt.RepoIdentity != nil: 230 185 return evt.RepoIdentity.Seq, true 231 186 case evt.RepoAccount != nil:
-34
cmd/relay/stream/persist/diskpersist/diskpersist.go
··· 484 484 pjob.Evt.RepoCommit.Seq = seq 485 485 case pjob.Evt.RepoSync != nil: 486 486 pjob.Evt.RepoSync.Seq = seq 487 - case pjob.Evt.RepoHandle != nil: 488 - pjob.Evt.RepoHandle.Seq = seq 489 487 case pjob.Evt.RepoIdentity != nil: 490 488 pjob.Evt.RepoIdentity.Seq = seq 491 489 case pjob.Evt.RepoAccount != nil: 492 490 pjob.Evt.RepoAccount.Seq = seq 493 - case pjob.Evt.RepoTombstone != nil: 494 - pjob.Evt.RepoTombstone.Seq = seq 495 491 default: 496 492 // only those three get peristed right now 497 493 // we should not actually ever get here... ··· 547 543 if err := xevt.RepoSync.MarshalCBOR(cw); err != nil { 548 544 return fmt.Errorf("failed to marshal: %w", err) 549 545 } 550 - case xevt.RepoHandle != nil: 551 - evtKind = evtKindHandle 552 - did = xevt.RepoHandle.Did 553 - if err := xevt.RepoHandle.MarshalCBOR(cw); err != nil { 554 - return fmt.Errorf("failed to marshal: %w", err) 555 - } 556 546 case xevt.RepoIdentity != nil: 557 547 evtKind = evtKindIdentity 558 548 did = xevt.RepoIdentity.Did ··· 563 553 evtKind = evtKindAccount 564 554 did = xevt.RepoAccount.Did 565 555 if err := xevt.RepoAccount.MarshalCBOR(cw); err != nil { 566 - return fmt.Errorf("failed to marshal: %w", err) 567 - } 568 - case xevt.RepoTombstone != nil: 569 - evtKind = evtKindTombstone 570 - did = xevt.RepoTombstone.Did 571 - if err := xevt.RepoTombstone.MarshalCBOR(cw); err != nil { 572 556 return fmt.Errorf("failed to marshal: %w", err) 573 557 } 574 558 default: ··· 810 794 if err := cb(&stream.XRPCStreamEvent{RepoSync: &evt}); err != nil { 811 795 return nil, err 812 796 } 813 - case evtKindHandle: 814 - var evt atproto.SyncSubscribeRepos_Handle 815 - if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 816 - return nil, err 817 - } 818 - evt.Seq = h.Seq 819 - if err := cb(&stream.XRPCStreamEvent{RepoHandle: &evt}); err != nil { 820 - return nil, err 821 - } 822 797 case evtKindIdentity: 823 798 var evt atproto.SyncSubscribeRepos_Identity 824 799 if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { ··· 835 810 } 836 811 evt.Seq = h.Seq 837 812 if err := cb(&stream.XRPCStreamEvent{RepoAccount: &evt}); err != nil { 838 - return nil, err 839 - } 840 - case evtKindTombstone: 841 - var evt atproto.SyncSubscribeRepos_Tombstone 842 - if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 843 - return nil, err 844 - } 845 - evt.Seq = h.Seq 846 - if err := cb(&stream.XRPCStreamEvent{RepoTombstone: &evt}); err != nil { 847 813 return nil, err 848 814 } 849 815 default:
-8
cmd/relay/testing/consumer.go
··· 62 62 c.LastSeq = evt.Seq 63 63 return nil 64 64 }, 65 - // NOTE: this is included to test that the events are *not* passed through; can be removed in the near future 66 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 67 - c.eventsLk.Lock() 68 - defer c.eventsLk.Unlock() 69 - c.Events = append(c.Events, &stream.XRPCStreamEvent{RepoHandle: evt}) 70 - c.LastSeq = evt.Seq 71 - return nil 72 - }, 73 65 } 74 66 return rsc 75 67 }
-36
cmd/sonar/sonar.go
··· 109 109 case xe.RepoCommit != nil: 110 110 eventsProcessedCounter.WithLabelValues("repo_commit", s.SocketURL).Inc() 111 111 return s.HandleRepoCommit(ctx, xe.RepoCommit) 112 - case xe.RepoHandle != nil: 113 - eventsProcessedCounter.WithLabelValues("repo_handle", s.SocketURL).Inc() 114 - now := time.Now() 115 - s.ProgMux.Lock() 116 - s.Progress.LastSeq = xe.RepoHandle.Seq 117 - s.Progress.LastSeqProcessedAt = now 118 - s.ProgMux.Unlock() 119 - // Parse time from the event time string 120 - t, err := time.Parse(time.RFC3339, xe.RepoHandle.Time) 121 - if err != nil { 122 - s.Logger.Error("error parsing time", "err", err) 123 - return nil 124 - } 125 - lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano())) 126 - lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano())) 127 - lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds())) 128 - lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq)) 129 112 case xe.RepoIdentity != nil: 130 113 eventsProcessedCounter.WithLabelValues("identity", s.SocketURL).Inc() 131 114 now := time.Now() ··· 142 125 s.ProgMux.Unlock() 143 126 case xe.RepoInfo != nil: 144 127 eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc() 145 - case xe.RepoMigrate != nil: 146 - eventsProcessedCounter.WithLabelValues("repo_migrate", s.SocketURL).Inc() 147 - now := time.Now() 148 - s.ProgMux.Lock() 149 - s.Progress.LastSeq = xe.RepoMigrate.Seq 150 - s.Progress.LastSeqProcessedAt = time.Now() 151 - s.ProgMux.Unlock() 152 - // Parse time from the event time string 153 - t, err := time.Parse(time.RFC3339, xe.RepoMigrate.Time) 154 - if err != nil { 155 - s.Logger.Error("error parsing time", "err", err) 156 - return nil 157 - } 158 - lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano())) 159 - lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano())) 160 - lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds())) 161 - lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq)) 162 - case xe.RepoTombstone != nil: 163 - eventsProcessedCounter.WithLabelValues("repo_tombstone", s.SocketURL).Inc() 164 128 case xe.LabelInfo != nil: 165 129 eventsProcessedCounter.WithLabelValues("label_info", s.SocketURL).Inc() 166 130 case xe.LabelLabels != nil:
-9
cmd/supercollider/main.go
··· 338 338 case evt.RepoCommit != nil: 339 339 header.MsgType = "#commit" 340 340 obj = evt.RepoCommit 341 - case evt.RepoHandle != nil: 342 - header.MsgType = "#handle" 343 - obj = evt.RepoHandle 344 341 case evt.RepoInfo != nil: 345 342 header.MsgType = "#info" 346 343 obj = evt.RepoInfo 347 - case evt.RepoMigrate != nil: 348 - header.MsgType = "#migrate" 349 - obj = evt.RepoMigrate 350 - case evt.RepoTombstone != nil: 351 - header.MsgType = "#tombstone" 352 - obj = evt.RepoTombstone 353 344 default: 354 345 logger.Error("unrecognized event kind") 355 346 continue
+8 -68
events/consumer.go
··· 16 16 ) 17 17 18 18 type RepoStreamCallbacks struct { 19 - RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 20 - RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error 21 - RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error 22 - RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 23 - RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error 24 - RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error 25 - RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error 26 - RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error 27 - LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error 28 - LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error 29 - Error func(evt *ErrorFrame) error 19 + RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 20 + RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error 21 + RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 22 + RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error 23 + RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error 24 + LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error 25 + LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error 26 + Error func(evt *ErrorFrame) error 30 27 } 31 28 32 29 func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error { ··· 35 32 return rsc.RepoCommit(xev.RepoCommit) 36 33 case xev.RepoSync != nil && rsc.RepoSync != nil: 37 34 return rsc.RepoSync(xev.RepoSync) 38 - case xev.RepoHandle != nil && rsc.RepoHandle != nil: 39 - return rsc.RepoHandle(xev.RepoHandle) 40 35 case xev.RepoInfo != nil && rsc.RepoInfo != nil: 41 36 return rsc.RepoInfo(xev.RepoInfo) 42 - case xev.RepoMigrate != nil && rsc.RepoMigrate != nil: 43 - return rsc.RepoMigrate(xev.RepoMigrate) 44 37 case xev.RepoIdentity != nil && rsc.RepoIdentity != nil: 45 38 return rsc.RepoIdentity(xev.RepoIdentity) 46 39 case xev.RepoAccount != nil && rsc.RepoAccount != nil: 47 40 return rsc.RepoAccount(xev.RepoAccount) 48 - case xev.RepoTombstone != nil && rsc.RepoTombstone != nil: 49 - return rsc.RepoTombstone(xev.RepoTombstone) 50 41 case xev.LabelLabels != nil && rsc.LabelLabels != nil: 51 42 return rsc.LabelLabels(xev.LabelLabels) 52 43 case xev.LabelInfo != nil && rsc.LabelInfo != nil: ··· 241 232 }); err != nil { 242 233 return err 243 234 } 244 - case "#handle": 245 - // TODO: DEPRECATED message; warning/counter; drop message 246 - var evt comatproto.SyncSubscribeRepos_Handle 247 - if err := evt.UnmarshalCBOR(r); err != nil { 248 - return err 249 - } 250 - 251 - if evt.Seq < lastSeq { 252 - log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 253 - } 254 - lastSeq = evt.Seq 255 - 256 - if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 257 - RepoHandle: &evt, 258 - }); err != nil { 259 - return err 260 - } 261 235 case "#identity": 262 236 var evt comatproto.SyncSubscribeRepos_Identity 263 237 if err := evt.UnmarshalCBOR(r); err != nil { ··· 299 273 300 274 if err := sched.AddWork(ctx, "", &XRPCStreamEvent{ 301 275 RepoInfo: &evt, 302 - }); err != nil { 303 - return err 304 - } 305 - case "#migrate": 306 - // TODO: DEPRECATED message; warning/counter; drop message 307 - var evt comatproto.SyncSubscribeRepos_Migrate 308 - if err := evt.UnmarshalCBOR(r); err != nil { 309 - return err 310 - } 311 - 312 - if evt.Seq < lastSeq { 313 - log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 314 - } 315 - lastSeq = evt.Seq 316 - 317 - if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 318 - RepoMigrate: &evt, 319 - }); err != nil { 320 - return err 321 - } 322 - case "#tombstone": 323 - // TODO: DEPRECATED message; warning/counter; drop message 324 - var evt comatproto.SyncSubscribeRepos_Tombstone 325 - if err := evt.UnmarshalCBOR(r); err != nil { 326 - return err 327 - } 328 - 329 - if evt.Seq < lastSeq { 330 - log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq) 331 - } 332 - lastSeq = evt.Seq 333 - 334 - if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 335 - RepoTombstone: &evt, 336 276 }); err != nil { 337 277 return err 338 278 }
-88
events/dbpersist/dbpersist.go
··· 173 173 e.RepoCommit.Seq = int64(item.Seq) 174 174 case e.RepoSync != nil: 175 175 e.RepoSync.Seq = int64(item.Seq) 176 - case e.RepoHandle != nil: 177 - e.RepoHandle.Seq = int64(item.Seq) 178 176 case e.RepoIdentity != nil: 179 177 e.RepoIdentity.Seq = int64(item.Seq) 180 178 case e.RepoAccount != nil: 181 179 e.RepoAccount.Seq = int64(item.Seq) 182 - case e.RepoTombstone != nil: 183 - e.RepoTombstone.Seq = int64(item.Seq) 184 180 default: 185 181 return fmt.Errorf("unknown event type") 186 182 } ··· 225 221 if err != nil { 226 222 return err 227 223 } 228 - case e.RepoHandle != nil: 229 - rer, err = p.RecordFromHandleChange(ctx, e.RepoHandle) 230 - if err != nil { 231 - return err 232 - } 233 224 case e.RepoIdentity != nil: 234 225 rer, err = p.RecordFromRepoIdentity(ctx, e.RepoIdentity) 235 226 if err != nil { ··· 240 231 if err != nil { 241 232 return err 242 233 } 243 - case e.RepoTombstone != nil: 244 - rer, err = p.RecordFromTombstone(ctx, e.RepoTombstone) 245 - if err != nil { 246 - return err 247 - } 248 234 default: 249 235 return nil 250 236 } ··· 256 242 return nil 257 243 } 258 244 259 - func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error) { 260 - t, err := time.Parse(util.ISO8601, evt.Time) 261 - if err != nil { 262 - return nil, err 263 - } 264 - 265 - uid, err := p.uidForDid(ctx, evt.Did) 266 - if err != nil { 267 - return nil, err 268 - } 269 - 270 - return &RepoEventRecord{ 271 - Repo: uid, 272 - Type: "repo_handle", 273 - Time: t, 274 - NewHandle: &evt.Handle, 275 - }, nil 276 - } 277 - 278 245 func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error) { 279 246 t, err := time.Parse(util.ISO8601, evt.Time) 280 247 if err != nil { ··· 313 280 }, nil 314 281 } 315 282 316 - func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error) { 317 - t, err := time.Parse(util.ISO8601, evt.Time) 318 - if err != nil { 319 - return nil, err 320 - } 321 - 322 - uid, err := p.uidForDid(ctx, evt.Did) 323 - if err != nil { 324 - return nil, err 325 - } 326 - 327 - return &RepoEventRecord{ 328 - Repo: uid, 329 - Type: "repo_tombstone", 330 - Time: t, 331 - }, nil 332 - } 333 - 334 283 func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error) { 335 284 // TODO: hack hack hack 336 285 if len(evt.Ops) > 8192 { ··· 480 429 streamEvent, err = p.hydrateCommit(ctx, record) 481 430 case record.Type == "repo_sync": 482 431 streamEvent, err = p.hydrateSyncEvent(ctx, record) 483 - case record.NewHandle != nil: 484 - streamEvent, err = p.hydrateHandleChange(ctx, record) 485 432 case record.Type == "repo_identity": 486 433 streamEvent, err = p.hydrateIdentityEvent(ctx, record) 487 434 case record.Type == "repo_account": 488 435 streamEvent, err = p.hydrateAccountEvent(ctx, record) 489 - case record.Type == "repo_tombstone": 490 - streamEvent, err = p.hydrateTombstone(ctx, record) 491 436 default: 492 437 err = fmt.Errorf("unknown event type: %s", record.Type) 493 438 } ··· 550 495 return u.Did, nil 551 496 } 552 497 553 - func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 554 - if rer.NewHandle == nil { 555 - return nil, fmt.Errorf("NewHandle is nil") 556 - } 557 - 558 - did, err := p.didForUid(ctx, rer.Repo) 559 - if err != nil { 560 - return nil, err 561 - } 562 - 563 - return &events.XRPCStreamEvent{ 564 - RepoHandle: &comatproto.SyncSubscribeRepos_Handle{ 565 - Did: did, 566 - Handle: *rer.NewHandle, 567 - Time: rer.Time.Format(util.ISO8601), 568 - }, 569 - }, nil 570 - } 571 - 572 498 func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 573 499 did, err := p.didForUid(ctx, rer.Repo) 574 500 if err != nil { ··· 595 521 Time: rer.Time.Format(util.ISO8601), 596 522 Active: rer.Active, 597 523 Status: rer.Status, 598 - }, 599 - }, nil 600 - } 601 - 602 - func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 603 - did, err := p.didForUid(ctx, rer.Repo) 604 - if err != nil { 605 - return nil, err 606 - } 607 - 608 - return &events.XRPCStreamEvent{ 609 - RepoTombstone: &comatproto.SyncSubscribeRepos_Tombstone{ 610 - Did: did, 611 - Time: rer.Time.Format(util.ISO8601), 612 524 }, 613 525 }, nil 614 526 }
-34
events/diskpersist/diskpersist.go
··· 458 458 e.RepoCommit.Seq = seq 459 459 case e.RepoSync != nil: 460 460 e.RepoSync.Seq = seq 461 - case e.RepoHandle != nil: 462 - e.RepoHandle.Seq = seq 463 461 case e.RepoIdentity != nil: 464 462 e.RepoIdentity.Seq = seq 465 463 case e.RepoAccount != nil: 466 464 e.RepoAccount.Seq = seq 467 - case e.RepoTombstone != nil: 468 - e.RepoTombstone.Seq = seq 469 465 default: 470 466 // only those three get peristed right now 471 467 // we should not actually ever get here... ··· 518 514 if err := e.RepoSync.MarshalCBOR(cw); err != nil { 519 515 return fmt.Errorf("failed to marshal: %w", err) 520 516 } 521 - case e.RepoHandle != nil: 522 - evtKind = evtKindHandle 523 - did = e.RepoHandle.Did 524 - if err := e.RepoHandle.MarshalCBOR(cw); err != nil { 525 - return fmt.Errorf("failed to marshal: %w", err) 526 - } 527 517 case e.RepoIdentity != nil: 528 518 evtKind = evtKindIdentity 529 519 did = e.RepoIdentity.Did ··· 534 524 evtKind = evtKindAccount 535 525 did = e.RepoAccount.Did 536 526 if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 537 - return fmt.Errorf("failed to marshal: %w", err) 538 - } 539 - case e.RepoTombstone != nil: 540 - evtKind = evtKindTombstone 541 - did = e.RepoTombstone.Did 542 - if err := e.RepoTombstone.MarshalCBOR(cw); err != nil { 543 527 return fmt.Errorf("failed to marshal: %w", err) 544 528 } 545 529 default: ··· 763 747 if err := cb(&events.XRPCStreamEvent{RepoSync: &evt}); err != nil { 764 748 return nil, err 765 749 } 766 - case evtKindHandle: 767 - var evt atproto.SyncSubscribeRepos_Handle 768 - if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 769 - return nil, err 770 - } 771 - evt.Seq = h.Seq 772 - if err := cb(&events.XRPCStreamEvent{RepoHandle: &evt}); err != nil { 773 - return nil, err 774 - } 775 750 case evtKindIdentity: 776 751 var evt atproto.SyncSubscribeRepos_Identity 777 752 if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { ··· 788 763 } 789 764 evt.Seq = h.Seq 790 765 if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil { 791 - return nil, err 792 - } 793 - case evtKindTombstone: 794 - var evt atproto.SyncSubscribeRepos_Tombstone 795 - if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 796 - return nil, err 797 - } 798 - evt.Seq = h.Seq 799 - if err := cb(&events.XRPCStreamEvent{RepoTombstone: &evt}); err != nil { 800 766 return nil, err 801 767 } 802 768 default:
+8 -53
events/events.go
··· 187 187 } 188 188 189 189 type XRPCStreamEvent struct { 190 - Error *ErrorFrame 191 - RepoCommit *comatproto.SyncSubscribeRepos_Commit 192 - RepoSync *comatproto.SyncSubscribeRepos_Sync 193 - RepoHandle *comatproto.SyncSubscribeRepos_Handle // DEPRECATED 194 - RepoIdentity *comatproto.SyncSubscribeRepos_Identity 195 - RepoInfo *comatproto.SyncSubscribeRepos_Info 196 - RepoMigrate *comatproto.SyncSubscribeRepos_Migrate // DEPRECATED 197 - RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone // DEPRECATED 198 - RepoAccount *comatproto.SyncSubscribeRepos_Account 199 - LabelLabels *comatproto.LabelSubscribeLabels_Labels 200 - LabelInfo *comatproto.LabelSubscribeLabels_Info 190 + Error *ErrorFrame 191 + RepoCommit *comatproto.SyncSubscribeRepos_Commit 192 + RepoSync *comatproto.SyncSubscribeRepos_Sync 193 + RepoIdentity *comatproto.SyncSubscribeRepos_Identity 194 + RepoInfo *comatproto.SyncSubscribeRepos_Info 195 + RepoAccount *comatproto.SyncSubscribeRepos_Account 196 + LabelLabels *comatproto.LabelSubscribeLabels_Labels 197 + LabelInfo *comatproto.LabelSubscribeLabels_Info 201 198 202 199 // some private fields for internal routing perf 203 200 PrivUid models.Uid `json:"-" cborgen:"-"` ··· 220 217 case evt.RepoSync != nil: 221 218 header.MsgType = "#sync" 222 219 obj = evt.RepoSync 223 - case evt.RepoHandle != nil: 224 - header.MsgType = "#handle" 225 - obj = evt.RepoHandle 226 220 case evt.RepoIdentity != nil: 227 221 header.MsgType = "#identity" 228 222 obj = evt.RepoIdentity ··· 232 226 case evt.RepoInfo != nil: 233 227 header.MsgType = "#info" 234 228 obj = evt.RepoInfo 235 - case evt.RepoMigrate != nil: 236 - header.MsgType = "#migrate" 237 - obj = evt.RepoMigrate 238 - case evt.RepoTombstone != nil: 239 - header.MsgType = "#tombstone" 240 - obj = evt.RepoTombstone 241 229 default: 242 230 return fmt.Errorf("unrecognized event kind") 243 231 } ··· 269 257 return fmt.Errorf("reading repoSync event: %w", err) 270 258 } 271 259 xevt.RepoSync = &evt 272 - case "#handle": 273 - // TODO: DEPRECATED message; warning/counter; drop message 274 - var evt comatproto.SyncSubscribeRepos_Handle 275 - if err := evt.UnmarshalCBOR(r); err != nil { 276 - return err 277 - } 278 - xevt.RepoHandle = &evt 279 260 case "#identity": 280 261 var evt comatproto.SyncSubscribeRepos_Identity 281 262 if err := evt.UnmarshalCBOR(r); err != nil { ··· 295 276 return err 296 277 } 297 278 xevt.RepoInfo = &evt 298 - case "#migrate": 299 - // TODO: DEPRECATED message; warning/counter; drop message 300 - var evt comatproto.SyncSubscribeRepos_Migrate 301 - if err := evt.UnmarshalCBOR(r); err != nil { 302 - return err 303 - } 304 - xevt.RepoMigrate = &evt 305 - case "#tombstone": 306 - // TODO: DEPRECATED message; warning/counter; drop message 307 - var evt comatproto.SyncSubscribeRepos_Tombstone 308 - if err := evt.UnmarshalCBOR(r); err != nil { 309 - return err 310 - } 311 - xevt.RepoTombstone = &evt 312 279 case "#labels": 313 280 var evt comatproto.LabelSubscribeLabels_Labels 314 281 if err := evt.UnmarshalCBOR(r); err != nil { ··· 476 443 return evt.RepoCommit.Seq 477 444 case evt.RepoSync != nil: 478 445 return evt.RepoSync.Seq 479 - case evt.RepoHandle != nil: 480 - return evt.RepoHandle.Seq 481 - case evt.RepoMigrate != nil: 482 - return evt.RepoMigrate.Seq 483 - case evt.RepoTombstone != nil: 484 - return evt.RepoTombstone.Seq 485 446 case evt.RepoIdentity != nil: 486 447 return evt.RepoIdentity.Seq 487 448 case evt.RepoAccount != nil: ··· 503 464 return evt.RepoCommit.Seq, true 504 465 case evt.RepoSync != nil: 505 466 return evt.RepoSync.Seq, true 506 - case evt.RepoHandle != nil: 507 - return evt.RepoHandle.Seq, true 508 - case evt.RepoMigrate != nil: 509 - return evt.RepoMigrate.Seq, true 510 - case evt.RepoTombstone != nil: 511 - return evt.RepoTombstone.Seq, true 512 467 case evt.RepoIdentity != nil: 513 468 return evt.RepoIdentity.Seq, true 514 469 case evt.RepoAccount != nil:
-6
events/persist.go
··· 42 42 switch { 43 43 case e.RepoCommit != nil: 44 44 e.RepoCommit.Seq = mp.seq 45 - case e.RepoHandle != nil: 46 - e.RepoHandle.Seq = mp.seq 47 45 case e.RepoIdentity != nil: 48 46 e.RepoIdentity.Seq = mp.seq 49 47 case e.RepoAccount != nil: 50 48 e.RepoAccount.Seq = mp.seq 51 - case e.RepoMigrate != nil: 52 - e.RepoMigrate.Seq = mp.seq 53 - case e.RepoTombstone != nil: 54 - e.RepoTombstone.Seq = mp.seq 55 49 case e.LabelLabels != nil: 56 50 e.LabelLabels.Seq = mp.seq 57 51 default:
-6
events/yolopersist/yolopersist.go
··· 30 30 e.RepoCommit.Seq = yp.seq 31 31 case e.RepoSync != nil: 32 32 e.RepoSync.Seq = yp.seq 33 - case e.RepoHandle != nil: 34 - e.RepoHandle.Seq = yp.seq 35 33 case e.RepoIdentity != nil: 36 34 e.RepoIdentity.Seq = yp.seq 37 35 case e.RepoAccount != nil: 38 36 e.RepoAccount.Seq = yp.seq 39 - case e.RepoMigrate != nil: 40 - e.RepoMigrate.Seq = yp.seq 41 - case e.RepoTombstone != nil: 42 - e.RepoTombstone.Seq = yp.seq 43 37 case e.LabelLabels != nil: 44 38 e.LabelLabels.Seq = yp.seq 45 39 default:
-3
gen/main.go
··· 98 98 atproto.RepoStrongRef{}, 99 99 atproto.SyncSubscribeRepos_Commit{}, 100 100 atproto.SyncSubscribeRepos_Sync{}, 101 - atproto.SyncSubscribeRepos_Handle{}, 102 101 atproto.SyncSubscribeRepos_Identity{}, 103 102 atproto.SyncSubscribeRepos_Account{}, 104 103 atproto.SyncSubscribeRepos_Info{}, 105 - atproto.SyncSubscribeRepos_Migrate{}, 106 104 atproto.SyncSubscribeRepos_RepoOp{}, 107 - atproto.SyncSubscribeRepos_Tombstone{}, 108 105 atproto.LabelDefs_SelfLabels{}, 109 106 atproto.LabelDefs_SelfLabel{}, 110 107 atproto.LabelDefs_Label{},
-20
pds/server.go
··· 598 598 case evt.RepoCommit != nil: 599 599 header.MsgType = "#commit" 600 600 obj = evt.RepoCommit 601 - case evt.RepoHandle != nil: 602 - header.MsgType = "#handle" 603 - obj = evt.RepoHandle 604 601 case evt.RepoIdentity != nil: 605 602 header.MsgType = "#identity" 606 603 obj = evt.RepoIdentity ··· 610 607 case evt.RepoInfo != nil: 611 608 header.MsgType = "#info" 612 609 obj = evt.RepoInfo 613 - case evt.RepoMigrate != nil: 614 - header.MsgType = "#migrate" 615 - obj = evt.RepoMigrate 616 - case evt.RepoTombstone != nil: 617 - header.MsgType = "#tombstone" 618 - obj = evt.RepoTombstone 619 610 default: 620 611 return fmt.Errorf("unrecognized event kind") 621 612 } ··· 660 651 return fmt.Errorf("failed to update handle: %w", err) 661 652 } 662 653 663 - if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 664 - RepoHandle: &comatproto.SyncSubscribeRepos_Handle{ 665 - Did: u.Did, 666 - Handle: handle, 667 - Time: time.Now().Format(util.ISO8601), 668 - }, 669 - }); err != nil { 670 - return fmt.Errorf("failed to push event: %s", err) 671 - } 672 - 673 - // Also push an Identity event 674 654 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 675 655 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 676 656 Did: u.Did,
-16
search/firehose.go
··· 115 115 return nil 116 116 117 117 }, 118 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 119 - ctx := context.Background() 120 - ctx, span := tracer.Start(ctx, "RepoHandle") 121 - defer span.End() 122 - 123 - did, err := syntax.ParseDID(evt.Did) 124 - if err != nil { 125 - idx.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 126 - return nil 127 - } 128 - if err := idx.updateUserHandle(ctx, did, evt.Handle); err != nil { 129 - // TODO: handle this case (instead of return nil) 130 - idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 131 - } 132 - return nil 133 - }, 134 118 } 135 119 136 120 return events.HandleRepoStream(
-2
testing/integ_test.go
··· 299 299 300 300 initevt := evts.Next() 301 301 t.Log(initevt.RepoCommit) 302 - hcevt := evts.Next() 303 - t.Log(hcevt.RepoHandle) 304 302 idevt := evts.Next() 305 303 t.Log(idevt.RepoIdentity) 306 304 }
-7
testing/utils.go
··· 666 666 es.Lk.Unlock() 667 667 return nil 668 668 }, 669 - RepoHandle: func(evt *atproto.SyncSubscribeRepos_Handle) error { 670 - fmt.Println("received handle event: ", evt.Seq, evt.Did) 671 - es.Lk.Lock() 672 - es.Events = append(es.Events, &events.XRPCStreamEvent{RepoHandle: evt}) 673 - es.Lk.Unlock() 674 - return nil 675 - }, 676 669 RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error { 677 670 fmt.Println("received identity event: ", evt.Seq, evt.Did) 678 671 es.Lk.Lock()