this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Support #account events in the Relay event stream (#676)

This change includes the required changes to support `#account` events
from PDSs that allow for PDS-directed suspension, takedowns,
deactivation, and reactivation of accounts.

Still need to fix cborgen cause it broke _again_...

authored by

Jaz and committed by
GitHub
c1b16d45 638c7b9a

+1058 -36
+328 -1
api/atproto/cbor_gen.go
··· 991 991 } 992 992 993 993 cw := cbg.NewCborWriter(w) 994 + fieldCount := 4 994 995 995 - if _, err := cw.Write([]byte{163}); err != nil { 996 + if t.Handle == nil { 997 + fieldCount-- 998 + } 999 + 1000 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 996 1001 return err 997 1002 } 998 1003 ··· 1063 1068 if _, err := cw.WriteString(string(t.Time)); err != nil { 1064 1069 return err 1065 1070 } 1071 + 1072 + // t.Handle (string) (string) 1073 + if t.Handle != nil { 1074 + 1075 + if len("handle") > 1000000 { 1076 + return xerrors.Errorf("Value in field \"handle\" was too long") 1077 + } 1078 + 1079 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("handle"))); err != nil { 1080 + return err 1081 + } 1082 + if _, err := cw.WriteString(string("handle")); err != nil { 1083 + return err 1084 + } 1085 + 1086 + if t.Handle == nil { 1087 + if _, err := cw.Write(cbg.CborNull); err != nil { 1088 + return err 1089 + } 1090 + } else { 1091 + if len(*t.Handle) > 1000000 { 1092 + return xerrors.Errorf("Value in field t.Handle was too long") 1093 + } 1094 + 1095 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.Handle))); err != nil { 1096 + return err 1097 + } 1098 + if _, err := cw.WriteString(string(*t.Handle)); err != nil { 1099 + return err 1100 + } 1101 + } 1102 + } 1066 1103 return nil 1067 1104 } 1068 1105 ··· 1151 1188 } 1152 1189 1153 1190 t.Time = string(sval) 1191 + } 1192 + // t.Handle (string) (string) 1193 + case "handle": 1194 + 1195 + { 1196 + b, err := cr.ReadByte() 1197 + if err != nil { 1198 + return err 1199 + } 1200 + if b != cbg.CborNull[0] { 1201 + if err := cr.UnreadByte(); err != nil { 1202 + return err 1203 + } 1204 + 1205 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1206 + if err != nil { 1207 + return err 1208 + } 1209 + 1210 + t.Handle = (*string)(&sval) 1211 + } 1212 + } 1213 + 1214 + default: 1215 + // Field doesn't exist on this type, so ignore it 1216 + cbg.ScanForLinks(r, func(cid.Cid) {}) 1217 + } 1218 + } 1219 + 1220 + return nil 1221 + } 1222 + func (t *SyncSubscribeRepos_Account) MarshalCBOR(w io.Writer) error { 1223 + if t == nil { 1224 + _, err := w.Write(cbg.CborNull) 1225 + return err 1226 + } 1227 + 1228 + cw := cbg.NewCborWriter(w) 1229 + fieldCount := 5 1230 + 1231 + if t.Status == nil { 1232 + fieldCount-- 1233 + } 1234 + 1235 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 1236 + return err 1237 + } 1238 + 1239 + // t.Did (string) (string) 1240 + if len("did") > 1000000 { 1241 + return xerrors.Errorf("Value in field \"did\" was too long") 1242 + } 1243 + 1244 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil { 1245 + return err 1246 + } 1247 + if _, err := cw.WriteString(string("did")); err != nil { 1248 + return err 1249 + } 1250 + 1251 + if len(t.Did) > 1000000 { 1252 + return xerrors.Errorf("Value in field t.Did was too long") 1253 + } 1254 + 1255 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil { 1256 + return err 1257 + } 1258 + if _, err := cw.WriteString(string(t.Did)); err != nil { 1259 + return err 1260 + } 1261 + 1262 + // t.Seq (int64) (int64) 1263 + if len("seq") > 1000000 { 1264 + return xerrors.Errorf("Value in field \"seq\" was too long") 1265 + } 1266 + 1267 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil { 1268 + return err 1269 + } 1270 + if _, err := cw.WriteString(string("seq")); err != nil { 1271 + return err 1272 + } 1273 + 1274 + if t.Seq >= 0 { 1275 + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil { 1276 + return err 1277 + } 1278 + } else { 1279 + if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil { 1280 + return err 1281 + } 1282 + } 1283 + 1284 + // t.Time (string) (string) 1285 + if len("time") > 1000000 { 1286 + return xerrors.Errorf("Value in field \"time\" was too long") 1287 + } 1288 + 1289 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil { 1290 + return err 1291 + } 1292 + if _, err := cw.WriteString(string("time")); err != nil { 1293 + return err 1294 + } 1295 + 1296 + if len(t.Time) > 1000000 { 1297 + return xerrors.Errorf("Value in field t.Time was too long") 1298 + } 1299 + 1300 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil { 1301 + return err 1302 + } 1303 + if _, err := cw.WriteString(string(t.Time)); err != nil { 1304 + return err 1305 + } 1306 + 1307 + // t.Active (bool) (bool) 1308 + if len("active") > 1000000 { 1309 + return xerrors.Errorf("Value in field \"active\" was too long") 1310 + } 1311 + 1312 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("active"))); err != nil { 1313 + return err 1314 + } 1315 + if _, err := cw.WriteString(string("active")); err != nil { 1316 + return err 1317 + } 1318 + 1319 + if err := cbg.WriteBool(w, t.Active); err != nil { 1320 + return err 1321 + } 1322 + 1323 + // t.Status (string) (string) 1324 + if t.Status != nil { 1325 + 1326 + if len("status") > 1000000 { 1327 + return xerrors.Errorf("Value in field \"status\" was too long") 1328 + } 1329 + 1330 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("status"))); err != nil { 1331 + return err 1332 + } 1333 + if _, err := cw.WriteString(string("status")); err != nil { 1334 + return err 1335 + } 1336 + 1337 + if t.Status == nil { 1338 + if _, err := cw.Write(cbg.CborNull); err != nil { 1339 + return err 1340 + } 1341 + } else { 1342 + if len(*t.Status) > 1000000 { 1343 + return xerrors.Errorf("Value in field t.Status was too long") 1344 + } 1345 + 1346 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.Status))); err != nil { 1347 + return err 1348 + } 1349 + if _, err := cw.WriteString(string(*t.Status)); err != nil { 1350 + return err 1351 + } 1352 + } 1353 + } 1354 + return nil 1355 + } 1356 + 1357 + func (t *SyncSubscribeRepos_Account) UnmarshalCBOR(r io.Reader) (err error) { 1358 + *t = SyncSubscribeRepos_Account{} 1359 + 1360 + cr := cbg.NewCborReader(r) 1361 + 1362 + maj, extra, err := cr.ReadHeader() 1363 + if err != nil { 1364 + return err 1365 + } 1366 + defer func() { 1367 + if err == io.EOF { 1368 + err = io.ErrUnexpectedEOF 1369 + } 1370 + }() 1371 + 1372 + if maj != cbg.MajMap { 1373 + return fmt.Errorf("cbor input should be of type map") 1374 + } 1375 + 1376 + if extra > cbg.MaxLength { 1377 + return fmt.Errorf("SyncSubscribeRepos_Account: map struct too large (%d)", extra) 1378 + } 1379 + 1380 + var name string 1381 + n := extra 1382 + 1383 + for i := uint64(0); i < n; i++ { 1384 + 1385 + { 1386 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1387 + if err != nil { 1388 + return err 1389 + } 1390 + 1391 + name = string(sval) 1392 + } 1393 + 1394 + switch name { 1395 + // t.Did (string) (string) 1396 + case "did": 1397 + 1398 + { 1399 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1400 + if err != nil { 1401 + return err 1402 + } 1403 + 1404 + t.Did = string(sval) 1405 + } 1406 + // t.Seq (int64) (int64) 1407 + case "seq": 1408 + { 1409 + maj, extra, err := cr.ReadHeader() 1410 + if err != nil { 1411 + return err 1412 + } 1413 + var extraI int64 1414 + switch maj { 1415 + case cbg.MajUnsignedInt: 1416 + extraI = int64(extra) 1417 + if extraI < 0 { 1418 + return fmt.Errorf("int64 positive overflow") 1419 + } 1420 + case cbg.MajNegativeInt: 1421 + extraI = int64(extra) 1422 + if extraI < 0 { 1423 + return fmt.Errorf("int64 negative overflow") 1424 + } 1425 + extraI = -1 - extraI 1426 + default: 1427 + return fmt.Errorf("wrong type for int64 field: %d", maj) 1428 + } 1429 + 1430 + t.Seq = int64(extraI) 1431 + } 1432 + // t.Time (string) (string) 1433 + case "time": 1434 + 1435 + { 1436 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1437 + if err != nil { 1438 + return err 1439 + } 1440 + 1441 + t.Time = string(sval) 1442 + } 1443 + // t.Active (bool) (bool) 1444 + case "active": 1445 + 1446 + maj, extra, err = cr.ReadHeader() 1447 + if err != nil { 1448 + return err 1449 + } 1450 + if maj != cbg.MajOther { 1451 + return fmt.Errorf("booleans must be major type 7") 1452 + } 1453 + switch extra { 1454 + case 20: 1455 + t.Active = false 1456 + case 21: 1457 + t.Active = true 1458 + default: 1459 + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 1460 + } 1461 + // t.Status (string) (string) 1462 + case "status": 1463 + 1464 + { 1465 + b, err := cr.ReadByte() 1466 + if err != nil { 1467 + return err 1468 + } 1469 + if b != cbg.CborNull[0] { 1470 + if err := cr.UnreadByte(); err != nil { 1471 + return err 1472 + } 1473 + 1474 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 1475 + if err != nil { 1476 + return err 1477 + } 1478 + 1479 + t.Status = (*string)(&sval) 1480 + } 1154 1481 } 1155 1482 1156 1483 default:
+9 -7
api/atproto/servercreateSession.go
··· 12 12 13 13 // ServerCreateSession_Input is the input argument to a com.atproto.server.createSession call. 14 14 type ServerCreateSession_Input struct { 15 + AuthFactorToken *string `json:"authFactorToken,omitempty" cborgen:"authFactorToken,omitempty"` 15 16 // identifier: Handle or other identifier supported by the server for the authenticating user. 16 17 Identifier string `json:"identifier" cborgen:"identifier"` 17 18 Password string `json:"password" cborgen:"password"` ··· 19 20 20 21 // ServerCreateSession_Output is the output of a com.atproto.server.createSession call. 21 22 type ServerCreateSession_Output struct { 22 - AccessJwt string `json:"accessJwt" cborgen:"accessJwt"` 23 - Did string `json:"did" cborgen:"did"` 24 - DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 25 - Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 26 - EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 27 - Handle string `json:"handle" cborgen:"handle"` 28 - RefreshJwt string `json:"refreshJwt" cborgen:"refreshJwt"` 23 + AccessJwt string `json:"accessJwt" cborgen:"accessJwt"` 24 + Did string `json:"did" cborgen:"did"` 25 + DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 26 + Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 27 + EmailAuthFactor *bool `json:"emailAuthFactor,omitempty" cborgen:"emailAuthFactor,omitempty"` 28 + EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 29 + Handle string `json:"handle" cborgen:"handle"` 30 + RefreshJwt string `json:"refreshJwt" cborgen:"refreshJwt"` 29 31 } 30 32 31 33 // ServerCreateSession calls the XRPC method "com.atproto.server.createSession".
+6 -5
api/atproto/servergetSession.go
··· 12 12 13 13 // ServerGetSession_Output is the output of a com.atproto.server.getSession call. 14 14 type ServerGetSession_Output struct { 15 - Did string `json:"did" cborgen:"did"` 16 - DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 17 - Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 18 - EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 19 - Handle string `json:"handle" cborgen:"handle"` 15 + Did string `json:"did" cborgen:"did"` 16 + DidDoc *interface{} `json:"didDoc,omitempty" cborgen:"didDoc,omitempty"` 17 + Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 18 + EmailAuthFactor *bool `json:"emailAuthFactor,omitempty" cborgen:"emailAuthFactor,omitempty"` 19 + EmailConfirmed *bool `json:"emailConfirmed,omitempty" cborgen:"emailConfirmed,omitempty"` 20 + Handle string `json:"handle" cborgen:"handle"` 20 21 } 21 22 22 23 // ServerGetSession calls the XRPC method "com.atproto.server.getSession".
+2 -1
api/atproto/serverupdateEmail.go
··· 12 12 13 13 // ServerUpdateEmail_Input is the input argument to a com.atproto.server.updateEmail call. 14 14 type ServerUpdateEmail_Input struct { 15 - Email string `json:"email" cborgen:"email"` 15 + Email string `json:"email" cborgen:"email"` 16 + EmailAuthFactor *bool `json:"emailAuthFactor,omitempty" cborgen:"emailAuthFactor,omitempty"` 16 17 // token: Requires a token from com.atproto.sever.requestEmailUpdate if the account's email has been confirmed. 17 18 Token *string `json:"token,omitempty" cborgen:"token,omitempty"` 18 19 }
+37
api/atproto/syncgetRepoStatus.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package atproto 4 + 5 + // schema: com.atproto.sync.getRepoStatus 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/xrpc" 11 + ) 12 + 13 + // SyncGetRepoStatus_Output is the output of a com.atproto.sync.getRepoStatus call. 14 + type SyncGetRepoStatus_Output struct { 15 + Active bool `json:"active" cborgen:"active"` 16 + Did string `json:"did" cborgen:"did"` 17 + // rev: Optional field, the current rev of the repo, if active=true 18 + Rev *string `json:"rev,omitempty" cborgen:"rev,omitempty"` 19 + // status: If active=false, this optional field indicates a possible reason for why the account is not active. If active=false and no status is supplied, then the host makes no claim for why the repository is no longer being hosted. 20 + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` 21 + } 22 + 23 + // SyncGetRepoStatus calls the XRPC method "com.atproto.sync.getRepoStatus". 24 + // 25 + // did: The DID of the repo. 26 + func SyncGetRepoStatus(ctx context.Context, c *xrpc.Client, did string) (*SyncGetRepoStatus_Output, error) { 27 + var out SyncGetRepoStatus_Output 28 + 29 + params := map[string]interface{}{ 30 + "did": did, 31 + } 32 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.sync.getRepoStatus", params, nil, &out); err != nil { 33 + return nil, err 34 + } 35 + 36 + return &out, nil 37 + }
+4 -1
api/atproto/synclistRepos.go
··· 18 18 19 19 // SyncListRepos_Repo is a "repo" in the com.atproto.sync.listRepos schema. 20 20 type SyncListRepos_Repo struct { 21 - Did string `json:"did" cborgen:"did"` 21 + Active *bool `json:"active,omitempty" cborgen:"active,omitempty"` 22 + Did string `json:"did" cborgen:"did"` 22 23 // head: Current repo commit CID 23 24 Head string `json:"head" cborgen:"head"` 24 25 Rev string `json:"rev" cborgen:"rev"` 26 + // status: If active=false, this optional field indicates a possible reason for why the account is not active. If active=false and no status is supplied, then the host makes no claim for why the repository is no longer being hosted. 27 + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` 25 28 } 26 29 27 30 // SyncListRepos calls the XRPC method "com.atproto.sync.listRepos".
+21 -6
api/atproto/syncsubscribeRepos.go
··· 8 8 "github.com/bluesky-social/indigo/lex/util" 9 9 ) 10 10 11 + // SyncSubscribeRepos_Account is a "account" in the com.atproto.sync.subscribeRepos schema. 12 + // 13 + // Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. 14 + type SyncSubscribeRepos_Account struct { 15 + // active: Indicates that the account has a repository which can be fetched from the host that emitted this event. 16 + Active bool `json:"active" cborgen:"active"` 17 + Did string `json:"did" cborgen:"did"` 18 + Seq int64 `json:"seq" cborgen:"seq"` 19 + // status: If active=false, this optional field indicates a reason for why the account is not active. 20 + Status *string `json:"status,omitempty" cborgen:"status,omitempty"` 21 + Time string `json:"time" cborgen:"time"` 22 + } 23 + 11 24 // SyncSubscribeRepos_Commit is a "commit" in the com.atproto.sync.subscribeRepos schema. 12 25 // 13 26 // Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. ··· 38 51 39 52 // SyncSubscribeRepos_Handle is a "handle" in the com.atproto.sync.subscribeRepos schema. 40 53 // 41 - // Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity. 54 + // DEPRECATED -- Use #identity event instead 42 55 type SyncSubscribeRepos_Handle struct { 43 56 Did string `json:"did" cborgen:"did"` 44 57 Handle string `json:"handle" cborgen:"handle"` ··· 50 63 // 51 64 // Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. 52 65 type SyncSubscribeRepos_Identity struct { 53 - Did string `json:"did" cborgen:"did"` 54 - Seq int64 `json:"seq" cborgen:"seq"` 55 - Time string `json:"time" cborgen:"time"` 66 + Did string `json:"did" cborgen:"did"` 67 + // handle: The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. 68 + Handle *string `json:"handle,omitempty" cborgen:"handle,omitempty"` 69 + Seq int64 `json:"seq" cborgen:"seq"` 70 + Time string `json:"time" cborgen:"time"` 56 71 } 57 72 58 73 // SyncSubscribeRepos_Info is a "info" in the com.atproto.sync.subscribeRepos schema. ··· 63 78 64 79 // SyncSubscribeRepos_Migrate is a "migrate" in the com.atproto.sync.subscribeRepos schema. 65 80 // 66 - // Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead 81 + // DEPRECATED -- Use #account event instead 67 82 type SyncSubscribeRepos_Migrate struct { 68 83 Did string `json:"did" cborgen:"did"` 69 84 MigrateTo *string `json:"migrateTo" cborgen:"migrateTo"` ··· 83 98 84 99 // SyncSubscribeRepos_Tombstone is a "tombstone" in the com.atproto.sync.subscribeRepos schema. 85 100 // 86 - // Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event 101 + // DEPRECATED -- Use #account event instead 87 102 type SyncSubscribeRepos_Tombstone struct { 88 103 Did string `json:"did" cborgen:"did"` 89 104 Seq int64 `json:"seq" cborgen:"seq"`
+171 -11
bgs/bgs.go
··· 45 45 ) 46 46 47 47 var log = logging.Logger("bgs") 48 + var tracer = otel.Tracer("bgs") 48 49 49 50 // serverListenerBootTimeout is how long to wait for the requested server socket 50 51 // to become available for use. This is an arbitrary timeout that should be safe ··· 428 429 429 430 func (bgs *BGS) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc { 430 431 return func(e echo.Context) error { 431 - ctx, span := otel.Tracer("bgs").Start(e.Request().Context(), "checkAdminAuth") 432 + ctx, span := tracer.Start(e.Request().Context(), "checkAdminAuth") 432 433 defer span.End() 433 434 434 435 e.SetRequest(e.Request().WithContext(ctx)) ··· 469 470 // and no data about this user will be served. 470 471 TakenDown bool 471 472 Tombstoned bool 473 + 474 + // UpstreamStatus is the state of the user as reported by the upstream PDS 475 + UpstreamStatus string `gorm:"index"` 472 476 } 473 477 474 478 type addTargetBody struct { ··· 640 644 case evt.RepoIdentity != nil: 641 645 header.MsgType = "#identity" 642 646 obj = evt.RepoIdentity 647 + case evt.RepoAccount != nil: 648 + header.MsgType = "#account" 649 + obj = evt.RepoAccount 643 650 case evt.RepoInfo != nil: 644 651 header.MsgType = "#info" 645 652 obj = evt.RepoInfo ··· 744 751 } 745 752 746 753 func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) { 747 - ctx, span := otel.Tracer("bgs").Start(ctx, "lookupUserByDid") 754 + ctx, span := tracer.Start(ctx, "lookupUserByDid") 748 755 defer span.End() 749 756 750 757 var u User ··· 760 767 } 761 768 762 769 func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) { 763 - ctx, span := otel.Tracer("bgs").Start(ctx, "lookupUserByUID") 770 + ctx, span := tracer.Start(ctx, "lookupUserByUID") 764 771 defer span.End() 765 772 766 773 var u User ··· 784 791 } 785 792 786 793 func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error { 787 - ctx, span := otel.Tracer("bgs").Start(ctx, "handleFedEvent") 794 + ctx, span := tracer.Start(ctx, "handleFedEvent") 788 795 defer span.End() 789 796 790 797 start := time.Now() ··· 816 823 u.Did = evt.Repo 817 824 } 818 825 819 - if u.TakenDown { 820 - log.Debugw("dropping event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 826 + span.SetAttributes(attribute.String("upstream_status", u.UpstreamStatus)) 827 + 828 + if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown { 829 + span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown)) 830 + log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 831 + return nil 832 + } 833 + 834 + if u.UpstreamStatus == events.AccountStatusSuspended { 835 + log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 836 + return nil 837 + } 838 + 839 + if u.UpstreamStatus == events.AccountStatusDeactivated { 840 + log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 821 841 return nil 822 842 } 823 843 ··· 841 861 } 842 862 843 863 if u.Tombstoned { 864 + span.SetAttributes(attribute.Bool("tombstoned", true)) 844 865 // we've checked the authority of the users PDS, so reinstate the account 845 866 if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil { 846 867 return fmt.Errorf("failed to un-tombstone a user: %w", err) ··· 937 958 // Broadcast the identity event to all consumers 938 959 err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ 939 960 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 940 - Did: env.RepoIdentity.Did, 941 - Seq: env.RepoIdentity.Seq, 942 - Time: env.RepoIdentity.Time, 961 + Did: env.RepoIdentity.Did, 962 + Seq: env.RepoIdentity.Seq, 963 + Time: env.RepoIdentity.Time, 964 + Handle: env.RepoIdentity.Handle, 943 965 }, 944 966 }) 945 967 if err != nil { ··· 948 970 } 949 971 950 972 return nil 973 + case env.RepoAccount != nil: 974 + span.SetAttributes( 975 + attribute.String("did", env.RepoAccount.Did), 976 + attribute.Int64("seq", env.RepoAccount.Seq), 977 + attribute.Bool("active", env.RepoAccount.Active), 978 + ) 979 + 980 + if env.RepoAccount.Status != nil { 981 + span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status)) 982 + } 983 + 984 + log.Infow("bgs got account event", "did", env.RepoAccount.Did) 985 + // Flush any cached DID documents for this user 986 + bgs.didr.FlushCacheFor(env.RepoAccount.Did) 987 + 988 + // Refetch the DID doc to make sure the PDS is still authoritative 989 + ai, err := bgs.createExternalUser(ctx, env.RepoAccount.Did) 990 + if err != nil { 991 + span.RecordError(err) 992 + return err 993 + } 994 + 995 + // Check if the PDS is still authoritative 996 + // if not we don't want to be propagating this account event 997 + if ai.PDS != host.ID { 998 + log.Errorw("account event from non-authoritative pds", 999 + "seq", env.RepoAccount.Seq, 1000 + "did", env.RepoAccount.Did, 1001 + "event_from", host.Host, 1002 + "did_doc_declared_pds", ai.PDS, 1003 + "account_evt", env.RepoAccount, 1004 + ) 1005 + return fmt.Errorf("event from non-authoritative pds") 1006 + } 1007 + 1008 + // Process the account status change 1009 + repoStatus := events.AccountStatusActive 1010 + if !env.RepoAccount.Active && env.RepoAccount.Status != nil { 1011 + repoStatus = *env.RepoAccount.Status 1012 + } 1013 + 1014 + err = bgs.UpdateAccountStatus(ctx, env.RepoAccount.Did, repoStatus) 1015 + if err != nil { 1016 + span.RecordError(err) 1017 + return fmt.Errorf("failed to update account status: %w", err) 1018 + } 1019 + 1020 + shouldBeActive := env.RepoAccount.Active 1021 + status := env.RepoAccount.Status 1022 + u, err := bgs.lookupUserByDid(ctx, env.RepoAccount.Did) 1023 + if err != nil { 1024 + return fmt.Errorf("failed to look up user by did: %w", err) 1025 + } 1026 + 1027 + if u.TakenDown { 1028 + shouldBeActive = false 1029 + status = &events.AccountStatusTakendown 1030 + } 1031 + 1032 + // Broadcast the account event to all consumers 1033 + err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{ 1034 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 1035 + Did: env.RepoAccount.Did, 1036 + Seq: env.RepoAccount.Seq, 1037 + Time: env.RepoAccount.Time, 1038 + Active: shouldBeActive, 1039 + Status: status, 1040 + }, 1041 + }) 1042 + if err != nil { 1043 + log.Errorw("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did) 1044 + return fmt.Errorf("failed to broadcast Account event: %w", err) 1045 + } 1046 + 1047 + return nil 951 1048 case env.RepoMigrate != nil: 952 1049 if _, err := bgs.createExternalUser(ctx, env.RepoMigrate.Did); err != nil { 953 1050 return err ··· 1001 1098 1002 1099 // TODO: rename? This also updates users, and 'external' is an old phrasing 1003 1100 func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) { 1004 - ctx, span := otel.Tracer("bgs").Start(ctx, "createExternalUser") 1101 + ctx, span := tracer.Start(ctx, "createExternalUser") 1005 1102 defer span.End() 1006 1103 1007 1104 externalUserCreationAttempts.Inc() ··· 1236 1333 return subj, nil 1237 1334 } 1238 1335 1336 + func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status string) error { 1337 + ctx, span := tracer.Start(ctx, "UpdateAccountStatus") 1338 + defer span.End() 1339 + 1340 + span.SetAttributes( 1341 + attribute.String("did", did), 1342 + attribute.String("status", status), 1343 + ) 1344 + 1345 + u, err := bgs.lookupUserByDid(ctx, did) 1346 + if err != nil { 1347 + return err 1348 + } 1349 + 1350 + switch status { 1351 + case events.AccountStatusActive: 1352 + // Unset the PDS-specific status flags 1353 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil { 1354 + return fmt.Errorf("failed to set user active status: %w", err) 1355 + } 1356 + case events.AccountStatusDeactivated: 1357 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil { 1358 + return fmt.Errorf("failed to set user deactivation status: %w", err) 1359 + } 1360 + case events.AccountStatusSuspended: 1361 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil { 1362 + return fmt.Errorf("failed to set user suspension status: %w", err) 1363 + } 1364 + case events.AccountStatusTakendown: 1365 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil { 1366 + return fmt.Errorf("failed to set user taken down status: %w", err) 1367 + } 1368 + 1369 + if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ 1370 + "handle": nil, 1371 + }).Error; err != nil { 1372 + return err 1373 + } 1374 + case events.AccountStatusDeleted: 1375 + if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{ 1376 + "tombstoned": true, 1377 + "handle": nil, 1378 + "upstream_status": events.AccountStatusDeleted, 1379 + }).Error; err != nil { 1380 + return err 1381 + } 1382 + 1383 + if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{ 1384 + "handle": nil, 1385 + }).Error; err != nil { 1386 + return err 1387 + } 1388 + 1389 + // delete data from carstore 1390 + if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { 1391 + // don't let a failure here prevent us from propagating this event 1392 + log.Errorf("failed to delete user data from carstore: %s", err) 1393 + } 1394 + } 1395 + 1396 + return nil 1397 + } 1398 + 1239 1399 func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error { 1240 1400 u, err := bgs.lookupUserByDid(ctx, did) 1241 1401 if err != nil { ··· 1332 1492 } 1333 1493 1334 1494 func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { 1335 - ctx, span := otel.Tracer("bgs").Start(ctx, "ResyncPDS") 1495 + ctx, span := tracer.Start(ctx, "ResyncPDS") 1336 1496 defer span.End() 1337 1497 log := log.With("pds", pds.Host, "source", "resync_pds") 1338 1498 resync, found := bgs.LoadOrStoreResync(pds)
+15
bgs/fedmgr.go
··· 606 606 607 607 return nil 608 608 }, 609 + RepoAccount: func(acct *comatproto.SyncSubscribeRepos_Account) error { 610 + log.Infow("account event", "did", acct.Did, "status", acct.Status) 611 + if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{ 612 + RepoAccount: acct, 613 + }); err != nil { 614 + log.Errorf("failed handling event from %q (%d): %s", host.Host, acct.Seq, err) 615 + } 616 + *lastCursor = acct.Seq 617 + 618 + if err := s.updateCursor(sub, *lastCursor); err != nil { 619 + return fmt.Errorf("updating cursor: %w", err) 620 + } 621 + 622 + return nil 623 + }, 609 624 // TODO: all the other event types (handle change, migration, etc) 610 625 Error: func(errf *events.ErrorFrame) error { 611 626 switch errf.Error {
+45 -4
bgs/handlers.go
··· 14 14 atproto "github.com/bluesky-social/indigo/api/atproto" 15 15 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 16 16 "github.com/bluesky-social/indigo/carstore" 17 + "github.com/bluesky-social/indigo/events" 17 18 "github.com/bluesky-social/indigo/mst" 18 19 "gorm.io/gorm" 19 20 ··· 39 40 } 40 41 41 42 if u.TakenDown { 42 - return nil, fmt.Errorf("account was taken down") 43 + return nil, fmt.Errorf("account was taken down by the Relay") 44 + } 45 + 46 + if u.UpstreamStatus == events.AccountStatusTakendown { 47 + return nil, fmt.Errorf("account was taken down by its PDS") 48 + } 49 + 50 + if u.UpstreamStatus == events.AccountStatusDeactivated { 51 + return nil, fmt.Errorf("account is temporarily deactivated") 52 + } 53 + 54 + if u.UpstreamStatus == events.AccountStatusSuspended { 55 + return nil, fmt.Errorf("account is suspended by its PDS") 43 56 } 44 57 45 58 root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey) ··· 84 97 } 85 98 86 99 if u.TakenDown { 87 - return nil, fmt.Errorf("account was taken down") 100 + return nil, fmt.Errorf("account was taken down by the Relay") 101 + } 102 + 103 + if u.UpstreamStatus == events.AccountStatusTakendown { 104 + return nil, fmt.Errorf("account was taken down by its PDS") 105 + } 106 + 107 + if u.UpstreamStatus == events.AccountStatusDeactivated { 108 + return nil, fmt.Errorf("account is temporarily deactivated") 109 + } 110 + 111 + if u.UpstreamStatus == events.AccountStatusSuspended { 112 + return nil, fmt.Errorf("account is suspended by its PDS") 88 113 } 89 114 90 115 // TODO: stream the response ··· 185 210 } 186 211 187 212 users := []User{} 188 - if err := s.db.Model(&User{}).Where("id > ? AND NOT tombstoned AND NOT taken_down", c).Order("id").Limit(limit).Find(&users).Error; err != nil { 213 + 214 + q := fmt.Sprintf("id > ? AND NOT tombstoned AND NOT taken_down AND upstream_status != '%s' AND upstream_status != '%s' AND upstream_status != '%s'", 215 + events.AccountStatusDeactivated, events.AccountStatusSuspended, events.AccountStatusTakendown) 216 + 217 + if err := s.db.Model(&User{}).Where(q, c).Order("id").Limit(limit).Find(&users).Error; err != nil { 189 218 if err == gorm.ErrRecordNotFound { 190 219 return &comatprototypes.SyncListRepos_Output{}, nil 191 220 } ··· 234 263 } 235 264 236 265 if u.TakenDown { 237 - return nil, fmt.Errorf("account was taken down") 266 + return nil, fmt.Errorf("account was taken down by the Relay") 267 + } 268 + 269 + if u.UpstreamStatus == events.AccountStatusTakendown { 270 + return nil, fmt.Errorf("account was taken down by its PDS") 271 + } 272 + 273 + if u.UpstreamStatus == events.AccountStatusDeactivated { 274 + return nil, fmt.Errorf("account is temporarily deactivated") 275 + } 276 + 277 + if u.UpstreamStatus == events.AccountStatusSuspended { 278 + return nil, fmt.Errorf("account is suspended by its PDS") 238 279 } 239 280 240 281 root, err := s.repoman.GetRepoRoot(ctx, u.ID)
+19
events/consumer.go
··· 18 18 RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error 19 19 RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error 20 20 RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error 21 + RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error 21 22 RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error 22 23 RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error 23 24 RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error ··· 38 39 return rsc.RepoMigrate(xev.RepoMigrate) 39 40 case xev.RepoIdentity != nil && rsc.RepoIdentity != nil: 40 41 return rsc.RepoIdentity(xev.RepoIdentity) 42 + case xev.RepoAccount != nil && rsc.RepoAccount != nil: 43 + return rsc.RepoAccount(xev.RepoAccount) 41 44 case xev.RepoTombstone != nil && rsc.RepoTombstone != nil: 42 45 return rsc.RepoTombstone(xev.RepoTombstone) 43 46 case xev.LabelLabels != nil && rsc.LabelLabels != nil: ··· 230 233 231 234 if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 232 235 RepoIdentity: &evt, 236 + }); err != nil { 237 + return err 238 + } 239 + case "#account": 240 + var evt comatproto.SyncSubscribeRepos_Account 241 + if err := evt.UnmarshalCBOR(r); err != nil { 242 + return err 243 + } 244 + 245 + if evt.Seq < lastSeq { 246 + log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq) 247 + } 248 + lastSeq = evt.Seq 249 + 250 + if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{ 251 + RepoAccount: &evt, 233 252 }); err != nil { 234 253 return err 235 254 }
+50
events/dbpersist.go
··· 79 79 Type string 80 80 Rebase bool 81 81 82 + // Active and Status are only set on RepoAccount events 83 + Active bool 84 + Status *string 85 + 82 86 Ops []byte 83 87 } 84 88 ··· 167 171 e.RepoHandle.Seq = int64(item.Seq) 168 172 case e.RepoIdentity != nil: 169 173 e.RepoIdentity.Seq = int64(item.Seq) 174 + case e.RepoAccount != nil: 175 + e.RepoAccount.Seq = int64(item.Seq) 170 176 case e.RepoTombstone != nil: 171 177 e.RepoTombstone.Seq = int64(item.Seq) 172 178 default: ··· 218 224 if err != nil { 219 225 return err 220 226 } 227 + case e.RepoAccount != nil: 228 + rer, err = p.RecordFromRepoAccount(ctx, e.RepoAccount) 229 + if err != nil { 230 + return err 231 + } 221 232 case e.RepoTombstone != nil: 222 233 rer, err = p.RecordFromTombstone(ctx, e.RepoTombstone) 223 234 if err != nil { ··· 270 281 Time: t, 271 282 }, nil 272 283 } 284 + 285 + func (p *DbPersistence) RecordFromRepoAccount(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) (*RepoEventRecord, error) { 286 + t, err := time.Parse(util.ISO8601, evt.Time) 287 + if err != nil { 288 + return nil, err 289 + } 290 + 291 + uid, err := p.uidForDid(ctx, evt.Did) 292 + if err != nil { 293 + return nil, err 294 + } 295 + 296 + return &RepoEventRecord{ 297 + Repo: uid, 298 + Type: "repo_account", 299 + Time: t, 300 + Active: evt.Active, 301 + Status: evt.Status, 302 + }, nil 303 + } 304 + 273 305 func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error) { 274 306 t, err := time.Parse(util.ISO8601, evt.Time) 275 307 if err != nil { ··· 422 454 streamEvent, err = p.hydrateHandleChange(ctx, record) 423 455 case record.Type == "repo_identity": 424 456 streamEvent, err = p.hydrateIdentityEvent(ctx, record) 457 + case record.Type == "repo_account": 458 + streamEvent, err = p.hydrateAccountEvent(ctx, record) 425 459 case record.Type == "repo_tombstone": 426 460 streamEvent, err = p.hydrateTombstone(ctx, record) 427 461 default: ··· 515 549 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 516 550 Did: did, 517 551 Time: rer.Time.Format(util.ISO8601), 552 + }, 553 + }, nil 554 + } 555 + 556 + func (p *DbPersistence) hydrateAccountEvent(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 557 + did, err := p.didForUid(ctx, rer.Repo) 558 + if err != nil { 559 + return nil, err 560 + } 561 + 562 + return &XRPCStreamEvent{ 563 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 564 + Did: did, 565 + Time: rer.Time.Format(util.ISO8601), 566 + Active: rer.Active, 567 + Status: rer.Status, 518 568 }, 519 569 }, nil 520 570 }
+18
events/diskpersist.go
··· 277 277 evtKindHandle = 2 278 278 evtKindTombstone = 3 279 279 evtKindIdentity = 4 280 + evtKindAccount = 5 280 281 ) 281 282 282 283 var emptyHeader = make([]byte, headerSize) ··· 454 455 e.RepoHandle.Seq = seq 455 456 case e.RepoIdentity != nil: 456 457 e.RepoIdentity.Seq = seq 458 + case e.RepoAccount != nil: 459 + e.RepoAccount.Seq = seq 457 460 case e.RepoTombstone != nil: 458 461 e.RepoTombstone.Seq = seq 459 462 default: ··· 512 515 evtKind = evtKindIdentity 513 516 did = e.RepoIdentity.Did 514 517 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 518 + return fmt.Errorf("failed to marshal: %w", err) 519 + } 520 + case e.RepoAccount != nil: 521 + evtKind = evtKindAccount 522 + did = e.RepoAccount.Did 523 + if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 515 524 return fmt.Errorf("failed to marshal: %w", err) 516 525 } 517 526 case e.RepoTombstone != nil: ··· 748 757 } 749 758 evt.Seq = h.Seq 750 759 if err := cb(&XRPCStreamEvent{RepoIdentity: &evt}); err != nil { 760 + return nil, err 761 + } 762 + case evtKindAccount: 763 + var evt atproto.SyncSubscribeRepos_Account 764 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 765 + return nil, err 766 + } 767 + evt.Seq = h.Seq 768 + if err := cb(&XRPCStreamEvent{RepoAccount: &evt}); err != nil { 751 769 return nil, err 752 770 } 753 771 case evtKindTombstone:
+9
events/events.go
··· 133 133 MsgType string `cborgen:"t"` 134 134 } 135 135 136 + var ( 137 + AccountStatusActive = "active" 138 + AccountStatusTakendown = "takendown" 139 + AccountStatusSuspended = "suspended" 140 + AccountStatusDeleted = "deleted" 141 + AccountStatusDeactivated = "deactivated" 142 + ) 143 + 136 144 type XRPCStreamEvent struct { 137 145 Error *ErrorFrame 138 146 RepoCommit *comatproto.SyncSubscribeRepos_Commit ··· 141 149 RepoInfo *comatproto.SyncSubscribeRepos_Info 142 150 RepoMigrate *comatproto.SyncSubscribeRepos_Migrate 143 151 RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone 152 + RepoAccount *comatproto.SyncSubscribeRepos_Account 144 153 LabelLabels *comatproto.LabelSubscribeLabels_Labels 145 154 LabelInfo *comatproto.LabelSubscribeLabels_Info 146 155
+2
events/persist.go
··· 45 45 e.RepoHandle.Seq = mp.seq 46 46 case e.RepoIdentity != nil: 47 47 e.RepoIdentity.Seq = mp.seq 48 + case e.RepoAccount != nil: 49 + e.RepoAccount.Seq = mp.seq 48 50 case e.RepoMigrate != nil: 49 51 e.RepoMigrate.Seq = mp.seq 50 52 case e.RepoTombstone != nil:
+2
events/yolopersist.go
··· 31 31 e.RepoHandle.Seq = yp.seq 32 32 case e.RepoIdentity != nil: 33 33 e.RepoIdentity.Seq = yp.seq 34 + case e.RepoAccount != nil: 35 + e.RepoAccount.Seq = yp.seq 34 36 case e.RepoMigrate != nil: 35 37 e.RepoMigrate.Seq = yp.seq 36 38 case e.RepoTombstone != nil:
+1
gen/main.go
··· 87 87 atproto.SyncSubscribeRepos_Commit{}, 88 88 atproto.SyncSubscribeRepos_Handle{}, 89 89 atproto.SyncSubscribeRepos_Identity{}, 90 + atproto.SyncSubscribeRepos_Account{}, 90 91 atproto.SyncSubscribeRepos_Info{}, 91 92 atproto.SyncSubscribeRepos_Migrate{}, 92 93 atproto.SyncSubscribeRepos_RepoOp{},
+132
pds/server.go
··· 326 326 return true 327 327 case "/.well-known/atproto-did": 328 328 return true 329 + case "/takedownRepo": 330 + return true 331 + case "/suspendRepo": 332 + return true 333 + case "/deactivateRepo": 334 + return true 335 + case "/reactivateRepo": 336 + return true 329 337 default: 330 338 return false 331 339 } ··· 347 355 ctx.Response().WriteHeader(500) 348 356 } 349 357 358 + e.GET("/takedownRepo", func(c echo.Context) error { 359 + ctx := c.Request().Context() 360 + did := c.QueryParam("did") 361 + if did == "" { 362 + return fmt.Errorf("missing did") 363 + } 364 + 365 + if err := s.TakedownRepo(ctx, did); err != nil { 366 + return err 367 + } 368 + 369 + return c.String(200, "ok") 370 + }) 371 + 372 + e.GET("/suspendRepo", func(c echo.Context) error { 373 + ctx := c.Request().Context() 374 + did := c.QueryParam("did") 375 + if did == "" { 376 + return fmt.Errorf("missing did") 377 + } 378 + 379 + if err := s.SuspendRepo(ctx, did); err != nil { 380 + return err 381 + } 382 + 383 + return c.String(200, "ok") 384 + }) 385 + 386 + e.GET("/deactivateRepo", func(c echo.Context) error { 387 + ctx := c.Request().Context() 388 + did := c.QueryParam("did") 389 + if did == "" { 390 + return fmt.Errorf("missing did") 391 + } 392 + 393 + if err := s.DeactivateRepo(ctx, did); err != nil { 394 + return err 395 + } 396 + 397 + return c.String(200, "ok") 398 + }) 399 + 400 + e.GET("/reactivateRepo", func(c echo.Context) error { 401 + ctx := c.Request().Context() 402 + did := c.QueryParam("did") 403 + if did == "" { 404 + return fmt.Errorf("missing did") 405 + } 406 + 407 + if err := s.ReactivateRepo(ctx, did); err != nil { 408 + return err 409 + } 410 + 411 + return c.String(200, "ok") 412 + }) 413 + 350 414 e.Use(middleware.JWTWithConfig(cfg), s.userCheckMiddleware) 351 415 s.RegisterHandlersComAtproto(e) 416 + 352 417 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) 353 418 e.GET("/xrpc/_health", s.HandleHealthCheck) 354 419 e.GET("/.well-known/atproto-did", s.HandleResolveDid) ··· 641 706 case evt.RepoIdentity != nil: 642 707 header.MsgType = "#identity" 643 708 obj = evt.RepoIdentity 709 + case evt.RepoAccount != nil: 710 + header.MsgType = "#account" 711 + obj = evt.RepoAccount 644 712 case evt.RepoInfo != nil: 645 713 header.MsgType = "#info" 646 714 obj = evt.RepoInfo ··· 709 777 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 710 778 Did: u.Did, 711 779 Time: time.Now().Format(util.ISO8601), 780 + }, 781 + }); err != nil { 782 + return fmt.Errorf("failed to push event: %s", err) 783 + } 784 + 785 + return nil 786 + } 787 + 788 + func (s *Server) TakedownRepo(ctx context.Context, did string) error { 789 + // Push an Account event 790 + if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 791 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 792 + Did: did, 793 + Active: false, 794 + Status: &events.AccountStatusTakendown, 795 + Time: time.Now().Format(util.ISO8601), 796 + }, 797 + }); err != nil { 798 + return fmt.Errorf("failed to push event: %s", err) 799 + } 800 + 801 + return nil 802 + } 803 + 804 + func (s *Server) SuspendRepo(ctx context.Context, did string) error { 805 + // Push an Account event 806 + if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 807 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 808 + Did: did, 809 + Active: false, 810 + Status: &events.AccountStatusSuspended, 811 + Time: time.Now().Format(util.ISO8601), 812 + }, 813 + }); err != nil { 814 + return fmt.Errorf("failed to push event: %s", err) 815 + } 816 + 817 + return nil 818 + } 819 + 820 + func (s *Server) DeactivateRepo(ctx context.Context, did string) error { 821 + // Push an Account event 822 + if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 823 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 824 + Did: did, 825 + Active: false, 826 + Status: &events.AccountStatusDeactivated, 827 + Time: time.Now().Format(util.ISO8601), 828 + }, 829 + }); err != nil { 830 + return fmt.Errorf("failed to push event: %s", err) 831 + } 832 + 833 + return nil 834 + } 835 + 836 + func (s *Server) ReactivateRepo(ctx context.Context, did string) error { 837 + // Push an Account event 838 + if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 839 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 840 + Did: did, 841 + Active: true, 842 + Status: &events.AccountStatusActive, 843 + Time: time.Now().Format(util.ISO8601), 712 844 }, 713 845 }); err != nil { 714 846 return fmt.Errorf("failed to push event: %s", err)
+7
sonar/sonar.go
··· 133 133 s.Progress.LastSeq = xe.RepoIdentity.Seq 134 134 s.Progress.LastSeqProcessedAt = now 135 135 s.ProgMux.Unlock() 136 + case xe.RepoAccount != nil: 137 + eventsProcessedCounter.WithLabelValues("account", s.SocketURL).Inc() 138 + now := time.Now() 139 + s.ProgMux.Lock() 140 + s.Progress.LastSeq = xe.RepoAccount.Seq 141 + s.Progress.LastSeqProcessedAt = now 142 + s.ProgMux.Unlock() 136 143 case xe.RepoInfo != nil: 137 144 eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc() 138 145 case xe.RepoMigrate != nil:
+105
testing/integ_test.go
··· 11 11 "time" 12 12 13 13 atproto "github.com/bluesky-social/indigo/api/atproto" 14 + "github.com/bluesky-social/indigo/events" 14 15 "github.com/bluesky-social/indigo/repo" 15 16 "github.com/bluesky-social/indigo/xrpc" 16 17 "github.com/ipfs/go-cid" ··· 283 284 fmt.Println(hcevt.RepoHandle) 284 285 idevt := evts.Next() 285 286 fmt.Println(idevt.RepoIdentity) 287 + } 288 + 289 + func TestAccountEvent(t *testing.T) { 290 + assert := assert.New(t) 291 + _ = assert 292 + didr := TestPLC(t) 293 + p1 := MustSetupPDS(t, ".pdsuno", didr) 294 + p1.Run(t) 295 + 296 + b1 := MustSetupRelay(t, didr) 297 + b1.Run(t) 298 + 299 + b1.tr.TrialHosts = []string{p1.RawHost()} 300 + 301 + p1.RequestScraping(t, b1) 302 + p1.BumpLimits(t, b1) 303 + time.Sleep(time.Millisecond * 50) 304 + 305 + evts := b1.Events(t, -1) 306 + 307 + u := p1.MustNewUser(t, usernames[0]+".pdsuno") 308 + 309 + // if the handle changes before the relay processes the first event, things 310 + // get a little weird 311 + time.Sleep(time.Millisecond * 50) 312 + //socialSim(t, []*testUser{u}, 10, 0) 313 + 314 + p1.TakedownRepo(t, u.DID()) 315 + p1.ReactivateRepo(t, u.DID()) 316 + p1.DeactivateRepo(t, u.DID()) 317 + p1.ReactivateRepo(t, u.DID()) 318 + p1.SuspendRepo(t, u.DID()) 319 + p1.ReactivateRepo(t, u.DID()) 320 + 321 + time.Sleep(time.Millisecond * 100) 322 + 323 + initevt := evts.Next() 324 + fmt.Println(initevt.RepoCommit) 325 + 326 + // Takedown 327 + acevt := evts.Next() 328 + fmt.Println(acevt.RepoAccount) 329 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 330 + assert.Equal(acevt.RepoAccount.Active, false) 331 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) 332 + 333 + // Reactivate 334 + acevt = evts.Next() 335 + fmt.Println(acevt.RepoAccount) 336 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 337 + assert.Equal(acevt.RepoAccount.Active, true) 338 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 339 + 340 + // Deactivate 341 + acevt = evts.Next() 342 + fmt.Println(acevt.RepoAccount) 343 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 344 + assert.Equal(acevt.RepoAccount.Active, false) 345 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusDeactivated) 346 + 347 + // Reactivate 348 + acevt = evts.Next() 349 + fmt.Println(acevt.RepoAccount) 350 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 351 + assert.Equal(acevt.RepoAccount.Active, true) 352 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 353 + 354 + // Suspend 355 + acevt = evts.Next() 356 + fmt.Println(acevt.RepoAccount) 357 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 358 + assert.Equal(acevt.RepoAccount.Active, false) 359 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusSuspended) 360 + 361 + // Reactivate 362 + acevt = evts.Next() 363 + fmt.Println(acevt.RepoAccount) 364 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 365 + assert.Equal(acevt.RepoAccount.Active, true) 366 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 367 + 368 + // Takedown at Relay level, then emit active event and make sure relay overrides it 369 + b1.bgs.TakeDownRepo(context.TODO(), u.DID()) 370 + p1.ReactivateRepo(t, u.DID()) 371 + 372 + time.Sleep(time.Millisecond * 20) 373 + 374 + acevt = evts.Next() 375 + fmt.Println(acevt.RepoAccount) 376 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 377 + assert.Equal(acevt.RepoAccount.Active, false) 378 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) 379 + 380 + // Reactivate at Relay level, then emit an active account event and make sure relay passes it through 381 + b1.bgs.ReverseTakedown(context.TODO(), u.DID()) 382 + p1.ReactivateRepo(t, u.DID()) 383 + 384 + time.Sleep(time.Millisecond * 20) 385 + 386 + acevt = evts.Next() 387 + fmt.Println(acevt.RepoAccount) 388 + assert.Equal(acevt.RepoAccount.Did, u.DID()) 389 + assert.Equal(acevt.RepoAccount.Active, true) 390 + assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 286 391 } 287 392 288 393 func TestRelayTakedown(t *testing.T) {
+75
testing/utils.go
··· 298 298 }, nil 299 299 } 300 300 301 + func (tp *TestPDS) TakedownRepo(t *testing.T, did string) { 302 + req, err := http.NewRequest("GET", tp.HTTPHost()+"/takedownRepo?did="+did, nil) 303 + if err != nil { 304 + t.Fatal(err) 305 + } 306 + 307 + client := &http.Client{} 308 + resp, err := client.Do(req) 309 + if err != nil { 310 + t.Fatal(err) 311 + } 312 + 313 + if resp.StatusCode != http.StatusOK { 314 + t.Fatal("expected 200 OK, got: ", resp.Status) 315 + } 316 + } 317 + 318 + func (tp *TestPDS) SuspendRepo(t *testing.T, did string) { 319 + req, err := http.NewRequest("GET", tp.HTTPHost()+"/suspendRepo?did="+did, nil) 320 + if err != nil { 321 + t.Fatal(err) 322 + } 323 + 324 + client := &http.Client{} 325 + resp, err := client.Do(req) 326 + if err != nil { 327 + t.Fatal(err) 328 + } 329 + 330 + if resp.StatusCode != http.StatusOK { 331 + t.Fatal("expected 200 OK, got: ", resp.Status) 332 + } 333 + } 334 + 335 + func (tp *TestPDS) DeactivateRepo(t *testing.T, did string) { 336 + req, err := http.NewRequest("GET", tp.HTTPHost()+"/deactivateRepo?did="+did, nil) 337 + if err != nil { 338 + t.Fatal(err) 339 + } 340 + 341 + client := &http.Client{} 342 + resp, err := client.Do(req) 343 + if err != nil { 344 + t.Fatal(err) 345 + } 346 + 347 + if resp.StatusCode != http.StatusOK { 348 + t.Fatal("expected 200 OK, got: ", resp.Status) 349 + } 350 + } 351 + 352 + func (tp *TestPDS) ReactivateRepo(t *testing.T, did string) { 353 + req, err := http.NewRequest("GET", tp.HTTPHost()+"/reactivateRepo?did="+did, nil) 354 + if err != nil { 355 + t.Fatal(err) 356 + } 357 + 358 + client := &http.Client{} 359 + resp, err := client.Do(req) 360 + if err != nil { 361 + t.Fatal(err) 362 + } 363 + 364 + if resp.StatusCode != http.StatusOK { 365 + t.Fatal("expected 200 OK, got: ", resp.Status) 366 + } 367 + } 368 + 301 369 func (u *TestUser) Reply(t *testing.T, replyto, root *atproto.RepoStrongRef, body string) string { 302 370 t.Helper() 303 371 ··· 609 677 fmt.Println("received identity event: ", evt.Seq, evt.Did) 610 678 es.Lk.Lock() 611 679 es.Events = append(es.Events, &events.XRPCStreamEvent{RepoIdentity: evt}) 680 + es.Lk.Unlock() 681 + return nil 682 + }, 683 + RepoAccount: func(evt *atproto.SyncSubscribeRepos_Account) error { 684 + fmt.Println("received account event: ", evt.Seq, evt.Did) 685 + es.Lk.Lock() 686 + es.Events = append(es.Events, &events.XRPCStreamEvent{RepoAccount: evt}) 612 687 es.Lk.Unlock() 613 688 return nil 614 689 },