this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod consuming ozone events (#698)

This has automod consume events from Ozone and feed them in to the rules
engine, similar to firehose events. It uses the queryEvents endpoint,
polling every 5 seconds. This is all optional; if an ozone admin client
isn't configured, it doesn't happen.

- some types of ozone events on an account subject result in account
metadata getting flushed. this ensures that, eg, manually-added labels
or takedowns end up in automod-visible cached account metadata
- harassment protection rule is updated to look in private tag state
from ozone, not just a deployed config JSON file. this was actually more
complex than I would thought, and results in additional account metadata
fetches, but those are all cached (with cache purging!) so should be
fine
- private/admin account metadata queries go to ozone, not PDS/entryway,
by default. this could increase API load on ozone a bunch; we might need
to optimize that code path
- ignores events caused by mod service automation (eg, automod itself)
- does not currently have any tests :disappointed: but I tested a fair
bit in staging. would go slow and careful with any prod deploy

Some follow-ups for future work (recording these from my local dev
notes):

- automod "flags" and ozone "tags" are kind of duplicate. I originally
thought these would have automod "flags" show up as "tags" in Ozone. but
should think that through again (maybe flags are useful outside ozone?)
and how to refactor things if they should be the same concept
- entirely rip out PDS/entryway private metadata queries, once we are
confident ozone is sufficient
- get publicly-available account creation timestamp working
consistently. this could include tweaks to appview+ozone endpoints, and
possibly even going to PLC directory for more authoritative timestamps
- should probably refactor account metadata struct in to more clearly
distinct "bsky-specific" (from appview), "ozone-specific" (private
metadata), and public account/identity info
- should probably handle account metadata more consistently across
ozone, automod, and other systems. related to the `#identity` and
`#account` refactors, we now have a concept of account status at various
pieces of infrastructure (active, takendown, deactivated, suspended,
deleted), and should represent that cleanly
- `OzoneEventContext` currently extends `AccountContext`, with the
subject account as "the account". currently all ozone events have an
associated subject account, while record is optional. we might end up
with events that don't have a meaningful subject account? or want to
have better ergonomics with a record-variant of ozone event context?
those are more complex, feels like things are fine for now
- would like to refactor the "consumption" code (firehose and ozone
events) out of the `cmd/hepa/` folder and in to `automod/`

Update: created a separate issue to track these refactors:
https://github.com/bluesky-social/indigo/issues/700

authored by

bnewbold and committed by
GitHub
7d123593 01dd2713

+761 -82
+105 -13
api/ozone/moderationdefs.go
··· 143 143 DurationInHours int64 `json:"durationInHours" cborgen:"durationInHours"` 144 144 } 145 145 146 + // ModerationDefs_ModEventMuteReporter is a "modEventMuteReporter" in the tools.ozone.moderation.defs schema. 147 + // 148 + // # Mute incoming reports from an account 149 + // 150 + // RECORDTYPE: ModerationDefs_ModEventMuteReporter 151 + type ModerationDefs_ModEventMuteReporter struct { 152 + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#modEventMuteReporter" cborgen:"$type,const=tools.ozone.moderation.defs#modEventMuteReporter"` 153 + Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` 154 + // durationInHours: Indicates how long the account should remain muted. 155 + DurationInHours int64 `json:"durationInHours" cborgen:"durationInHours"` 156 + } 157 + 146 158 // ModerationDefs_ModEventReport is a "modEventReport" in the tools.ozone.moderation.defs schema. 147 159 // 148 160 // # Report a subject ··· 151 163 type ModerationDefs_ModEventReport struct { 152 164 LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#modEventReport" cborgen:"$type,const=tools.ozone.moderation.defs#modEventReport"` 153 165 Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` 154 - ReportType *string `json:"reportType" cborgen:"reportType"` 166 + // isReporterMuted: Set to true if the reporter was muted from reporting at the time of the event. These reports won't impact the reviewState of the subject. 167 + IsReporterMuted *bool `json:"isReporterMuted,omitempty" cborgen:"isReporterMuted,omitempty"` 168 + ReportType *string `json:"reportType" cborgen:"reportType"` 155 169 } 156 170 157 171 // ModerationDefs_ModEventResolveAppeal is a "modEventResolveAppeal" in the tools.ozone.moderation.defs schema. ··· 214 228 Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` 215 229 } 216 230 231 + // ModerationDefs_ModEventUnmuteReporter is a "modEventUnmuteReporter" in the tools.ozone.moderation.defs schema. 232 + // 233 + // # Unmute incoming reports from an account 234 + // 235 + // RECORDTYPE: ModerationDefs_ModEventUnmuteReporter 236 + type ModerationDefs_ModEventUnmuteReporter struct { 237 + LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#modEventUnmuteReporter" cborgen:"$type,const=tools.ozone.moderation.defs#modEventUnmuteReporter"` 238 + // comment: Describe reasoning behind the reversal. 239 + Comment *string `json:"comment,omitempty" cborgen:"comment,omitempty"` 240 + } 241 + 217 242 // ModerationDefs_ModEventView is a "modEventView" in the tools.ozone.moderation.defs schema. 218 243 type ModerationDefs_ModEventView struct { 219 244 CreatedAt string `json:"createdAt" cborgen:"createdAt"` ··· 245 270 ModerationDefs_ModEventAcknowledge *ModerationDefs_ModEventAcknowledge 246 271 ModerationDefs_ModEventEscalate *ModerationDefs_ModEventEscalate 247 272 ModerationDefs_ModEventMute *ModerationDefs_ModEventMute 273 + ModerationDefs_ModEventUnmute *ModerationDefs_ModEventUnmute 274 + ModerationDefs_ModEventMuteReporter *ModerationDefs_ModEventMuteReporter 275 + ModerationDefs_ModEventUnmuteReporter *ModerationDefs_ModEventUnmuteReporter 248 276 ModerationDefs_ModEventEmail *ModerationDefs_ModEventEmail 249 277 ModerationDefs_ModEventResolveAppeal *ModerationDefs_ModEventResolveAppeal 250 278 ModerationDefs_ModEventDivert *ModerationDefs_ModEventDivert 279 + ModerationDefs_ModEventTag *ModerationDefs_ModEventTag 251 280 } 252 281 253 282 func (t *ModerationDefs_ModEventViewDetail_Event) MarshalJSON() ([]byte, error) { ··· 283 312 t.ModerationDefs_ModEventMute.LexiconTypeID = "tools.ozone.moderation.defs#modEventMute" 284 313 return json.Marshal(t.ModerationDefs_ModEventMute) 285 314 } 315 + if t.ModerationDefs_ModEventUnmute != nil { 316 + t.ModerationDefs_ModEventUnmute.LexiconTypeID = "tools.ozone.moderation.defs#modEventUnmute" 317 + return json.Marshal(t.ModerationDefs_ModEventUnmute) 318 + } 319 + if t.ModerationDefs_ModEventMuteReporter != nil { 320 + t.ModerationDefs_ModEventMuteReporter.LexiconTypeID = "tools.ozone.moderation.defs#modEventMuteReporter" 321 + return json.Marshal(t.ModerationDefs_ModEventMuteReporter) 322 + } 323 + if t.ModerationDefs_ModEventUnmuteReporter != nil { 324 + t.ModerationDefs_ModEventUnmuteReporter.LexiconTypeID = "tools.ozone.moderation.defs#modEventUnmuteReporter" 325 + return json.Marshal(t.ModerationDefs_ModEventUnmuteReporter) 326 + } 286 327 if t.ModerationDefs_ModEventEmail != nil { 287 328 t.ModerationDefs_ModEventEmail.LexiconTypeID = "tools.ozone.moderation.defs#modEventEmail" 288 329 return json.Marshal(t.ModerationDefs_ModEventEmail) ··· 295 336 t.ModerationDefs_ModEventDivert.LexiconTypeID = "tools.ozone.moderation.defs#modEventDivert" 296 337 return json.Marshal(t.ModerationDefs_ModEventDivert) 297 338 } 339 + if t.ModerationDefs_ModEventTag != nil { 340 + t.ModerationDefs_ModEventTag.LexiconTypeID = "tools.ozone.moderation.defs#modEventTag" 341 + return json.Marshal(t.ModerationDefs_ModEventTag) 342 + } 298 343 return nil, fmt.Errorf("cannot marshal empty enum") 299 344 } 300 345 func (t *ModerationDefs_ModEventViewDetail_Event) UnmarshalJSON(b []byte) error { ··· 328 373 case "tools.ozone.moderation.defs#modEventMute": 329 374 t.ModerationDefs_ModEventMute = new(ModerationDefs_ModEventMute) 330 375 return json.Unmarshal(b, t.ModerationDefs_ModEventMute) 376 + case "tools.ozone.moderation.defs#modEventUnmute": 377 + t.ModerationDefs_ModEventUnmute = new(ModerationDefs_ModEventUnmute) 378 + return json.Unmarshal(b, t.ModerationDefs_ModEventUnmute) 379 + case "tools.ozone.moderation.defs#modEventMuteReporter": 380 + t.ModerationDefs_ModEventMuteReporter = new(ModerationDefs_ModEventMuteReporter) 381 + return json.Unmarshal(b, t.ModerationDefs_ModEventMuteReporter) 382 + case "tools.ozone.moderation.defs#modEventUnmuteReporter": 383 + t.ModerationDefs_ModEventUnmuteReporter = new(ModerationDefs_ModEventUnmuteReporter) 384 + return json.Unmarshal(b, t.ModerationDefs_ModEventUnmuteReporter) 331 385 case "tools.ozone.moderation.defs#modEventEmail": 332 386 t.ModerationDefs_ModEventEmail = new(ModerationDefs_ModEventEmail) 333 387 return json.Unmarshal(b, t.ModerationDefs_ModEventEmail) ··· 337 391 case "tools.ozone.moderation.defs#modEventDivert": 338 392 t.ModerationDefs_ModEventDivert = new(ModerationDefs_ModEventDivert) 339 393 return json.Unmarshal(b, t.ModerationDefs_ModEventDivert) 394 + case "tools.ozone.moderation.defs#modEventTag": 395 + t.ModerationDefs_ModEventTag = new(ModerationDefs_ModEventTag) 396 + return json.Unmarshal(b, t.ModerationDefs_ModEventTag) 340 397 341 398 default: 342 399 return nil ··· 403 460 ModerationDefs_ModEventAcknowledge *ModerationDefs_ModEventAcknowledge 404 461 ModerationDefs_ModEventEscalate *ModerationDefs_ModEventEscalate 405 462 ModerationDefs_ModEventMute *ModerationDefs_ModEventMute 463 + ModerationDefs_ModEventUnmute *ModerationDefs_ModEventUnmute 464 + ModerationDefs_ModEventMuteReporter *ModerationDefs_ModEventMuteReporter 465 + ModerationDefs_ModEventUnmuteReporter *ModerationDefs_ModEventUnmuteReporter 406 466 ModerationDefs_ModEventEmail *ModerationDefs_ModEventEmail 407 467 ModerationDefs_ModEventResolveAppeal *ModerationDefs_ModEventResolveAppeal 408 468 ModerationDefs_ModEventDivert *ModerationDefs_ModEventDivert 469 + ModerationDefs_ModEventTag *ModerationDefs_ModEventTag 409 470 } 410 471 411 472 func (t *ModerationDefs_ModEventView_Event) MarshalJSON() ([]byte, error) { ··· 441 502 t.ModerationDefs_ModEventMute.LexiconTypeID = "tools.ozone.moderation.defs#modEventMute" 442 503 return json.Marshal(t.ModerationDefs_ModEventMute) 443 504 } 505 + if t.ModerationDefs_ModEventUnmute != nil { 506 + t.ModerationDefs_ModEventUnmute.LexiconTypeID = "tools.ozone.moderation.defs#modEventUnmute" 507 + return json.Marshal(t.ModerationDefs_ModEventUnmute) 508 + } 509 + if t.ModerationDefs_ModEventMuteReporter != nil { 510 + t.ModerationDefs_ModEventMuteReporter.LexiconTypeID = "tools.ozone.moderation.defs#modEventMuteReporter" 511 + return json.Marshal(t.ModerationDefs_ModEventMuteReporter) 512 + } 513 + if t.ModerationDefs_ModEventUnmuteReporter != nil { 514 + t.ModerationDefs_ModEventUnmuteReporter.LexiconTypeID = "tools.ozone.moderation.defs#modEventUnmuteReporter" 515 + return json.Marshal(t.ModerationDefs_ModEventUnmuteReporter) 516 + } 444 517 if t.ModerationDefs_ModEventEmail != nil { 445 518 t.ModerationDefs_ModEventEmail.LexiconTypeID = "tools.ozone.moderation.defs#modEventEmail" 446 519 return json.Marshal(t.ModerationDefs_ModEventEmail) ··· 452 525 if t.ModerationDefs_ModEventDivert != nil { 453 526 t.ModerationDefs_ModEventDivert.LexiconTypeID = "tools.ozone.moderation.defs#modEventDivert" 454 527 return json.Marshal(t.ModerationDefs_ModEventDivert) 528 + } 529 + if t.ModerationDefs_ModEventTag != nil { 530 + t.ModerationDefs_ModEventTag.LexiconTypeID = "tools.ozone.moderation.defs#modEventTag" 531 + return json.Marshal(t.ModerationDefs_ModEventTag) 455 532 } 456 533 return nil, fmt.Errorf("cannot marshal empty enum") 457 534 } ··· 486 563 case "tools.ozone.moderation.defs#modEventMute": 487 564 t.ModerationDefs_ModEventMute = new(ModerationDefs_ModEventMute) 488 565 return json.Unmarshal(b, t.ModerationDefs_ModEventMute) 566 + case "tools.ozone.moderation.defs#modEventUnmute": 567 + t.ModerationDefs_ModEventUnmute = new(ModerationDefs_ModEventUnmute) 568 + return json.Unmarshal(b, t.ModerationDefs_ModEventUnmute) 569 + case "tools.ozone.moderation.defs#modEventMuteReporter": 570 + t.ModerationDefs_ModEventMuteReporter = new(ModerationDefs_ModEventMuteReporter) 571 + return json.Unmarshal(b, t.ModerationDefs_ModEventMuteReporter) 572 + case "tools.ozone.moderation.defs#modEventUnmuteReporter": 573 + t.ModerationDefs_ModEventUnmuteReporter = new(ModerationDefs_ModEventUnmuteReporter) 574 + return json.Unmarshal(b, t.ModerationDefs_ModEventUnmuteReporter) 489 575 case "tools.ozone.moderation.defs#modEventEmail": 490 576 t.ModerationDefs_ModEventEmail = new(ModerationDefs_ModEventEmail) 491 577 return json.Unmarshal(b, t.ModerationDefs_ModEventEmail) ··· 495 581 case "tools.ozone.moderation.defs#modEventDivert": 496 582 t.ModerationDefs_ModEventDivert = new(ModerationDefs_ModEventDivert) 497 583 return json.Unmarshal(b, t.ModerationDefs_ModEventDivert) 584 + case "tools.ozone.moderation.defs#modEventTag": 585 + t.ModerationDefs_ModEventTag = new(ModerationDefs_ModEventTag) 586 + return json.Unmarshal(b, t.ModerationDefs_ModEventTag) 498 587 499 588 default: 500 589 return nil ··· 585 674 // RECORDTYPE: ModerationDefs_RepoView 586 675 type ModerationDefs_RepoView struct { 587 676 LexiconTypeID string `json:"$type,const=tools.ozone.moderation.defs#repoView" cborgen:"$type,const=tools.ozone.moderation.defs#repoView"` 677 + DeactivatedAt *string `json:"deactivatedAt,omitempty" cborgen:"deactivatedAt,omitempty"` 588 678 Did string `json:"did" cborgen:"did"` 589 679 Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 590 680 Handle string `json:"handle" cborgen:"handle"` ··· 598 688 599 689 // ModerationDefs_RepoViewDetail is a "repoViewDetail" in the tools.ozone.moderation.defs schema. 600 690 type ModerationDefs_RepoViewDetail struct { 691 + DeactivatedAt *string `json:"deactivatedAt,omitempty" cborgen:"deactivatedAt,omitempty"` 601 692 Did string `json:"did" cborgen:"did"` 602 693 Email *string `json:"email,omitempty" cborgen:"email,omitempty"` 603 694 EmailConfirmedAt *string `json:"emailConfirmedAt,omitempty" cborgen:"emailConfirmedAt,omitempty"` ··· 630 721 CreatedAt string `json:"createdAt" cborgen:"createdAt"` 631 722 Id int64 `json:"id" cborgen:"id"` 632 723 // lastAppealedAt: Timestamp referencing when the author of the subject appealed a moderation action 633 - LastAppealedAt *string `json:"lastAppealedAt,omitempty" cborgen:"lastAppealedAt,omitempty"` 634 - LastReportedAt *string `json:"lastReportedAt,omitempty" cborgen:"lastReportedAt,omitempty"` 635 - LastReviewedAt *string `json:"lastReviewedAt,omitempty" cborgen:"lastReviewedAt,omitempty"` 636 - LastReviewedBy *string `json:"lastReviewedBy,omitempty" cborgen:"lastReviewedBy,omitempty"` 637 - MuteUntil *string `json:"muteUntil,omitempty" cborgen:"muteUntil,omitempty"` 638 - ReviewState *string `json:"reviewState" cborgen:"reviewState"` 639 - Subject *ModerationDefs_SubjectStatusView_Subject `json:"subject" cborgen:"subject"` 640 - SubjectBlobCids []string `json:"subjectBlobCids,omitempty" cborgen:"subjectBlobCids,omitempty"` 641 - SubjectRepoHandle *string `json:"subjectRepoHandle,omitempty" cborgen:"subjectRepoHandle,omitempty"` 642 - SuspendUntil *string `json:"suspendUntil,omitempty" cborgen:"suspendUntil,omitempty"` 643 - Tags []string `json:"tags,omitempty" cborgen:"tags,omitempty"` 644 - Takendown *bool `json:"takendown,omitempty" cborgen:"takendown,omitempty"` 724 + LastAppealedAt *string `json:"lastAppealedAt,omitempty" cborgen:"lastAppealedAt,omitempty"` 725 + LastReportedAt *string `json:"lastReportedAt,omitempty" cborgen:"lastReportedAt,omitempty"` 726 + LastReviewedAt *string `json:"lastReviewedAt,omitempty" cborgen:"lastReviewedAt,omitempty"` 727 + LastReviewedBy *string `json:"lastReviewedBy,omitempty" cborgen:"lastReviewedBy,omitempty"` 728 + MuteReportingUntil *string `json:"muteReportingUntil,omitempty" cborgen:"muteReportingUntil,omitempty"` 729 + MuteUntil *string `json:"muteUntil,omitempty" cborgen:"muteUntil,omitempty"` 730 + ReviewState *string `json:"reviewState" cborgen:"reviewState"` 731 + Subject *ModerationDefs_SubjectStatusView_Subject `json:"subject" cborgen:"subject"` 732 + SubjectBlobCids []string `json:"subjectBlobCids,omitempty" cborgen:"subjectBlobCids,omitempty"` 733 + SubjectRepoHandle *string `json:"subjectRepoHandle,omitempty" cborgen:"subjectRepoHandle,omitempty"` 734 + SuspendUntil *string `json:"suspendUntil,omitempty" cborgen:"suspendUntil,omitempty"` 735 + Tags []string `json:"tags,omitempty" cborgen:"tags,omitempty"` 736 + Takendown *bool `json:"takendown,omitempty" cborgen:"takendown,omitempty"` 645 737 // updatedAt: Timestamp referencing when the last update was made to the moderation status of the subject 646 738 UpdatedAt string `json:"updatedAt" cborgen:"updatedAt"` 647 739 }
+24 -8
api/ozone/moderationemitEvent.go
··· 30 30 ModerationDefs_ModEventLabel *ModerationDefs_ModEventLabel 31 31 ModerationDefs_ModEventReport *ModerationDefs_ModEventReport 32 32 ModerationDefs_ModEventMute *ModerationDefs_ModEventMute 33 - ModerationDefs_ModEventReverseTakedown *ModerationDefs_ModEventReverseTakedown 34 33 ModerationDefs_ModEventUnmute *ModerationDefs_ModEventUnmute 34 + ModerationDefs_ModEventMuteReporter *ModerationDefs_ModEventMuteReporter 35 + ModerationDefs_ModEventUnmuteReporter *ModerationDefs_ModEventUnmuteReporter 36 + ModerationDefs_ModEventReverseTakedown *ModerationDefs_ModEventReverseTakedown 35 37 ModerationDefs_ModEventEmail *ModerationDefs_ModEventEmail 36 38 ModerationDefs_ModEventTag *ModerationDefs_ModEventTag 37 39 } ··· 65 67 t.ModerationDefs_ModEventMute.LexiconTypeID = "tools.ozone.moderation.defs#modEventMute" 66 68 return json.Marshal(t.ModerationDefs_ModEventMute) 67 69 } 68 - if t.ModerationDefs_ModEventReverseTakedown != nil { 69 - t.ModerationDefs_ModEventReverseTakedown.LexiconTypeID = "tools.ozone.moderation.defs#modEventReverseTakedown" 70 - return json.Marshal(t.ModerationDefs_ModEventReverseTakedown) 71 - } 72 70 if t.ModerationDefs_ModEventUnmute != nil { 73 71 t.ModerationDefs_ModEventUnmute.LexiconTypeID = "tools.ozone.moderation.defs#modEventUnmute" 74 72 return json.Marshal(t.ModerationDefs_ModEventUnmute) 75 73 } 74 + if t.ModerationDefs_ModEventMuteReporter != nil { 75 + t.ModerationDefs_ModEventMuteReporter.LexiconTypeID = "tools.ozone.moderation.defs#modEventMuteReporter" 76 + return json.Marshal(t.ModerationDefs_ModEventMuteReporter) 77 + } 78 + if t.ModerationDefs_ModEventUnmuteReporter != nil { 79 + t.ModerationDefs_ModEventUnmuteReporter.LexiconTypeID = "tools.ozone.moderation.defs#modEventUnmuteReporter" 80 + return json.Marshal(t.ModerationDefs_ModEventUnmuteReporter) 81 + } 82 + if t.ModerationDefs_ModEventReverseTakedown != nil { 83 + t.ModerationDefs_ModEventReverseTakedown.LexiconTypeID = "tools.ozone.moderation.defs#modEventReverseTakedown" 84 + return json.Marshal(t.ModerationDefs_ModEventReverseTakedown) 85 + } 76 86 if t.ModerationDefs_ModEventEmail != nil { 77 87 t.ModerationDefs_ModEventEmail.LexiconTypeID = "tools.ozone.moderation.defs#modEventEmail" 78 88 return json.Marshal(t.ModerationDefs_ModEventEmail) ··· 111 121 case "tools.ozone.moderation.defs#modEventMute": 112 122 t.ModerationDefs_ModEventMute = new(ModerationDefs_ModEventMute) 113 123 return json.Unmarshal(b, t.ModerationDefs_ModEventMute) 114 - case "tools.ozone.moderation.defs#modEventReverseTakedown": 115 - t.ModerationDefs_ModEventReverseTakedown = new(ModerationDefs_ModEventReverseTakedown) 116 - return json.Unmarshal(b, t.ModerationDefs_ModEventReverseTakedown) 117 124 case "tools.ozone.moderation.defs#modEventUnmute": 118 125 t.ModerationDefs_ModEventUnmute = new(ModerationDefs_ModEventUnmute) 119 126 return json.Unmarshal(b, t.ModerationDefs_ModEventUnmute) 127 + case "tools.ozone.moderation.defs#modEventMuteReporter": 128 + t.ModerationDefs_ModEventMuteReporter = new(ModerationDefs_ModEventMuteReporter) 129 + return json.Unmarshal(b, t.ModerationDefs_ModEventMuteReporter) 130 + case "tools.ozone.moderation.defs#modEventUnmuteReporter": 131 + t.ModerationDefs_ModEventUnmuteReporter = new(ModerationDefs_ModEventUnmuteReporter) 132 + return json.Unmarshal(b, t.ModerationDefs_ModEventUnmuteReporter) 133 + case "tools.ozone.moderation.defs#modEventReverseTakedown": 134 + t.ModerationDefs_ModEventReverseTakedown = new(ModerationDefs_ModEventReverseTakedown) 135 + return json.Unmarshal(b, t.ModerationDefs_ModEventReverseTakedown) 120 136 case "tools.ozone.moderation.defs#modEventEmail": 121 137 t.ModerationDefs_ModEventEmail = new(ModerationDefs_ModEventEmail) 122 138 return json.Unmarshal(b, t.ModerationDefs_ModEventEmail)
+3 -1
api/ozone/moderationqueryStatuses.go
··· 22 22 // comment: Search subjects by keyword from comments 23 23 // includeMuted: By default, we don't include muted subjects in the results. Set this to true to include them. 24 24 // lastReviewedBy: Get all subject statuses that were reviewed by a specific moderator 25 + // onlyMuted: When set to true, only muted subjects and reporters will be returned. 25 26 // reportedAfter: Search subjects reported after a given timestamp 26 27 // reportedBefore: Search subjects reported before a given timestamp 27 28 // reviewState: Specify when fetching subjects in a certain state 28 29 // reviewedAfter: Search subjects reviewed after a given timestamp 29 30 // reviewedBefore: Search subjects reviewed before a given timestamp 30 31 // takendown: Get subjects that were taken down 31 - func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, comment string, cursor string, excludeTags []string, ignoreSubjects []string, includeMuted bool, lastReviewedBy string, limit int64, reportedAfter string, reportedBefore string, reviewState string, reviewedAfter string, reviewedBefore string, sortDirection string, sortField string, subject string, tags []string, takendown bool) (*ModerationQueryStatuses_Output, error) { 32 + func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, comment string, cursor string, excludeTags []string, ignoreSubjects []string, includeMuted bool, lastReviewedBy string, limit int64, onlyMuted bool, reportedAfter string, reportedBefore string, reviewState string, reviewedAfter string, reviewedBefore string, sortDirection string, sortField string, subject string, tags []string, takendown bool) (*ModerationQueryStatuses_Output, error) { 32 33 var out ModerationQueryStatuses_Output 33 34 34 35 params := map[string]interface{}{ ··· 40 41 "includeMuted": includeMuted, 41 42 "lastReviewedBy": lastReviewedBy, 42 43 "limit": limit, 44 + "onlyMuted": onlyMuted, 43 45 "reportedAfter": reportedAfter, 44 46 "reportedBefore": reportedBefore, 45 47 "reviewState": reviewState,
+2 -1
automod/cachestore/cachestore_redis.go
··· 16 16 var _ CacheStore = (*RedisCacheStore)(nil) 17 17 18 18 func NewRedisCacheStore(redisURL string, ttl time.Duration) (*RedisCacheStore, error) { 19 + ctx := context.Background() 19 20 opt, err := redis.ParseURL(redisURL) 20 21 if err != nil { 21 22 return nil, err 22 23 } 23 24 rdb := redis.NewClient(opt) 24 25 // check redis connection 25 - _, err = rdb.Ping(context.TODO()).Result() 26 + _, err = rdb.Ping(ctx).Result() 26 27 if err != nil { 27 28 return nil, err 28 29 }
+2 -1
automod/countstore/countstore_redis.go
··· 15 15 } 16 16 17 17 func NewRedisCountStore(redisURL string) (*RedisCountStore, error) { 18 + ctx := context.Background() 18 19 opt, err := redis.ParseURL(redisURL) 19 20 if err != nil { 20 21 return nil, err 21 22 } 22 23 rdb := redis.NewClient(opt) 23 24 // check redis connection 24 - _, err = rdb.Ping(context.TODO()).Result() 25 + _, err = rdb.Ping(ctx).Result() 25 26 if err != nil { 26 27 return nil, err 27 28 }
+4
automod/engine/account_meta.go
··· 18 18 FollowsCount int64 19 19 PostsCount int64 20 20 Takendown bool 21 + Deactivated bool 22 + // best effort public interpretation of account creation timestamp. not always available, and may be inaccurate/inconsistent for now. 23 + CreatedAt *time.Time 21 24 } 22 25 23 26 type ProfileSummary struct { ··· 30 33 Email string 31 34 EmailConfirmed bool 32 35 IndexedAt time.Time 36 + AccountTags []string 33 37 }
+58
automod/engine/context.go
··· 5 5 "fmt" 6 6 "log/slog" 7 7 8 + toolsozone "github.com/bluesky-social/indigo/api/ozone" 8 9 "github.com/bluesky-social/indigo/atproto/identity" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 10 11 ) ··· 37 38 // TODO: could consider adding commit-level metadata here. probably nullable if so, commit-level metadata isn't always available. might be best to do a separate event/context type for that 38 39 } 39 40 41 + // Represents an ozone event on a subject account. 42 + // 43 + // TODO: for ozone events with a record subject (not account subject), should we extend RecordContext instead? 44 + type OzoneEventContext struct { 45 + AccountContext 46 + 47 + Event OzoneEvent 48 + 49 + // Moderator team member (for ozone internal events) or account that created a report or appeal 50 + CreatorAccount AccountMeta 51 + 52 + // If the subject of the event is a record, this is the record metadata 53 + SubjectRecord *RecordMeta 54 + } 55 + 40 56 var ( 41 57 CreateOp = "create" 42 58 UpdateOp = "update" ··· 53 69 RecordKey syntax.RecordKey 54 70 CID *syntax.CID 55 71 RecordCBOR []byte 72 + } 73 + 74 + // Immutable 75 + type RecordMeta struct { 76 + DID syntax.DID 77 + Collection syntax.NSID 78 + RecordKey syntax.RecordKey 79 + CID *syntax.CID 80 + // TODO: RecordCBOR []byte? optional? 81 + } 82 + 83 + type OzoneEvent struct { 84 + EventType string 85 + EventID int64 86 + CreatedAt syntax.Datetime 87 + CreatedBy syntax.DID 88 + SubjectDID syntax.DID 89 + SubjectURI *syntax.ATURI 90 + // TODO: SubjectBlobs []syntax.CID 91 + Event toolsozone.ModerationDefs_ModEventView_Event 56 92 } 57 93 58 94 // Originally intended for push notifications, but can also work for any inter-account notification. ··· 178 214 return AccountRelationship{DID: other} 179 215 } 180 216 return *rel 217 + } 218 + 219 + // fetch account metadata for the given DID. if there is any problem with lookup, returns nil. 220 + // 221 + // TODO: should this take an AtIdentifier instead? 222 + func (c *BaseContext) GetAccountMeta(did syntax.DID) *AccountMeta { 223 + 224 + ident, err := c.engine.Directory.LookupDID(c.Ctx, did) 225 + if err != nil { 226 + if nil == c.Err { 227 + c.Err = err 228 + } 229 + return nil 230 + } 231 + am, err := c.engine.GetAccountMeta(c.Ctx, ident) 232 + if err != nil { 233 + if nil == c.Err { 234 + c.Err = err 235 + } 236 + return nil 237 + } 238 + return am 181 239 } 182 240 183 241 // update effects (indirect) ======
+218
automod/engine/engine_ozone.go
··· 1 + package engine 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "time" 7 + 8 + toolsozone "github.com/bluesky-social/indigo/api/ozone" 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + ) 11 + 12 + func NewOzoneEventContext(ctx context.Context, eng *Engine, eventView *toolsozone.ModerationDefs_ModEventView) (*OzoneEventContext, error) { 13 + 14 + if eventView.Event == nil { 15 + return nil, fmt.Errorf("nil ozone event type") 16 + } 17 + 18 + eventType := "" 19 + if eventView.Event.ModerationDefs_ModEventTakedown != nil { 20 + eventType = "takedown" 21 + } else if eventView.Event.ModerationDefs_ModEventReverseTakedown != nil { 22 + eventType = "reverseTakedown" 23 + } else if eventView.Event.ModerationDefs_ModEventComment != nil { 24 + eventType = "comment" 25 + } else if eventView.Event.ModerationDefs_ModEventReport != nil { 26 + eventType = "report" 27 + } else if eventView.Event.ModerationDefs_ModEventLabel != nil { 28 + eventType = "label" 29 + } else if eventView.Event.ModerationDefs_ModEventAcknowledge != nil { 30 + eventType = "acknowledge" 31 + } else if eventView.Event.ModerationDefs_ModEventEscalate != nil { 32 + eventType = "escalate" 33 + } else if eventView.Event.ModerationDefs_ModEventMute != nil { 34 + eventType = "mute" 35 + } else if eventView.Event.ModerationDefs_ModEventUnmute != nil { 36 + eventType = "unmute" 37 + } else if eventView.Event.ModerationDefs_ModEventMuteReporter != nil { 38 + eventType = "muteReporter" 39 + } else if eventView.Event.ModerationDefs_ModEventUnmuteReporter != nil { 40 + eventType = "unmuteReporter" 41 + } else if eventView.Event.ModerationDefs_ModEventEmail != nil { 42 + eventType = "email" 43 + } else if eventView.Event.ModerationDefs_ModEventResolveAppeal != nil { 44 + eventType = "resolveAppeal" 45 + } else if eventView.Event.ModerationDefs_ModEventDivert != nil { 46 + eventType = "divert" 47 + } else if eventView.Event.ModerationDefs_ModEventTag != nil { 48 + eventType = "tag" 49 + } else { 50 + return nil, fmt.Errorf("unhandled ozone event type") 51 + } 52 + 53 + creatorDID, err := syntax.ParseDID(eventView.CreatedBy) 54 + if err != nil { 55 + return nil, err 56 + } 57 + 58 + var subjectDID syntax.DID 59 + var subjectURI *syntax.ATURI 60 + var recordMeta *RecordMeta 61 + if eventView.Subject == nil { 62 + return nil, fmt.Errorf("empty ozone event subject") 63 + } else if eventView.Subject.AdminDefs_RepoRef != nil { 64 + subjectDID, err = syntax.ParseDID(eventView.Subject.AdminDefs_RepoRef.Did) 65 + if err != nil { 66 + return nil, err 67 + } 68 + } else if eventView.Subject.RepoStrongRef != nil { 69 + u, err := syntax.ParseATURI(eventView.Subject.RepoStrongRef.Uri) 70 + if err != nil { 71 + return nil, err 72 + } 73 + subjectURI := &u 74 + subjectDID, err = subjectURI.Authority().AsDID() 75 + if err != nil { 76 + return nil, err 77 + } 78 + cidVal, err := syntax.ParseCID(eventView.Subject.RepoStrongRef.Cid) 79 + if err != nil { 80 + return nil, err 81 + } 82 + recordMeta = &RecordMeta{ 83 + DID: subjectDID, 84 + Collection: subjectURI.Collection(), 85 + RecordKey: subjectURI.RecordKey(), 86 + CID: &cidVal, 87 + } 88 + } else { 89 + return nil, fmt.Errorf("empty ozone event subject") 90 + } 91 + 92 + createdAt, err := syntax.ParseDatetime(eventView.CreatedAt) 93 + if err != nil { 94 + return nil, err 95 + } 96 + 97 + evt := OzoneEvent{ 98 + EventType: eventType, 99 + EventID: eventView.Id, 100 + CreatedAt: createdAt, 101 + CreatedBy: creatorDID, 102 + SubjectDID: subjectDID, 103 + SubjectURI: subjectURI, 104 + // TODO: SubjectBlobs []syntax.CID 105 + Event: *eventView.Event, 106 + } 107 + 108 + creatorIdent, err := eng.Directory.LookupDID(ctx, evt.CreatedBy) 109 + if err != nil { 110 + return nil, err 111 + } 112 + if creatorIdent == nil { 113 + return nil, fmt.Errorf("identity not found for DID: %s", evt.CreatedBy) 114 + } 115 + creatorMeta, err := eng.GetAccountMeta(ctx, creatorIdent) 116 + if err != nil { 117 + return nil, err 118 + } 119 + 120 + subjectIdent, err := eng.Directory.LookupDID(ctx, evt.SubjectDID) 121 + if err != nil { 122 + return nil, err 123 + } 124 + if subjectIdent == nil { 125 + return nil, fmt.Errorf("identity not found for DID: %s", evt.SubjectDID) 126 + } 127 + accountMeta, err := eng.GetAccountMeta(ctx, subjectIdent) 128 + if err != nil { 129 + return nil, err 130 + } 131 + 132 + return &OzoneEventContext{ 133 + AccountContext: AccountContext{ 134 + BaseContext: BaseContext{ 135 + Ctx: ctx, 136 + Err: nil, 137 + Logger: eng.Logger.With("eventID", evt.EventID, "ozoneEventType", evt.EventType, "creatorDID", evt.CreatedBy, "subjectDID", evt.SubjectDID), 138 + engine: eng, 139 + effects: &Effects{}, 140 + }, 141 + Account: *accountMeta, 142 + }, 143 + Event: evt, 144 + CreatorAccount: *creatorMeta, 145 + SubjectRecord: recordMeta, 146 + }, nil 147 + } 148 + 149 + // Entrypoint for external code pushing ozone events. 150 + // 151 + // This method can be called concurrently, though cached state may end up inconsistent if multiple events for the same account (DID) are processed in parallel. 152 + func (eng *Engine) ProcessOzoneEvent(ctx context.Context, eventView *toolsozone.ModerationDefs_ModEventView) error { 153 + eventProcessCount.WithLabelValues("ozone").Inc() 154 + start := time.Now() 155 + defer func() { 156 + duration := time.Since(start) 157 + eventProcessDuration.WithLabelValues("ozone").Observe(duration.Seconds()) 158 + }() 159 + 160 + // similar to an HTTP server, we want to recover any panics from rule execution 161 + defer func() { 162 + if r := recover(); r != nil { 163 + eng.Logger.Error("automod ozone event execution exception", "err", r, "eventID", eventView.Id, "createdAt", eventView.CreatedAt) 164 + } 165 + }() 166 + ctx, cancel := context.WithTimeout(ctx, recordEventTimeout) 167 + defer cancel() 168 + 169 + ec, err := NewOzoneEventContext(ctx, eng, eventView) 170 + if err != nil { 171 + eventErrorCount.WithLabelValues("ozoneEvent").Inc() 172 + return fmt.Errorf("failed to hydrate ozone event context: %w", err) 173 + } 174 + 175 + // if this is a "self-event", created by automod itself, skip it to prevent a loop 176 + if ec.Event.CreatedBy.String() == eng.OzoneClient.Auth.Did { 177 + ec.Logger.Debug("skipping ozone self-event") 178 + return nil 179 + } 180 + 181 + ec.Logger.Debug("processing ozone event") 182 + 183 + if err := eng.Rules.CallOzoneEventRules(ec); err != nil { 184 + eventErrorCount.WithLabelValues("ozoneEvent").Inc() 185 + return fmt.Errorf("ozone rule execution failed: %w", err) 186 + } 187 + 188 + eng.CanonicalLogLineOzoneEvent(ec) 189 + 190 + // some ozone events should result in account meta cache flushes 191 + if (ec.Event.EventType == "takedown" || ec.Event.EventType == "reverseTakedown" || ec.Event.EventType == "label" || ec.Event.EventType == "tag") && ec.SubjectRecord == nil { 192 + if err := eng.PurgeAccountCaches(ctx, ec.Event.SubjectDID); err != nil { 193 + eng.Logger.Error("failed to purge identity cache", "err", err, "did", ec.Event.SubjectDID) 194 + } 195 + } 196 + if err := eng.persistAccountModActions(&ec.AccountContext); err != nil { 197 + eventErrorCount.WithLabelValues("ozoneEvent").Inc() 198 + return fmt.Errorf("failed to persist actions for ozone event: %w", err) 199 + } 200 + if err := eng.persistCounters(ctx, ec.effects); err != nil { 201 + eventErrorCount.WithLabelValues("ozoneEvent").Inc() 202 + return fmt.Errorf("failed to persist counts for ozone event: %w", err) 203 + } 204 + return nil 205 + } 206 + 207 + func (e *Engine) CanonicalLogLineOzoneEvent(c *OzoneEventContext) { 208 + c.Logger.Info("canonical-event-line", 209 + "accountLabels", c.effects.AccountLabels, 210 + "accountFlags", c.effects.AccountFlags, 211 + "accountTakedown", c.effects.AccountTakedown, 212 + "accountReports", len(c.effects.AccountReports), 213 + "recordLabels", c.effects.RecordLabels, 214 + "recordFlags", c.effects.RecordFlags, 215 + "recordTakedown", c.effects.RecordTakedown, 216 + "recordReports", len(c.effects.RecordReports), 217 + ) 218 + }
+62 -28
automod/engine/fetch_account_meta.go
··· 7 7 8 8 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 9 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 + toolsozone "github.com/bluesky-social/indigo/api/ozone" 10 11 "github.com/bluesky-social/indigo/atproto/identity" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" 12 13 ) ··· 42 43 43 44 // doing a "full" fetch from here on 44 45 accountMetaFetches.Inc() 46 + am := AccountMeta{ 47 + Identity: ident, 48 + } 45 49 50 + // automod-internal "flags" 46 51 flags, err := e.Flags.Get(ctx, ident.DID.String()) 47 52 if err != nil { 48 53 return nil, fmt.Errorf("failed checking account flag cache: %w", err) 49 54 } 55 + am.AccountFlags = flags 50 56 51 57 // fetch account metadata from AppView 52 58 pv, err := appbsky.ActorGetProfile(ctx, e.BskyClient, ident.DID.String()) 53 59 if err != nil { 54 60 logger.Warn("account profile lookup failed", "err", err) 55 - am := AccountMeta{ 56 - Identity: ident, 57 - // Profile 58 - // AccountLabels 59 - // AccountNegatedLabels 60 - AccountFlags: flags, 61 - } 62 61 return &am, nil 63 62 } 64 63 64 + am.Profile = ProfileSummary{ 65 + HasAvatar: pv.Avatar != nil, 66 + Description: pv.Description, 67 + DisplayName: pv.DisplayName, 68 + } 69 + if pv.PostsCount != nil { 70 + am.PostsCount = *pv.PostsCount 71 + } 72 + if pv.FollowersCount != nil { 73 + am.FollowersCount = *pv.FollowersCount 74 + } 75 + if pv.FollowsCount != nil { 76 + am.FollowsCount = *pv.FollowsCount 77 + } 78 + 65 79 var labels []string 66 80 var negLabels []string 67 81 for _, lbl := range pv.Labels { ··· 71 85 labels = append(labels, lbl.Val) 72 86 } 73 87 } 88 + am.AccountLabels = dedupeStrings(labels) 89 + am.AccountNegatedLabels = dedupeStrings(negLabels) 74 90 75 - am := AccountMeta{ 76 - Identity: ident, 77 - Profile: ProfileSummary{ 78 - HasAvatar: pv.Avatar != nil, 79 - Description: pv.Description, 80 - DisplayName: pv.DisplayName, 81 - }, 82 - AccountLabels: dedupeStrings(labels), 83 - AccountNegatedLabels: dedupeStrings(negLabels), 84 - AccountFlags: flags, 85 - } 86 - if pv.PostsCount != nil { 87 - am.PostsCount = *pv.PostsCount 88 - } 89 - if pv.FollowersCount != nil { 90 - am.FollowersCount = *pv.FollowersCount 91 - } 92 - if pv.FollowsCount != nil { 93 - am.FollowsCount = *pv.FollowsCount 91 + if pv.CreatedAt != nil { 92 + ts, err := syntax.ParseDatetimeTime(*pv.CreatedAt) 93 + if err != nil { 94 + logger.Warn("invalid profile createdAt", "err", err, "createdAt", pv.CreatedAt) 95 + } else { 96 + am.CreatedAt = &ts 97 + } 94 98 } 95 99 96 - if e.AdminClient != nil { 100 + // first attempt to fetch private account metadata from Ozone 101 + if e.OzoneClient != nil { 102 + rd, err := toolsozone.ModerationGetRepo(ctx, e.OzoneClient, ident.DID.String()) 103 + if err != nil { 104 + logger.Warn("failed to fetch private account metadata from Ozone", "err", err) 105 + } else { 106 + ap := AccountPrivate{} 107 + if rd.Email != nil && *rd.Email != "" { 108 + ap.Email = *rd.Email 109 + } 110 + if rd.EmailConfirmedAt != nil && *rd.EmailConfirmedAt != "" { 111 + ap.EmailConfirmed = true 112 + } 113 + ts, err := syntax.ParseDatetimeTime(rd.IndexedAt) 114 + if err != nil { 115 + return nil, fmt.Errorf("bad account IndexedAt: %w", err) 116 + } 117 + ap.IndexedAt = ts 118 + if rd.DeactivatedAt != nil { 119 + am.Deactivated = true 120 + } 121 + if rd.Moderation != nil && rd.Moderation.SubjectStatus != nil { 122 + if rd.Moderation.SubjectStatus.Takendown != nil && *rd.Moderation.SubjectStatus.Takendown == true { 123 + am.Takendown = true 124 + } 125 + ap.AccountTags = dedupeStrings(rd.Moderation.SubjectStatus.Tags) 126 + } 127 + am.Private = &ap 128 + } 129 + } else if e.AdminClient != nil { 130 + // fall back to PDS/entryway fetching; less metadata available 97 131 pv, err := comatproto.AdminGetAccountInfo(ctx, e.AdminClient, ident.DID.String()) 98 132 if err != nil { 99 - logger.Warn("failed to fetch private account metadata", "err", err) 133 + logger.Warn("failed to fetch private account metadata from PDS/entryway", "err", err) 100 134 } else { 101 135 ap := AccountPrivate{} 102 136 if pv.Email != nil && *pv.Email != "" {
+1
automod/engine/persist.go
··· 169 169 if len(newLabels) > 0 && eng.OzoneClient != nil { 170 170 rv, err := toolsozone.ModerationGetRecord(ctx, eng.OzoneClient, c.RecordOp.CID.String(), c.RecordOp.ATURI().String()) 171 171 if err != nil { 172 + // NOTE: there is a frequent 4xx error here from Ozone because this record has not been indexed yet 172 173 c.Logger.Warn("failed to fetch private record metadata", "err", err) 173 174 } else { 174 175 var existingLabels []string
+11
automod/engine/ruleset.go
··· 18 18 IdentityRules []IdentityRuleFunc 19 19 BlobRules []BlobRuleFunc 20 20 NotificationRules []NotificationRuleFunc 21 + OzoneEventRules []OzoneEventRuleFunc 21 22 } 22 23 23 24 // Executes all the various record-related rules. Only dispatches execution, does no other de-dupe or pre/post processing. ··· 93 94 err := f(c) 94 95 if err != nil { 95 96 c.Logger.Error("notification rule execution failed", "err", err) 97 + } 98 + } 99 + return nil 100 + } 101 + 102 + func (r *RuleSet) CallOzoneEventRules(c *OzoneEventContext) error { 103 + for _, f := range r.OzoneEventRules { 104 + err := f(c) 105 + if err != nil { 106 + c.Logger.Error("ozone event rule execution failed", "err", err) 96 107 } 97 108 } 98 109 return nil
+1
automod/engine/ruletypes.go
··· 11 11 type ProfileRuleFunc = func(c *RecordContext, profile *appbsky.ActorProfile) error 12 12 type BlobRuleFunc = func(c *RecordContext, blob lexutil.LexBlob, data []byte) error 13 13 type NotificationRuleFunc = func(c *NotificationContext) error 14 + type OzoneEventRuleFunc = func(c *OzoneEventContext) error
+2 -1
automod/flagstore/flagstore_redis.go
··· 13 13 } 14 14 15 15 func NewRedisFlagStore(redisURL string) (*RedisFlagStore, error) { 16 + ctx := context.Background() 16 17 opt, err := redis.ParseURL(redisURL) 17 18 if err != nil { 18 19 return nil, err 19 20 } 20 21 rdb := redis.NewClient(opt) 21 22 // check redis connection 22 - _, err = rdb.Ping(context.TODO()).Result() 23 + _, err = rdb.Ping(ctx).Result() 23 24 if err != nil { 24 25 return nil, err 25 26 }
+2
automod/pkg.go
··· 14 14 15 15 type AccountContext = engine.AccountContext 16 16 type RecordContext = engine.RecordContext 17 + type OzoneEventContext = engine.OzoneEventContext 17 18 type NotificationContext = engine.NotificationContext 18 19 type RecordOp = engine.RecordOp 19 20 ··· 23 24 type ProfileRuleFunc = engine.ProfileRuleFunc 24 25 type BlobRuleFunc = engine.BlobRuleFunc 25 26 type NotificationRuleFunc = engine.NotificationRuleFunc 27 + type OzoneEventRuleFunc = engine.OzoneEventRuleFunc 26 28 27 29 var ( 28 30 ReportReasonSpam = engine.ReportReasonSpam
+3
automod/rules/all.go
··· 56 56 NotificationRules: []automod.NotificationRuleFunc{ 57 57 // none 58 58 }, 59 + OzoneEventRules: []automod.OzoneEventRuleFunc{ 60 + HarassmentProtectionOzoneEventRule, 61 + }, 59 62 } 60 63 return rules 61 64 }
+56 -12
automod/rules/harassment.go
··· 46 46 } 47 47 48 48 interactionDIDs = dedupeStrings(interactionDIDs) 49 - for _, did := range interactionDIDs { 50 - if did == c.Account.Identity.DID.String() { 49 + for _, d := range interactionDIDs { 50 + did, err := syntax.ParseDID(d) 51 + if err != nil { 52 + c.Logger.Warn("invalid DID in record", "did", d) 53 + continue 54 + } 55 + if did == c.Account.Identity.DID { 51 56 continue 52 57 } 53 - if c.InSet("harassment-target-dids", did) { 54 - // ignore if the target account follows the new account 55 - rel := c.GetAccountRelationship(syntax.DID(did)) 56 - if rel.FollowedBy { 57 - return nil 58 + targetIsProtected := false 59 + if c.InSet("harassment-target-dids", did.String()) { 60 + targetIsProtected = true 61 + } else { 62 + // check if the target account has a harassment protection tag in Ozone 63 + targetAccount := c.GetAccountMeta(did) 64 + if targetAccount == nil { 65 + continue 58 66 } 67 + for _, t := range targetAccount.Private.AccountTags { 68 + if t == "harassment-protection" { 69 + targetIsProtected = true 70 + break 71 + } 72 + } 73 + } 59 74 60 - //c.AddRecordFlag("interaction-harassed-target") 61 - c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("possible harassment of known target account: %s (also labeled; remove label if this isn't harassment)", did)) 62 - c.AddAccountLabel("!hide") 63 - c.Notify("slack") 64 - return nil 75 + if !targetIsProtected { 76 + continue 65 77 } 78 + 79 + // ignore if the target account follows the new account 80 + rel := c.GetAccountRelationship(syntax.DID(did)) 81 + if rel.FollowedBy { 82 + continue 83 + } 84 + 85 + //c.AddRecordFlag("interaction-harassed-target") 86 + c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("possible harassment of known target account: %s (also labeled; remove label if this isn't harassment)", did)) 87 + c.AddAccountLabel("!hide") 88 + c.Notify("slack") 89 + return nil 66 90 } 67 91 return nil 68 92 } ··· 97 121 } 98 122 return nil 99 123 } 124 + 125 + var _ automod.OzoneEventRuleFunc = HarassmentProtectionOzoneEventRule 126 + 127 + // looks for new harassment protection tags on accounts, and logs them 128 + func HarassmentProtectionOzoneEventRule(c *automod.OzoneEventContext) error { 129 + if c.Event.EventType != "tag" || c.Event.Event.ModerationDefs_ModEventTag == nil { 130 + return nil 131 + } 132 + 133 + for _, t := range c.Event.Event.ModerationDefs_ModEventTag.Add { 134 + if t == "harassment-protection" { 135 + c.Logger.Info("adding harassment protection to account", "ozoneComment", c.Event.Event.ModerationDefs_ModEventTag.Comment, "did", c.Account.Identity.DID, "handle", c.Account.Identity.Handle) 136 + // to make slack message clearer; bluring flags and tags is a bit weird 137 + c.AddAccountFlag("harassment-protection") 138 + //c.Notify("slack") 139 + break 140 + } 141 + } 142 + return nil 143 + }
+1 -4
automod/rules/misleading.go
··· 1 1 package rules 2 2 3 3 import ( 4 - "context" 5 4 "log/slog" 6 5 "net/url" 7 6 "strings" ··· 106 105 var _ automod.PostRuleFunc = MisleadingMentionPostRule 107 106 108 107 func MisleadingMentionPostRule(c *automod.RecordContext, post *appbsky.FeedPost) error { 109 - // TODO: do we really need to route context around? probably 110 - ctx := context.TODO() 111 108 facets, err := ExtractFacets(post) 112 109 if err != nil { 113 110 c.Logger.Warn("invalid facets", "err", err) ··· 127 124 continue 128 125 } 129 126 130 - mentioned, err := c.Directory().LookupHandle(ctx, handle) 127 + mentioned, err := c.Directory().LookupHandle(c.Ctx, handle) 131 128 if err != nil { 132 129 c.Logger.Warn("could not resolve handle", "handle", handle) 133 130 c.AddRecordFlag("broken-mention")
+23 -11
cmd/hepa/consumer.go
··· 25 25 26 26 func (s *Server) RunConsumer(ctx context.Context) error { 27 27 28 - // TODO: persist cursor in a database or local disk 29 28 cur, err := s.ReadLastCursor(ctx) 30 29 if err != nil { 31 30 return err ··· 53 52 atomic.StoreInt64(&s.lastSeq, evt.Seq) 54 53 return s.HandleRepoCommit(ctx, evt) 55 54 }, 56 - RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 55 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 57 56 atomic.StoreInt64(&s.lastSeq, evt.Seq) 58 57 did, err := syntax.ParseDID(evt.Did) 59 58 if err != nil { 60 - s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 59 + s.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 61 60 return nil 62 61 } 63 - if err := s.engine.ProcessIdentityEvent(ctx, "handle", did); err != nil { 64 - s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 62 + if err := s.engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { 63 + s.logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) 65 64 } 66 65 return nil 67 66 }, 68 - RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 67 + RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 69 68 atomic.StoreInt64(&s.lastSeq, evt.Seq) 70 69 did, err := syntax.ParseDID(evt.Did) 71 70 if err != nil { 72 - s.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 71 + s.logger.Error("bad DID in RepoAccount event", "did", evt.Did, "seq", evt.Seq, "err", err) 73 72 return nil 74 73 } 75 - if err := s.engine.ProcessIdentityEvent(ctx, "identity", did); err != nil { 76 - s.logger.Error("processing repo identity failed", "did", evt.Did, "seq", evt.Seq, "err", err) 74 + if err := s.engine.ProcessIdentityEvent(ctx, "account", did); err != nil { 75 + s.logger.Error("processing repo account failed", "did", evt.Did, "seq", evt.Seq, "err", err) 77 76 } 78 77 return nil 79 78 }, 79 + // TODO: deprecated 80 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 81 + atomic.StoreInt64(&s.lastSeq, evt.Seq) 82 + did, err := syntax.ParseDID(evt.Did) 83 + if err != nil { 84 + s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 85 + return nil 86 + } 87 + if err := s.engine.ProcessIdentityEvent(ctx, "handle", did); err != nil { 88 + s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 89 + } 90 + return nil 91 + }, 92 + // TODO: deprecated 80 93 RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 81 94 atomic.StoreInt64(&s.lastSeq, evt.Seq) 82 95 did, err := syntax.ParseDID(evt.Did) ··· 89 102 } 90 103 return nil 91 104 }, 92 - // TODO: other event callbacks as needed 93 105 } 94 106 95 107 var scheduler events.Scheduler ··· 220 232 continue 221 233 } 222 234 default: 223 - // TODO: other event types: update, delete 235 + // TODO: should this be an error? 224 236 } 225 237 } 226 238
+97
cmd/hepa/consumer_ozone.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "time" 7 + 8 + toolsozone "github.com/bluesky-social/indigo/api/ozone" 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + ) 11 + 12 + func (s *Server) RunOzoneConsumer(ctx context.Context) error { 13 + 14 + cur, err := s.ReadLastOzoneCursor(ctx) 15 + if err != nil { 16 + return err 17 + } 18 + 19 + if cur == "" { 20 + cur = syntax.DatetimeNow().String() 21 + } 22 + since, err := syntax.ParseDatetime(cur) 23 + if err != nil { 24 + return err 25 + } 26 + 27 + s.logger.Info("subscribing to ozone event log", "upstream", s.engine.OzoneClient.Host, "cursor", cur, "since", since) 28 + var limit int64 = 50 29 + period := time.Second * 5 30 + 31 + for { 32 + //func ModerationQueryEvents(ctx context.Context, c *xrpc.Client, addedLabels []string, addedTags []string, comment string, createdAfter string, createdBefore string, createdBy string, cursor string, hasComment bool, includeAllUserRecords bool, limit int64, removedLabels []string, removedTags []string, reportTypes []string, sortDirection string, subject string, types []string) (*ModerationQueryEvents_Output, error) { 33 + me, err := toolsozone.ModerationQueryEvents( 34 + ctx, 35 + s.engine.OzoneClient, 36 + nil, // addedLabels: If specified, only events where all of these labels were added are returned 37 + nil, // addedTags: If specified, only events where all of these tags were added are returned 38 + "", // comment: If specified, only events with comments containing the keyword are returned 39 + since.String(), // createdAfter: Retrieve events created after a given timestamp 40 + "", // createdBefore: Retrieve events created before a given timestamp 41 + "", // createdBy 42 + "", // cursor 43 + false, // hasComment: If true, only events with comments are returned 44 + true, // includeAllUserRecords: If true, events on all record types (posts, lists, profile etc.) owned by the did are returned 45 + limit, 46 + nil, // removedLabels: If specified, only events where all of these labels were removed are returned 47 + nil, // removedTags 48 + nil, // reportTypes 49 + "asc", // sortDirection: Sort direction for the events. Defaults to descending order of created at timestamp. 50 + "", // subject 51 + nil, // types: The types of events (fully qualified string in the format of tools.ozone.moderation.defs#modEvent<name>) to filter by. If not specified, all events are returned. 52 + ) 53 + if err != nil { 54 + s.logger.Warn("ozone query events failed; sleeping then will retrying", "err", err, "period", period.String()) 55 + time.Sleep(period) 56 + continue 57 + } 58 + 59 + // track if the response contained anything new 60 + anyNewEvents := false 61 + for _, evt := range me.Events { 62 + createdAt, err := syntax.ParseDatetime(evt.CreatedAt) 63 + if err != nil { 64 + return fmt.Errorf("invalid time format for ozone 'createdAt': %w", err) 65 + } 66 + // skip if the timestamp is the exact same 67 + if createdAt == since { 68 + continue 69 + } 70 + anyNewEvents = true 71 + // TODO: is there a race condition here? 72 + if !createdAt.Time().After(since.Time()) { 73 + s.logger.Error("out of order ozone event", "createdAt", createdAt, "since", since) 74 + return fmt.Errorf("out of order ozone event") 75 + } 76 + if err = s.HandleOzoneEvent(ctx, evt); err != nil { 77 + s.logger.Error("failed to process ozone event", "event", evt) 78 + } 79 + since = createdAt 80 + s.lastOzoneCursor.Store(since.String()) 81 + } 82 + if !anyNewEvents { 83 + s.logger.Debug("... ozone poller sleeping", "period", period.String()) 84 + time.Sleep(period) 85 + } 86 + } 87 + } 88 + 89 + func (s *Server) HandleOzoneEvent(ctx context.Context, eventView *toolsozone.ModerationDefs_ModEventView) error { 90 + 91 + s.logger.Debug("received ozone event", "eventID", eventView.Id, "createdAt", eventView.CreatedAt) 92 + 93 + if err := s.engine.ProcessOzoneEvent(ctx, eventView); err != nil { 94 + s.logger.Error("engine failed to process ozone event", "err", err) 95 + } 96 + return nil 97 + }
+16 -1
cmd/hepa/main.go
··· 264 264 } 265 265 }() 266 266 267 - // the main service loop 267 + // ozone event consumer (if configured) 268 + if srv.engine.OzoneClient != nil { 269 + go func() { 270 + if err := srv.RunOzoneConsumer(ctx); err != nil { 271 + slog.Error("ozone consumer failed", "err", err) 272 + } 273 + }() 274 + 275 + go func() { 276 + if err := srv.RunPersistOzoneCursor(ctx); err != nil { 277 + slog.Error("ozone cursor routine failed", "err", err) 278 + } 279 + }() 280 + } 281 + 282 + // firehose event consumer (main processor) 268 283 if err := srv.RunConsumer(ctx); err != nil { 269 284 return fmt.Errorf("failure consuming and processing firehose: %w", err) 270 285 }
+70 -1
cmd/hepa/server.go
··· 38 38 // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic), 39 39 // but nonetheless, you must use atomics when updating or reading this (to avoid data races). 40 40 lastSeq int64 41 + 42 + // same as lastSeq, but for Ozone timestamp cursor. the value is a string. 43 + lastOzoneCursor atomic.Value 41 44 } 42 45 43 46 type Config struct { ··· 236 239 } 237 240 238 241 var cursorKey = "hepa/seq" 242 + var ozoneCursorKey = "hepa/ozoneTimestamp" 239 243 240 244 func (s *Server) ReadLastCursor(ctx context.Context) (int64, error) { 241 245 // if redis isn't configured, just skip ··· 248 252 if err == redis.Nil { 249 253 s.logger.Info("no pre-existing cursor in redis") 250 254 return 0, nil 255 + } else if err != nil { 256 + return 0, err 251 257 } 252 258 s.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val) 253 - return val, err 259 + return val, nil 260 + } 261 + 262 + func (s *Server) ReadLastOzoneCursor(ctx context.Context) (string, error) { 263 + // if redis isn't configured, just skip 264 + if s.rdb == nil { 265 + s.logger.Info("redis not configured, skipping ozone cursor read") 266 + return "", nil 267 + } 268 + 269 + val, err := s.rdb.Get(ctx, ozoneCursorKey).Result() 270 + if err == redis.Nil || val == "" { 271 + s.logger.Info("no pre-existing ozone cursor in redis") 272 + return "", nil 273 + } else if err != nil { 274 + return "", err 275 + } 276 + s.logger.Info("successfully found prior ozone offset timestamp in redis", "cursor", val) 277 + return val, nil 254 278 } 255 279 256 280 func (s *Server) PersistCursor(ctx context.Context) error { ··· 266 290 return err 267 291 } 268 292 293 + func (s *Server) PersistOzoneCursor(ctx context.Context) error { 294 + // if redis isn't configured, just skip 295 + if s.rdb == nil { 296 + return nil 297 + } 298 + lastCursor := s.lastOzoneCursor.Load() 299 + if lastCursor == nil || lastCursor == "" { 300 + return nil 301 + } 302 + err := s.rdb.Set(ctx, ozoneCursorKey, lastCursor, 14*24*time.Hour).Err() 303 + return err 304 + } 305 + 269 306 // this method runs in a loop, persisting the current cursor state every 5 seconds 270 307 func (s *Server) RunPersistCursor(ctx context.Context) error { 271 308 ··· 297 334 } 298 335 } 299 336 } 337 + 338 + // this method runs in a loop, persisting the current cursor state every 5 seconds 339 + func (s *Server) RunPersistOzoneCursor(ctx context.Context) error { 340 + 341 + // if redis isn't configured, just skip 342 + if s.rdb == nil { 343 + return nil 344 + } 345 + ticker := time.NewTicker(5 * time.Second) 346 + for { 347 + select { 348 + case <-ctx.Done(): 349 + lastCursor := s.lastOzoneCursor.Load() 350 + if lastCursor != nil && lastCursor != "" { 351 + s.logger.Info("persisting final ozone cursor timestamp", "cursor", lastCursor) 352 + err := s.PersistOzoneCursor(ctx) 353 + if err != nil { 354 + s.logger.Error("failed to persist ozone cursor", "err", err, "cursor", lastCursor) 355 + } 356 + } 357 + return nil 358 + case <-ticker.C: 359 + lastCursor := s.lastOzoneCursor.Load() 360 + if lastCursor != nil && lastCursor != "" { 361 + err := s.PersistOzoneCursor(ctx) 362 + if err != nil { 363 + s.logger.Error("failed to persist ozone cursor", "err", err, "cursor", lastCursor) 364 + } 365 + } 366 + } 367 + } 368 + }