this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

improve account status updates; emit #account for local takedowns

+45 -12
+4 -5
cmd/relay/handlers_admin.go
··· 15 15 16 16 "github.com/labstack/echo/v4" 17 17 dto "github.com/prometheus/client_model/go" 18 - "gorm.io/gorm" 19 18 ) 20 19 21 20 // this is the same as the regular com.atproto.sync.requestCrawl endpoint, except it sets a flag to bypass configuration checks ··· 85 84 return err 86 85 } 87 86 88 - if err := s.relay.UpdateAccountStatus(ctx, did, models.AccountStatusTakendown); err != nil { 89 - if errors.Is(err, gorm.ErrRecordNotFound) { 87 + if err := s.relay.UpdateAccountLocalStatus(ctx, did, models.AccountStatusTakendown, true); err != nil { 88 + if errors.Is(err, relay.ErrAccountNotFound) { 90 89 return &echo.HTTPError{ 91 90 Code: http.StatusNotFound, 92 91 Message: "account not found", ··· 112 111 return err 113 112 } 114 113 115 - if err := s.relay.UpdateAccountStatus(ctx, did, models.AccountStatusActive); err != nil { 116 - if errors.Is(err, gorm.ErrRecordNotFound) { 114 + if err := s.relay.UpdateAccountLocalStatus(ctx, did, models.AccountStatusActive, true); err != nil { 115 + if errors.Is(err, relay.ErrAccountNotFound) { 117 116 return &echo.HTTPError{ 118 117 Code: http.StatusNotFound, 119 118 Message: "repo not found",
+40 -2
cmd/relay/relay/account.go
··· 6 6 "fmt" 7 7 "strings" 8 8 9 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 10 11 "github.com/bluesky-social/indigo/cmd/relay/relay/models" 12 + "github.com/bluesky-social/indigo/cmd/relay/stream" 11 13 12 14 "gorm.io/gorm" 13 15 ) ··· 181 183 return nil 182 184 } 183 185 184 - func (r *Relay) UpdateAccountStatus(ctx context.Context, did syntax.DID, status models.AccountStatus) error { 186 + // This updates the account's "upstream" status (eg, at the account's PDS). Usually this is called in response to an `#account` event. 187 + // 188 + // The DID and UID are both required, and *must* match; it is assumed that calling code has already done an account lookup. 189 + func (r *Relay) UpdateAccountUpstreamStatus(ctx context.Context, did syntax.DID, uid uint64, status models.AccountStatus) error { 190 + 191 + if err := r.db.Model(models.Account{}).Where("uid = ?", uid).Update("upstream_status", status).Error; err != nil { 192 + return err 193 + } 194 + 195 + // clear account cache 196 + r.accountCache.Remove(did.String()) 197 + 198 + return nil 199 + } 200 + 201 + // This method updates the "local" account status (as opposed to "upstream" status, eg at the account's PDS). 202 + // 203 + // If the `emitEvent` flag is set true, a `#account` event is broadcase. This should be used for account-level takedowns. 204 + func (r *Relay) UpdateAccountLocalStatus(ctx context.Context, did syntax.DID, status models.AccountStatus, emitEvent bool) error { 185 205 acc, err := r.GetAccount(ctx, did) 186 206 if err != nil { 187 207 return err ··· 194 214 // clear account cache 195 215 r.accountCache.Remove(did.String()) 196 216 197 - // NOTE: not wiping events for user from persister (backfill window) 217 + // update copy of row for computing public status field 218 + acc.Status = status 219 + 220 + if emitEvent { 221 + err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{ 222 + RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 223 + Active: acc.IsActive(), 224 + Did: acc.DID, 225 + Status: acc.StatusField(), 226 + Time: syntax.DatetimeNow().String(), 227 + }, 228 + PrivUid: acc.UID, 229 + }) 230 + if err != nil { 231 + r.Logger.Error("failed to emit #account event after status change", "did", did, "newStatus", status, "error", err) 232 + return fmt.Errorf("failed to broadcast #account event: %w", err) 233 + } 234 + } 235 + 198 236 return nil 199 237 } 200 238
+1 -5
cmd/relay/relay/ingest.go
··· 267 267 } 268 268 269 269 if newStatus != acc.UpstreamStatus { 270 - // updates upstream status in account database 271 - if err := r.db.Model(models.Account{}).Where("uid = ?", acc.UID).Update("upstream_status", newStatus).Error; err != nil { 270 + if err := r.UpdateAccountUpstreamStatus(ctx, syntax.DID(acc.DID), acc.UID, newStatus); err != nil { 272 271 return err 273 272 } 274 273 acc.UpstreamStatus = newStatus 275 - 276 - // clear account cache 277 - r.accountCache.Remove(acc.DID) 278 274 } 279 275 280 276 // emit the event