A Rust implementation of skywatch-phash
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix: Use CowStr for all String types in types.rs

This commit updates the `BlobCheck`, `BlobReference`, `ImageJob`, and
`MatchResult` structs to use `CowStr` instead of `String` for all string
fields. This change improves performance by reducing the number of
allocations. It also adds deserialization helpers.

- Updated all string fields in structs to use CowStr
- Added deserialization helpers

Skywatch 188ba7d9 447f38e7

+476 -358
+6 -5
rules/blobs.json
··· 5 5 "d9794408f1f3fffb", 6 6 "0f093139797b7967", 7 7 "fdedc3030100c0fd", 8 - "0f7f707dcc0c0600" 8 + "0f7f707dcc0c0600", 9 + "87030303199dff81" 9 10 ], 10 11 "label": "troll", 11 12 "comment": "Image is used in harrassment campaign targeting Will Stancil", ··· 13 14 "labelAcct": true, 14 15 "reportPost": false, 15 16 "toLabel": true, 16 - "hammingThreshold": 3, 17 - "description": "Sample harassment image variants (placeholder hashes)", 17 + "hammingThreshold": 1, 18 + "description": "Will Stancil Harrassment Memes", 18 19 "ignoreDID": ["did:plc:7umvpuxe2vbrc3zrzuquzniu"] 19 20 }, 20 21 { ··· 29 30 "description": "Sample harassment image variants (placeholder hashes)" 30 31 }, 31 32 { 32 - "phashes": ["3e7e6e561202627c"], 33 + "phashes": ["3e7e6e561202627c", "013078fcf2bff860"], 33 34 "label": "sensual-alf", 34 35 "comment": "Posting Alf", 35 36 "reportAcct": false, ··· 37 38 "reportPost": false, 38 39 "toLabel": true, 39 40 "hammingThreshold": 3, 40 - "description": "Sample harassment image variants (placeholder hashes)" 41 + "description": "Lewd Alf" 41 42 } 42 43 ]
+3 -3
src/bin/test-image.rs
··· 53 53 for check in &blob_checks { 54 54 // Check if test DID is ignored 55 55 if let Some(ignore_list) = &check.ignore_did { 56 - if ignore_list.contains(&test_did.to_string()) { 56 + if ignore_list.iter().any(|did| did.as_str() == test_did) { 57 57 println!("⏭️ Skipped rule '{}' (test DID is ignored)", check.label); 58 58 continue; 59 59 } ··· 63 63 64 64 // Check each phash in the rule 65 65 for check_phash in &check.phashes { 66 - match skywatch_phash_rs::processor::phash::hamming_distance(&phash, check_phash) { 66 + match skywatch_phash_rs::processor::phash::hamming_distance(&phash, check_phash.as_str()) { 67 67 Ok(distance) => { 68 68 if distance <= threshold { 69 69 found_match = true; 70 70 println!("✅ MATCH FOUND!"); 71 71 println!(" Rule: {}", check.label); 72 - println!(" Description: {}", check.description.as_ref().unwrap_or(&"N/A".to_string())); 72 + println!(" Description: {}", check.description.as_deref().unwrap_or("N/A")); 73 73 println!(" Matched phash: {}", check_phash); 74 74 println!(" Hamming distance: {} (threshold: {})", distance, threshold); 75 75 println!(" Actions:");
+1 -1
src/cache/mod.rs
··· 116 116 117 117 #[cfg(test)] 118 118 mod tests { 119 - use super::*; 119 + 120 120 121 121 // Note: These are integration tests that require a running Redis instance 122 122 // Run with: cargo test --test cache -- --ignored
+125 -79
src/jetstream/events.rs
··· 1 + use jacquard_common::types::string::{AtprotoStr, Cid}; 2 + use jacquard_common::types::value::Data; 1 3 use miette::Result; 2 - use serde_json::Value; 3 4 4 5 use crate::types::BlobReference; 5 6 ··· 8 9 /// Handles two cases: 9 10 /// 1. Direct images: record.embed.images[].image.ref.$link 10 11 /// 2. Quote posts with media: record.embed.media.images[].image.ref.$link 11 - pub fn extract_blobs_from_record(record: &Value) -> Result<Vec<BlobReference>> { 12 + pub fn extract_blobs_from_record(record: &Data) -> Result<Vec<BlobReference>> { 12 13 let mut blobs = Vec::new(); 13 14 14 - let Some(embed) = record.get("embed") else { 15 + let Data::Object(record_obj) = record else { 16 + return Ok(blobs); 17 + }; 18 + 19 + let Some(Data::Object(embed)) = record_obj.0.get("embed") else { 15 20 return Ok(blobs); 16 21 }; 17 22 18 23 // Case 1: Direct images (embed.images) 19 - if let Some(images) = embed.get("images").and_then(|v| v.as_array()) { 20 - for img in images { 24 + if let Some(Data::Array(images)) = embed.0.get("images") { 25 + for img in images.0.iter() { 21 26 if let Some(blob_ref) = extract_blob_from_image(img) { 22 27 blobs.push(blob_ref); 23 28 } ··· 25 30 } 26 31 27 32 // Case 2: Quote posts with media (embed.media.images) 28 - if let Some(media) = embed.get("media") { 29 - if let Some(images) = media.get("images").and_then(|v| v.as_array()) { 30 - for img in images { 33 + if let Some(Data::Object(media)) = embed.0.get("media") { 34 + if let Some(Data::Array(images)) = media.0.get("images") { 35 + for img in images.0.iter() { 31 36 if let Some(blob_ref) = extract_blob_from_image(img) { 32 37 blobs.push(blob_ref); 33 38 } ··· 39 44 } 40 45 41 46 /// Extract a single blob reference from an image object 42 - fn extract_blob_from_image(img: &Value) -> Option<BlobReference> { 43 - let image_obj = img.get("image")?; 44 - let ref_obj = image_obj.get("ref")?; 45 - let cid = ref_obj.get("$link")?.as_str()?; 47 + fn extract_blob_from_image(img: &Data) -> 
Option<BlobReference> { 48 + use jacquard_common::IntoStatic; 49 + 50 + let Data::Object(img_obj) = img else { 51 + return None; 52 + }; 53 + 54 + let Data::Object(image_obj) = img_obj.0.get("image")? else { 55 + return None; 56 + }; 57 + 58 + let Data::Object(ref_obj) = image_obj.0.get("ref")? else { 59 + return None; 60 + }; 61 + 62 + let Data::String(AtprotoStr::String(cid_str)) = ref_obj.0.get("$link")? else { 63 + return None; 64 + }; 46 65 47 66 let mime_type = image_obj 67 + .0 48 68 .get("mimeType") 49 - .and_then(|v| v.as_str()) 50 - .map(String::from); 69 + .and_then(|v| match v { 70 + Data::String(AtprotoStr::String(s)) => Some(s.to_string().into()), 71 + _ => None, 72 + }); 51 73 52 74 Some(BlobReference { 53 - cid: cid.to_string(), 75 + cid: Cid::str(cid_str.as_str()).into_static(), 54 76 mime_type, 55 77 }) 56 78 } ··· 58 80 #[cfg(test)] 59 81 mod tests { 60 82 use super::*; 61 - use serde_json::json; 83 + use jacquard_common::types::value::{Array, Object}; 84 + use jacquard_common::CowStr; 85 + use std::collections::BTreeMap; 86 + 87 + fn build_image_obj(cid: &str, mime_type: &str) -> Data<'static> { 88 + let mut ref_map = BTreeMap::new(); 89 + ref_map.insert( 90 + "$link".into(), 91 + Data::String(AtprotoStr::String(CowStr::from(cid.to_string()))), 92 + ); 93 + 94 + let mut image_map = BTreeMap::new(); 95 + image_map.insert("ref".into(), Data::Object(Object(ref_map))); 96 + image_map.insert( 97 + "mimeType".into(), 98 + Data::String(AtprotoStr::String(CowStr::from(mime_type.to_string()))), 99 + ); 100 + 101 + let mut img_obj = BTreeMap::new(); 102 + img_obj.insert("image".into(), Data::Object(Object(image_map))); 103 + 104 + Data::Object(Object(img_obj)) 105 + } 106 + 107 + fn build_direct_images_record(images: Vec<(&str, &str)>) -> Data<'static> { 108 + let image_array: Vec<Data<'static>> = images 109 + .into_iter() 110 + .map(|(cid, mime)| build_image_obj(cid, mime)) 111 + .collect(); 112 + 113 + let mut embed_map = BTreeMap::new(); 114 + 
embed_map.insert( 115 + "$type".into(), 116 + Data::String(AtprotoStr::String(CowStr::from( 117 + "app.bsky.embed.images".to_string(), 118 + ))), 119 + ); 120 + embed_map.insert("images".into(), Data::Array(Array(image_array))); 121 + 122 + let mut record_map = BTreeMap::new(); 123 + record_map.insert("embed".into(), Data::Object(Object(embed_map))); 124 + 125 + Data::Object(Object(record_map)) 126 + } 127 + 128 + fn build_quote_with_media_record(cid: &str, mime_type: &str) -> Data<'static> { 129 + let image_array = vec![build_image_obj(cid, mime_type)]; 130 + 131 + let mut media_map = BTreeMap::new(); 132 + media_map.insert("images".into(), Data::Array(Array(image_array))); 133 + 134 + let mut embed_map = BTreeMap::new(); 135 + embed_map.insert( 136 + "$type".into(), 137 + Data::String(AtprotoStr::String(CowStr::from( 138 + "app.bsky.embed.recordWithMedia".to_string(), 139 + ))), 140 + ); 141 + embed_map.insert("media".into(), Data::Object(Object(media_map))); 142 + 143 + let mut record_map = BTreeMap::new(); 144 + record_map.insert("embed".into(), Data::Object(Object(embed_map))); 145 + 146 + Data::Object(Object(record_map)) 147 + } 148 + 149 + fn build_no_embed_record() -> Data<'static> { 150 + let mut record_map = BTreeMap::new(); 151 + record_map.insert( 152 + "text".into(), 153 + Data::String(AtprotoStr::String(CowStr::from( 154 + "Just a text post".to_string(), 155 + ))), 156 + ); 157 + 158 + Data::Object(Object(record_map)) 159 + } 62 160 63 161 #[test] 64 162 fn test_extract_blobs_direct_images() { 65 - let record = json!({ 66 - "embed": { 67 - "$type": "app.bsky.embed.images", 68 - "images": [ 69 - { 70 - "alt": "Test image", 71 - "image": { 72 - "ref": { 73 - "$link": "bafyreiabc123" 74 - }, 75 - "mimeType": "image/jpeg" 76 - } 77 - } 78 - ] 79 - } 80 - }); 163 + let record = build_direct_images_record(vec![("bafyreiabc123", "image/jpeg")]); 81 164 82 165 let blobs = extract_blobs_from_record(&record).unwrap(); 83 166 assert_eq!(blobs.len(), 1); 84 - 
assert_eq!(blobs[0].cid, "bafyreiabc123"); 167 + assert_eq!(blobs[0].cid.as_str(), "bafyreiabc123"); 85 168 assert_eq!(blobs[0].mime_type.as_deref(), Some("image/jpeg")); 86 169 } 87 170 88 171 #[test] 89 172 fn test_extract_blobs_quote_with_media() { 90 - let record = json!({ 91 - "embed": { 92 - "$type": "app.bsky.embed.recordWithMedia", 93 - "media": { 94 - "images": [ 95 - { 96 - "alt": "Test image", 97 - "image": { 98 - "ref": { 99 - "$link": "bafyreiabc456" 100 - }, 101 - "mimeType": "image/png" 102 - } 103 - } 104 - ] 105 - } 106 - } 107 - }); 173 + let record = build_quote_with_media_record("bafyreiabc456", "image/png"); 108 174 109 175 let blobs = extract_blobs_from_record(&record).unwrap(); 110 176 assert_eq!(blobs.len(), 1); 111 - assert_eq!(blobs[0].cid, "bafyreiabc456"); 177 + assert_eq!(blobs[0].cid.as_str(), "bafyreiabc456"); 112 178 assert_eq!(blobs[0].mime_type.as_deref(), Some("image/png")); 113 179 } 114 180 115 181 #[test] 116 182 fn test_extract_blobs_no_embed() { 117 - let record = json!({ 118 - "text": "Just a text post" 119 - }); 183 + let record = build_no_embed_record(); 120 184 121 185 let blobs = extract_blobs_from_record(&record).unwrap(); 122 186 assert_eq!(blobs.len(), 0); ··· 124 188 125 189 #[test] 126 190 fn test_extract_blobs_multiple_images() { 127 - let record = json!({ 128 - "embed": { 129 - "images": [ 130 - { 131 - "image": { 132 - "ref": { 133 - "$link": "bafyreiabc111" 134 - }, 135 - "mimeType": "image/jpeg" 136 - } 137 - }, 138 - { 139 - "image": { 140 - "ref": { 141 - "$link": "bafyreiabc222" 142 - }, 143 - "mimeType": "image/png" 144 - } 145 - } 146 - ] 147 - } 148 - }); 191 + let record = build_direct_images_record(vec![ 192 + ("bafyreiabc111", "image/jpeg"), 193 + ("bafyreiabc222", "image/png"), 194 + ]); 149 195 150 196 let blobs = extract_blobs_from_record(&record).unwrap(); 151 197 assert_eq!(blobs.len(), 2); 152 - assert_eq!(blobs[0].cid, "bafyreiabc111"); 153 - assert_eq!(blobs[1].cid, "bafyreiabc222"); 198 + 
assert_eq!(blobs[0].cid.as_str(), "bafyreiabc111"); 199 + assert_eq!(blobs[1].cid.as_str(), "bafyreiabc222"); 154 200 } 155 201 }
+11 -9
src/jetstream/mod.rs
··· 1 1 use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 2 + use jacquard_common::types::string::AtUri; 2 3 use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 4 + use jacquard_common::IntoStatic; 3 5 use miette::{IntoDiagnostic, Result}; 4 6 use futures::StreamExt; 5 7 use tokio::sync::mpsc; ··· 138 140 return Ok(()); 139 141 }; 140 142 141 - // Convert Data to serde_json::Value 142 - let record_value: serde_json::Value = 143 - serde_json::to_value(record_data).into_diagnostic()?; 144 - let blobs = events::extract_blobs_from_record(&record_value)?; 143 + let blobs = events::extract_blobs_from_record(record_data)?; 145 144 146 145 if blobs.is_empty() { 147 146 return Ok(()); 148 147 } 149 148 150 - let post_uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 149 + let post_uri_str = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 150 + let post_uri = AtUri::new(&post_uri_str) 151 + .into_diagnostic()? 152 + .into_static(); 151 153 152 154 debug!( 153 155 "Post with {} blob(s): {}", ··· 159 161 let post_cid = commit 160 162 .cid 161 163 .as_ref() 162 - .map(|cid| cid.to_string()) 163 - .unwrap_or_default(); 164 + .map(|cid| cid.clone().into_static()) 165 + .unwrap_or_else(|| jacquard_common::types::string::Cid::str("")); 164 166 165 167 let job = ImageJob { 166 - post_uri: post_uri.clone(), 168 + post_uri, 167 169 post_cid, 168 - post_did: did.to_string(), 170 + post_did: did.into_static(), 169 171 blobs, 170 172 timestamp: chrono::Utc::now().timestamp_millis(), 171 173 attempts: 0,
+19 -68
src/moderation/account.rs
··· 8 8 ModEventLabel, ModEventReport, ModEventTakedown, ModTool, 9 9 }; 10 10 use jacquard_common::CowStr; 11 - use jacquard_common::types::string::Did; 12 - use jacquard_common::types::value::{Data, Object}; 13 - use jacquard_common::xrpc::{CallOptions, XrpcClient}; 11 + use jacquard_common::xrpc::XrpcClient; 14 12 15 - use jacquard_common::smol_str::SmolStr; 16 13 use miette::{IntoDiagnostic, Result}; 17 14 use std::collections::BTreeMap; 18 15 use tracing::{debug, info}; 19 16 20 17 use crate::config::Config; 21 18 use crate::moderation::rate_limiter::RateLimiter; 19 + use crate::moderation::{ 20 + build_mod_tool_meta, build_moderation_call_opts, build_timestamped_comment, parse_did, 21 + }; 22 22 23 23 /// Label an account with a specific label via Ozone moderation API 24 24 pub async fn label_account( ··· 37 37 38 38 info!("Labeling account {} with label: {}", did, label_val); 39 39 40 - let timestamp = chrono::Utc::now().to_rfc3339(); 41 - let comment = format!("{}: {}", timestamp, check_comment); 42 - 43 - // Build mod_tool meta 44 - let mut meta_map = BTreeMap::new(); 45 - meta_map.insert( 46 - SmolStr::new("externalUrl"), 47 - format!("https://pdsls.dev/{}", post_uri).into(), 48 - ); 49 - meta_map.insert(SmolStr::new("phash"), phash.into()); 40 + let comment = build_timestamped_comment(check_comment); 41 + let meta = build_mod_tool_meta(post_uri, phash); 50 42 51 43 // Create moderation label event using jacquard-api types 52 44 let event = EmitEvent::new() 53 - .created_by(Did::new(created_by).into_diagnostic()?) 45 + .created_by(parse_did(created_by)?) 54 46 .event(EmitEventEvent::ModEventLabel(Box::new( 55 47 ModEventLabel::builder() 56 48 .create_label_vals(vec![CowStr::from(label_val)]) ··· 60 52 ))) 61 53 .subject(EmitEventSubject::RepoRef(Box::new( 62 54 RepoRef::builder() 63 - .did(Did::new(did).into_diagnostic()?) 55 + .did(parse_did(did)?) 
64 56 .build(), 65 57 ))) 66 58 .mod_tool(ModTool { 67 59 name: CowStr::from("skywatch/skywatch-phash-rs"), 68 - meta: Some(Data::Object(Object::from(meta_map))), 60 + meta: Some(meta), 69 61 extra_data: BTreeMap::new(), 70 62 }) 71 63 .build(); 72 64 73 - // Build call options with proxy headers 74 - let opts = CallOptions { 75 - auth: None, 76 - atproto_proxy: Some(CowStr::from(format!( 77 - "{}#atproto_labeler", 78 - config.moderation.labeler_did 79 - ))), 80 - atproto_accept_labelers: Some(vec![CowStr::from( 81 - "did:plc:ar7c4by46qjdydhdevvrndac;redact", 82 - )]), 83 - extra_headers: vec![], 84 - }; 65 + let opts = build_moderation_call_opts(config); 85 66 86 67 // Send request via jacquard agent 87 68 let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; ··· 108 89 109 90 info!("Reporting account {} to ozone: {:?}", did, reason); 110 91 111 - let timestamp = chrono::Utc::now().to_rfc3339(); 112 - let comment = format!("{}: {}", timestamp, check_comment); 113 - 114 - // Build mod_tool meta 115 - let mut meta_map = BTreeMap::new(); 116 - meta_map.insert( 117 - SmolStr::new("externalUrl"), 118 - format!("https://pdsls.dev/{}", post_uri).into(), 119 - ); 120 - meta_map.insert(SmolStr::new("phash"), phash.into()); 92 + let comment = build_timestamped_comment(check_comment); 93 + let meta = build_mod_tool_meta(post_uri, phash); 121 94 122 95 // Create moderation report event using jacquard-api types 123 96 let event = EmitEvent::new() 124 - .created_by(Did::new(created_by).into_diagnostic()?) 97 + .created_by(parse_did(created_by)?) 125 98 .event(EmitEventEvent::ModEventReport(Box::new( 126 99 ModEventReport::builder() 127 100 .report_type(reason) ··· 130 103 ))) 131 104 .subject(EmitEventSubject::RepoRef(Box::new( 132 105 RepoRef::builder() 133 - .did(Did::new(did).into_diagnostic()?) 106 + .did(parse_did(did)?) 
134 107 .build(), 135 108 ))) 136 109 .subject_blob_cids(vec![]) 137 110 .mod_tool(ModTool { 138 111 name: CowStr::from("skywatch/skywatch-phash-rs"), 139 - meta: Some(Data::Object(Object::from(meta_map))), 112 + meta: Some(meta), 140 113 extra_data: BTreeMap::new(), 141 114 }) 142 115 .build(); 143 116 144 - // Build call options with proxy headers 145 - let opts = CallOptions { 146 - auth: None, 147 - atproto_proxy: Some(CowStr::from(format!( 148 - "{}#atproto_labeler", 149 - config.moderation.labeler_did 150 - ))), 151 - atproto_accept_labelers: Some(vec![CowStr::from( 152 - "did:plc:ar7c4by46qjdydhdevvrndac;redact", 153 - )]), 154 - extra_headers: vec![], 155 - }; 117 + let opts = build_moderation_call_opts(config); 156 118 157 119 // Send request via jacquard agent 158 120 let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; ··· 178 140 179 141 // Create moderation takedown event using jacquard-api types 180 142 let event = EmitEvent::new() 181 - .created_by(Did::new(created_by).into_diagnostic()?) 143 + .created_by(parse_did(created_by)?) 182 144 .event(EmitEventEvent::ModEventTakedown(Box::new( 183 145 ModEventTakedown { 184 146 comment: Some(CowStr::from(comment)), ··· 187 149 ))) 188 150 .subject(EmitEventSubject::RepoRef(Box::new( 189 151 RepoRef::builder() 190 - .did(Did::new(did).into_diagnostic()?) 152 + .did(parse_did(did)?) 191 153 .build(), 192 154 ))) 193 155 .build(); 194 156 195 - // Build call options with proxy headers 196 - let opts = CallOptions { 197 - auth: None, 198 - atproto_proxy: Some(CowStr::from(format!( 199 - "{}#atproto_labeler", 200 - config.moderation.labeler_did 201 - ))), 202 - atproto_accept_labelers: Some(vec![CowStr::from( 203 - "did:plc:ar7c4by46qjdydhdevvrndac;redact", 204 - )]), 205 - extra_headers: vec![], 206 - }; 157 + let opts = build_moderation_call_opts(config); 207 158 208 159 // Send request via jacquard agent 209 160 let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?;
+51
src/moderation/helpers.rs
··· 1 + use jacquard_common::types::string::{AtprotoStr, Did}; 2 + use jacquard_common::types::value::{Data, Object}; 3 + use jacquard_common::xrpc::CallOptions; 4 + use jacquard_common::CowStr; 5 + use jacquard_common::smol_str::SmolStr; 6 + use jacquard_common::IntoStatic; 7 + use miette::{IntoDiagnostic, Result}; 8 + use std::collections::BTreeMap; 9 + 10 + use crate::config::Config; 11 + 12 + pub fn build_timestamped_comment(check_comment: &str) -> String { 13 + let timestamp = chrono::Utc::now().to_rfc3339(); 14 + format!("{}: {}", timestamp, check_comment) 15 + } 16 + 17 + pub fn build_mod_tool_meta(uri: &str, phash: &str) -> Data<'static> { 18 + let mut meta_map = BTreeMap::new(); 19 + meta_map.insert( 20 + SmolStr::new("externalUrl"), 21 + Data::String(AtprotoStr::String(CowStr::from(format!( 22 + "https://pdsls.dev/{}", 23 + uri 24 + )))), 25 + ); 26 + meta_map.insert( 27 + SmolStr::new("phash"), 28 + Data::String(AtprotoStr::String(CowStr::from(phash.to_string()))), 29 + ); 30 + Data::Object(Object::from(meta_map)) 31 + } 32 + 33 + pub fn build_moderation_call_opts(config: &Config) -> CallOptions<'_> { 34 + CallOptions { 35 + auth: None, 36 + atproto_proxy: Some(CowStr::from(format!( 37 + "{}#atproto_labeler", 38 + config.moderation.labeler_did 39 + ))), 40 + atproto_accept_labelers: Some(vec![CowStr::from( 41 + "did:plc:ar7c4by46qjdydhdevvrndac;redact", 42 + )]), 43 + extra_headers: vec![], 44 + } 45 + } 46 + 47 + pub fn parse_did(did_str: &str) -> Result<Did<'static>> { 48 + Did::new(did_str) 49 + .into_diagnostic() 50 + .map(|did| did.into_static()) 51 + }
+3
src/moderation/mod.rs
··· 1 1 pub mod account; 2 2 pub mod claims; 3 + mod helpers; 3 4 pub mod post; 4 5 pub mod rate_limiter; 6 + 7 + pub use helpers::*;
+19 -70
src/moderation/post.rs
··· 9 9 ModEventLabel, ModEventReport, ModEventTakedown, ModTool, 10 10 }; 11 11 use jacquard_common::CowStr; 12 - use jacquard_common::types::string::{AtUri, Cid, Did}; 13 - use jacquard_common::types::value::{Data, Object}; 14 - use jacquard_common::xrpc::{CallOptions, XrpcClient}; 12 + use jacquard_common::types::string::{AtUri, Cid}; 13 + use jacquard_common::xrpc::XrpcClient; 15 14 use miette::{IntoDiagnostic, Result}; 16 - use tracing::{debug, info}; 17 - 18 - use jacquard_common::smol_str::SmolStr; 19 15 use std::collections::BTreeMap; 16 + use tracing::{debug, info}; 20 17 21 18 use crate::config::Config; 22 19 use crate::moderation::rate_limiter::RateLimiter; 20 + use crate::moderation::{ 21 + build_mod_tool_meta, build_moderation_call_opts, build_timestamped_comment, parse_did, 22 + }; 23 23 24 24 /// Label a post with a specific label via Ozone moderation API 25 25 pub async fn label_post( ··· 38 38 39 39 info!("Labeling post {} with label: {}", post_uri, label_val); 40 40 41 - let timestamp = chrono::Utc::now().to_rfc3339(); 42 - let comment = format!("{}: {}", timestamp, check_comment); 43 - 44 - // Build mod_tool meta 45 - let mut meta_map = BTreeMap::new(); 46 - meta_map.insert( 47 - SmolStr::new("externalUrl"), 48 - format!("https://pdsls.dev/{}", post_uri).into(), 49 - ); 50 - meta_map.insert(SmolStr::new("phash"), phash.into()); 41 + let comment = build_timestamped_comment(check_comment); 42 + let meta = build_mod_tool_meta(post_uri, phash); 51 43 52 44 // Create moderation label event using jacquard-api types 53 45 let event = EmitEvent::new() 54 - .created_by(Did::new(created_by).into_diagnostic()?) 46 + .created_by(parse_did(created_by)?) 
55 47 .event(EmitEventEvent::ModEventLabel(Box::new( 56 48 ModEventLabel::builder() 57 49 .create_label_vals(vec![CowStr::from(label_val)]) ··· 67 59 ))) 68 60 .mod_tool(ModTool { 69 61 name: CowStr::from("skywatch/skywatch-phash-rs"), 70 - meta: Some(Data::Object(Object::from(meta_map))), 62 + meta: Some(meta), 71 63 extra_data: BTreeMap::new(), 72 64 }) 73 65 .build(); 74 66 75 - // Build call options with proxy headers 76 - let opts = CallOptions { 77 - auth: None, // Agent handles auth automatically 78 - atproto_proxy: Some(CowStr::from(format!( 79 - "{}#atproto_labeler", 80 - config.moderation.labeler_did 81 - ))), 82 - atproto_accept_labelers: Some(vec![CowStr::from( 83 - "did:plc:ar7c4by46qjdydhdevvrndac;redact", 84 - )]), 85 - extra_headers: vec![], 86 - }; 67 + let opts = build_moderation_call_opts(config); 87 68 88 69 // Send request via jacquard agent 89 70 let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; ··· 110 91 111 92 info!("Reporting post {} to ozone: {:?}", post_uri, reason); 112 93 113 - let timestamp = chrono::Utc::now().to_rfc3339(); 114 - let comment = format!("{}: {}", timestamp, check_comment); 115 - 116 - // Extract DID from URI 94 + let comment = build_timestamped_comment(check_comment); 95 + let meta = build_mod_tool_meta(post_uri, phash); 117 96 let did_str = extract_did_from_uri(post_uri)?; 118 97 119 - // Build mod_tool meta 120 - let mut meta_map = BTreeMap::new(); 121 - meta_map.insert( 122 - SmolStr::new("externalUrl"), 123 - format!("https://pdsls.dev/{}", post_uri).into(), 124 - ); 125 - meta_map.insert(SmolStr::new("phash"), phash.into()); 126 - 127 98 // Create moderation report event using jacquard-api types 128 99 let event = EmitEvent::new() 129 - .created_by(Did::new(created_by).into_diagnostic()?) 100 + .created_by(parse_did(created_by)?) 
130 101 .event(EmitEventEvent::ModEventReport(Box::new( 131 102 ModEventReport::builder() 132 103 .report_type(reason) ··· 135 106 ))) 136 107 .subject(EmitEventSubject::RepoRef(Box::new( 137 108 RepoRef::builder() 138 - .did(Did::new(&did_str).into_diagnostic()?) 109 + .did(parse_did(&did_str)?) 139 110 .build(), 140 111 ))) 141 112 .subject_blob_cids(vec![]) 142 113 .mod_tool(ModTool { 143 114 name: CowStr::from("skywatch/skywatch-phash-rs"), 144 - meta: Some(Data::Object(Object::from(meta_map))), 115 + meta: Some(meta), 145 116 extra_data: BTreeMap::new(), 146 117 }) 147 118 .build(); 148 119 149 - // Build call options with proxy headers 150 - let opts = CallOptions { 151 - auth: None, 152 - atproto_proxy: Some(CowStr::from(format!( 153 - "{}#atproto_labeler", 154 - config.moderation.labeler_did 155 - ))), 156 - atproto_accept_labelers: Some(vec![CowStr::from( 157 - "did:plc:ar7c4by46qjdydhdevvrndac;redact", 158 - )]), 159 - extra_headers: vec![], 160 - }; 120 + let opts = build_moderation_call_opts(config); 161 121 162 122 // Send request via jacquard agent 163 123 let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?; ··· 184 144 185 145 // Create moderation takedown event using jacquard-api types 186 146 let event = EmitEvent::new() 187 - .created_by(Did::new(created_by).into_diagnostic()?) 147 + .created_by(parse_did(created_by)?) 
188 148 .event(EmitEventEvent::ModEventTakedown(Box::new( 189 149 ModEventTakedown { 190 150 comment: Some(CowStr::from(comment)), ··· 199 159 ))) 200 160 .build(); 201 161 202 - // Build call options with proxy headers 203 - let opts = CallOptions { 204 - auth: None, 205 - atproto_proxy: Some(CowStr::from(format!( 206 - "{}#atproto_labeler", 207 - config.moderation.labeler_did 208 - ))), 209 - atproto_accept_labelers: Some(vec![CowStr::from( 210 - "did:plc:ar7c4by46qjdydhdevvrndac;redact", 211 - )]), 212 - extra_headers: vec![], 213 - }; 162 + let opts = build_moderation_call_opts(config); 214 163 215 164 // Send request via jacquard agent 216 165 let _response = agent.send_with_opts(event, opts).await.into_diagnostic()?;
+25 -16
src/processor/matcher.rs
··· 78 78 for check in blob_checks { 79 79 // Check if DID is in ignore list 80 80 if let Some(ignore_list) = &check.ignore_did { 81 - if ignore_list.contains(&did.to_string()) { 81 + if ignore_list.iter().any(|ignored_did| ignored_did.as_str() == did) { 82 82 debug!("Skipping check '{}' for ignored DID: {}", check.label, did); 83 83 continue; 84 84 } ··· 88 88 89 89 // Check each phash in the check 90 90 for check_phash in &check.phashes { 91 - match phash::hamming_distance(phash, check_phash) { 91 + match phash::hamming_distance(phash, check_phash.as_str()) { 92 92 Ok(distance) => { 93 93 if distance <= threshold { 94 94 info!( ··· 96 96 check.label, distance, threshold 97 97 ); 98 98 return Some(MatchResult { 99 - phash: phash.to_string(), 99 + phash: phash.to_string().into(), 100 100 matched_check: check.clone(), 101 101 matched_phash: check_phash.clone(), 102 102 hamming_distance: distance, ··· 182 182 183 183 #[test] 184 184 fn test_match_phash_exact() { 185 + 186 + 185 187 let checks = vec![BlobCheck { 186 - phashes: vec!["deadbeefdeadbeef".to_string()], 187 - label: "test-label".to_string(), 188 - comment: "Test".to_string(), 188 + phashes: vec!["deadbeefdeadbeef".to_string().into()], 189 + label: "test-label".to_string().into(), 190 + comment: "Test".to_string().into(), 189 191 report_acct: false, 190 192 label_acct: false, 191 193 report_post: true, ··· 204 206 205 207 #[test] 206 208 fn test_match_phash_within_threshold() { 209 + 210 + 207 211 let checks = vec![BlobCheck { 208 - phashes: vec!["deadbeefdeadbeef".to_string()], 209 - label: "test-label".to_string(), 210 - comment: "Test".to_string(), 212 + phashes: vec!["deadbeefdeadbeef".to_string().into()], 213 + label: "test-label".to_string().into(), 214 + comment: "Test".to_string().into(), 211 215 report_acct: false, 212 216 label_acct: false, 213 217 report_post: true, ··· 227 231 228 232 #[test] 229 233 fn test_match_phash_exceeds_threshold() { 234 + 235 + 230 236 let checks = vec![BlobCheck { 231 - 
phashes: vec!["deadbeefdeadbeef".to_string()], 232 - label: "test-label".to_string(), 233 - comment: "Test".to_string(), 237 + phashes: vec!["deadbeefdeadbeef".to_string().into()], 238 + label: "test-label".to_string().into(), 239 + comment: "Test".to_string().into(), 234 240 report_acct: false, 235 241 label_acct: false, 236 242 report_post: true, ··· 249 255 250 256 #[test] 251 257 fn test_match_phash_ignored_did() { 258 + use jacquard_common::types::string::Did; 259 + use jacquard_common::IntoStatic; 260 + 252 261 let checks = vec![BlobCheck { 253 - phashes: vec!["deadbeefdeadbeef".to_string()], 254 - label: "test-label".to_string(), 255 - comment: "Test".to_string(), 262 + phashes: vec!["deadbeefdeadbeef".to_string().into()], 263 + label: "test-label".to_string().into(), 264 + comment: "Test".to_string().into(), 256 265 report_acct: false, 257 266 label_acct: false, 258 267 report_post: true, ··· 261 270 takedown_acct: false, 262 271 hamming_threshold: Some(3), 263 272 description: None, 264 - ignore_did: Some(vec!["did:plc:ignored".to_string()]), 273 + ignore_did: Some(vec![Did::new("did:plc:ignored").unwrap().into_static()]), 265 274 }]; 266 275 267 276 let result = match_phash("deadbeefdeadbeef", &checks, "did:plc:ignored", 3);
+1 -1
src/queue/redis_queue.rs
··· 153 153 154 154 #[cfg(test)] 155 155 mod tests { 156 - use super::*; 156 + 157 157 158 158 // Note: These are integration tests that require a running Redis instance 159 159 // Run with: cargo test --test queue -- --ignored
+110 -90
src/queue/worker.rs
··· 16 16 use crate::queue::redis_queue::JobQueue; 17 17 use crate::types::{BlobCheck, ImageJob, MatchResult}; 18 18 19 + /// Macro to handle moderation actions with claim checking 20 + macro_rules! moderation_action { 21 + // Report pattern: claim_X_report -> action 22 + (report: $check_field:expr, $action_name:expr, $claim_fn:expr, $action:expr, $metrics_done:expr, $metrics_skip:expr, $subject:expr) => { 23 + if $check_field { 24 + if $claim_fn { 25 + $action.await?; 26 + $metrics_done; 27 + info!(concat!($action_name, " completed for: {}"), $subject); 28 + } else { 29 + $metrics_skip; 30 + info!(concat!($action_name, " already done, skipping: {}"), $subject); 31 + } 32 + } 33 + }; 34 + 35 + // Label pattern: !has_label -> action -> set_label 36 + (label: $check_field:expr, $action_name:expr, $has_label_fn:expr, $action:expr, $set_label_fn:expr, $metrics_done:expr, $metrics_skip:expr, $subject:expr) => { 37 + if $check_field { 38 + if !$has_label_fn { 39 + $action.await?; 40 + $metrics_done; 41 + $set_label_fn.await?; 42 + info!(concat!($action_name, " completed for: {}"), $subject); 43 + } else { 44 + $metrics_skip; 45 + info!(concat!($action_name, " already done, skipping: {}"), $subject); 46 + } 47 + } 48 + }; 49 + } 50 + 19 51 /// Worker pool for processing image jobs 20 52 pub struct WorkerPool { 21 53 config: Config, ··· 226 258 check.label, job.post_uri 227 259 ); 228 260 229 - // Report post if configured 230 - if check.report_post { 231 - if claims::claim_post_report(redis_conn, &job.post_uri, &check.label).await? 
{ 232 - post::report_post( 233 - agent.as_ref(), 234 - config, 235 - rate_limiter, 236 - &job.post_uri, 237 - &job.post_cid, 238 - ReasonType::ComAtprotoModerationDefsReasonSpam, 239 - &check.comment, 240 - &match_result.phash, 241 - created_by, 242 - ) 243 - .await?; 244 - metrics.inc_posts_reported(); 245 - info!("Reported post: {}", job.post_uri); 246 - } else { 247 - metrics.inc_posts_already_reported(); 248 - info!("Post already reported, skipping: {}", job.post_uri); 249 - } 250 - } 261 + moderation_action!( 262 + report: check.report_post, 263 + "Report post", 264 + claims::claim_post_report(redis_conn, &job.post_uri, &check.label).await?, 265 + post::report_post( 266 + agent.as_ref(), 267 + config, 268 + rate_limiter, 269 + &job.post_uri, 270 + &job.post_cid, 271 + ReasonType::ComAtprotoModerationDefsReasonSpam, 272 + &check.comment, 273 + &match_result.phash, 274 + created_by, 275 + ), 276 + metrics.inc_posts_reported(), 277 + metrics.inc_posts_already_reported(), 278 + job.post_uri 279 + ); 251 280 252 - // Label post if configured 253 - if check.to_label { 254 - if !claims::has_label(redis_conn, &job.post_uri, &check.label).await? 
{ 255 - post::label_post( 256 - agent.as_ref(), 257 - config, 258 - rate_limiter, 259 - &job.post_uri, 260 - &job.post_cid, 261 - &check.label, 262 - &check.comment, 263 - &match_result.phash, 264 - created_by, 265 - ) 266 - .await?; 267 - metrics.inc_posts_labeled(); 268 - claims::set_label(redis_conn, &job.post_uri, &check.label, None).await?; 269 - info!("Labeled post: {}", job.post_uri); 270 - } else { 271 - metrics.inc_posts_already_labeled(); 272 - info!("Post already labeled, skipping: {}", job.post_uri); 273 - } 274 - } 281 + moderation_action!( 282 + label: check.to_label, 283 + "Label post", 284 + claims::has_label(redis_conn, &job.post_uri, &check.label).await?, 285 + post::label_post( 286 + agent.as_ref(), 287 + config, 288 + rate_limiter, 289 + &job.post_uri, 290 + &job.post_cid, 291 + &check.label, 292 + &check.comment, 293 + &match_result.phash, 294 + created_by, 295 + ), 296 + claims::set_label(redis_conn, &job.post_uri, &check.label, None), 297 + metrics.inc_posts_labeled(), 298 + metrics.inc_posts_already_labeled(), 299 + job.post_uri 300 + ); 275 301 276 - // Report account if configured 277 - if check.report_acct { 278 - if claims::claim_account_report(redis_conn, &job.post_did, &check.label).await? 
{ 279 - account::report_account( 280 - agent.as_ref(), 281 - config, 282 - rate_limiter, 283 - &job.post_did, 284 - ReasonType::ComAtprotoModerationDefsReasonSpam, 285 - &check.comment, 286 - &job.post_uri, 287 - &match_result.phash, 288 - created_by, 289 - ) 290 - .await?; 291 - metrics.inc_accounts_reported(); 292 - info!("Reported account: {}", job.post_did); 293 - } else { 294 - metrics.inc_accounts_already_reported(); 295 - info!("Account already reported, skipping: {}", job.post_did); 296 - } 297 - } 302 + moderation_action!( 303 + report: check.report_acct, 304 + "Report account", 305 + claims::claim_account_report(redis_conn, &job.post_did, &check.label).await?, 306 + account::report_account( 307 + agent.as_ref(), 308 + config, 309 + rate_limiter, 310 + &job.post_did, 311 + ReasonType::ComAtprotoModerationDefsReasonSpam, 312 + &check.comment, 313 + &job.post_uri, 314 + &match_result.phash, 315 + created_by, 316 + ), 317 + metrics.inc_accounts_reported(), 318 + metrics.inc_accounts_already_reported(), 319 + job.post_did 320 + ); 298 321 299 - // Label account if configured 300 - if check.label_acct { 301 - if !claims::has_label(redis_conn, &job.post_did, &check.label).await? 
{ 302 - account::label_account( 303 - agent.as_ref(), 304 - config, 305 - rate_limiter, 306 - &job.post_did, 307 - &check.label, 308 - &check.comment, 309 - &job.post_uri, 310 - &match_result.phash, 311 - created_by, 312 - ) 313 - .await?; 314 - metrics.inc_accounts_labeled(); 315 - claims::set_label(redis_conn, &job.post_did, &check.label, None).await?; 316 - info!("Labeled account: {}", job.post_did); 317 - } else { 318 - metrics.inc_accounts_already_labeled(); 319 - info!("Account already labeled, skipping: {}", job.post_did); 320 - } 321 - } 322 + moderation_action!( 323 + label: check.label_acct, 324 + "Label account", 325 + claims::has_label(redis_conn, &job.post_did, &check.label).await?, 326 + account::label_account( 327 + agent.as_ref(), 328 + config, 329 + rate_limiter, 330 + &job.post_did, 331 + &check.label, 332 + &check.comment, 333 + &job.post_uri, 334 + &match_result.phash, 335 + created_by, 336 + ), 337 + claims::set_label(redis_conn, &job.post_did, &check.label, None), 338 + metrics.inc_accounts_labeled(), 339 + metrics.inc_accounts_already_labeled(), 340 + job.post_did 341 + ); 322 342 323 343 Ok(()) 324 344 }
+102 -16
src/types/mod.rs
··· 1 - use serde::{Deserialize, Serialize}; 1 + use jacquard_common::types::string::{AtUri, Cid, Did}; 2 + use jacquard_common::{CowStr, IntoStatic}; 3 + use serde::{Deserialize, Deserializer, Serialize}; 2 4 3 5 #[derive(Debug, Clone, Serialize, Deserialize)] 4 6 #[serde(rename_all = "camelCase")] 7 + #[serde(bound(deserialize = ""))] 5 8 pub struct BlobCheck { 6 - pub phashes: Vec<String>, 7 - pub label: String, 8 - pub comment: String, 9 + #[serde(deserialize_with = "deserialize_cowstr_vec")] 10 + pub phashes: Vec<CowStr<'static>>, 11 + #[serde(deserialize_with = "deserialize_cowstr")] 12 + pub label: CowStr<'static>, 13 + #[serde(deserialize_with = "deserialize_cowstr")] 14 + pub comment: CowStr<'static>, 9 15 pub report_acct: bool, 10 16 pub label_acct: bool, 11 17 pub report_post: bool, ··· 16 22 pub takedown_acct: bool, 17 23 #[serde(default)] 18 24 pub hamming_threshold: Option<u32>, 19 - #[serde(default)] 20 - pub description: Option<String>, 21 - #[serde(default, alias = "ignoreDID")] 22 - pub ignore_did: Option<Vec<String>>, 25 + #[serde(default, deserialize_with = "deserialize_option_cowstr")] 26 + pub description: Option<CowStr<'static>>, 27 + #[serde(default, alias = "ignoreDID", deserialize_with = "deserialize_option_did_vec")] 28 + pub ignore_did: Option<Vec<Did<'static>>>, 23 29 } 24 30 25 31 #[derive(Debug, Clone, Serialize, Deserialize)] 26 32 #[serde(rename_all = "camelCase")] 33 + #[serde(bound(deserialize = ""))] 27 34 pub struct BlobReference { 28 - pub cid: String, 29 - #[serde(default)] 30 - pub mime_type: Option<String>, 35 + #[serde(deserialize_with = "deserialize_cid")] 36 + pub cid: Cid<'static>, 37 + #[serde(default, deserialize_with = "deserialize_option_cowstr")] 38 + pub mime_type: Option<CowStr<'static>>, 31 39 } 32 40 33 41 #[derive(Debug, Clone, Serialize, Deserialize)] 34 42 #[serde(rename_all = "camelCase")] 43 + #[serde(bound(deserialize = ""))] 35 44 pub struct ImageJob { 36 - pub post_uri: String, 37 - pub post_cid: String, 
38 - pub post_did: String, 45 + #[serde(deserialize_with = "deserialize_at_uri")] 46 + pub post_uri: AtUri<'static>, 47 + #[serde(deserialize_with = "deserialize_cid")] 48 + pub post_cid: Cid<'static>, 49 + #[serde(deserialize_with = "deserialize_did")] 50 + pub post_did: Did<'static>, 39 51 pub blobs: Vec<BlobReference>, 40 52 pub timestamp: i64, 41 53 pub attempts: u32, ··· 43 55 44 56 #[derive(Debug, Clone, Serialize, Deserialize)] 45 57 #[serde(rename_all = "camelCase")] 58 + #[serde(bound(deserialize = ""))] 46 59 pub struct MatchResult { 47 - pub phash: String, 60 + #[serde(deserialize_with = "deserialize_cowstr")] 61 + pub phash: CowStr<'static>, 48 62 pub matched_check: BlobCheck, 49 - pub matched_phash: String, 63 + #[serde(deserialize_with = "deserialize_cowstr")] 64 + pub matched_phash: CowStr<'static>, 50 65 pub hamming_distance: u32, 51 66 } 67 + 68 + fn deserialize_cowstr<'de, D>(deserializer: D) -> Result<CowStr<'static>, D::Error> 69 + where 70 + D: Deserializer<'de>, 71 + { 72 + let s = String::deserialize(deserializer)?; 73 + Ok(CowStr::from(s)) 74 + } 75 + 76 + fn deserialize_cowstr_vec<'de, D>(deserializer: D) -> Result<Vec<CowStr<'static>>, D::Error> 77 + where 78 + D: Deserializer<'de>, 79 + { 80 + let strings: Vec<String> = Vec::deserialize(deserializer)?; 81 + Ok(strings.into_iter().map(CowStr::from).collect()) 82 + } 83 + 84 + fn deserialize_option_cowstr<'de, D>(deserializer: D) -> Result<Option<CowStr<'static>>, D::Error> 85 + where 86 + D: Deserializer<'de>, 87 + { 88 + let opt: Option<String> = Option::deserialize(deserializer)?; 89 + Ok(opt.map(CowStr::from)) 90 + } 91 + 92 + fn deserialize_cid<'de, D>(deserializer: D) -> Result<Cid<'static>, D::Error> 93 + where 94 + D: Deserializer<'de>, 95 + { 96 + let s = String::deserialize(deserializer)?; 97 + Ok(Cid::str(&s).into_static()) 98 + } 99 + 100 + fn deserialize_did<'de, D>(deserializer: D) -> Result<Did<'static>, D::Error> 101 + where 102 + D: Deserializer<'de>, 103 + { 104 + let s = 
String::deserialize(deserializer)?; 105 + Did::new(&s) 106 + .map(|d| d.into_static()) 107 + .map_err(serde::de::Error::custom) 108 + } 109 + 110 + fn deserialize_option_did_vec<'de, D>( 111 + deserializer: D, 112 + ) -> Result<Option<Vec<Did<'static>>>, D::Error> 113 + where 114 + D: Deserializer<'de>, 115 + { 116 + let opt: Option<Vec<String>> = Option::deserialize(deserializer)?; 117 + match opt { 118 + Some(strings) => { 119 + let dids: Result<Vec<Did<'static>>, _> = strings 120 + .iter() 121 + .map(|s| Did::new(s).map(|d| d.into_static())) 122 + .collect(); 123 + Ok(Some(dids.map_err(serde::de::Error::custom)?)) 124 + } 125 + None => Ok(None), 126 + } 127 + } 128 + 129 + fn deserialize_at_uri<'de, D>(deserializer: D) -> Result<AtUri<'static>, D::Error> 130 + where 131 + D: Deserializer<'de>, 132 + { 133 + let s = String::deserialize(deserializer)?; 134 + AtUri::new(&s) 135 + .map(|u| u.into_static()) 136 + .map_err(serde::de::Error::custom) 137 + }