Rust wrapper for the ATProto tap utility
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Improve record slice handling to avoid needing self_cell

+55 -99
-7
Cargo.lock
··· 2225 2225 ] 2226 2226 2227 2227 [[package]] 2228 - name = "self_cell" 2229 - version = "1.2.2" 2230 - source = "registry+https://github.com/rust-lang/crates.io-index" 2231 - checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 2232 - 2233 - [[package]] 2234 2228 name = "semver" 2235 2229 version = "1.0.27" 2236 2230 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2602 2596 "futures-util", 2603 2597 "libc", 2604 2598 "reqwest", 2605 - "self_cell", 2606 2599 "serde", 2607 2600 "serde_json", 2608 2601 "thiserror 2.0.18",
-1
tapped/Cargo.toml
··· 22 22 tracing = "0.1" 23 23 base64 = "0.22" 24 24 libc = "0.2" 25 - self_cell = "1.2.2" 26 25 27 26 [dev-dependencies] 28 27 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+9 -28
tapped/src/channel.rs
··· 7 7 use tungstenite::protocol::frame::Utf8Bytes; 8 8 use url::Url; 9 9 10 - use crate::types::{RawEvent, RawEventOwned, RawRecordEventOwned, Record, UnparsedRecord}; 10 + use crate::types::RawEvent; 11 11 use crate::{Error, Event, Result}; 12 12 13 13 type WsStream = ··· 124 124 loop { 125 125 match self.event_rx.recv().await { 126 126 Some(event_with_ack) => { 127 - let mut raw_event_owned = None::<RawEventOwned>; 128 - let mut raw_record_owned = None::<RawRecordEventOwned>; 129 - let inner_record = Record::new(event_with_ack.event, |json| { 130 - match serde_json::from_str::<RawEvent>(json) { 131 - Ok(mut raw) => { 132 - let inner = if let Some(mut rec) = raw.record.take() { 133 - let inner = rec.record.take(); 134 - raw_record_owned = Some(rec.owned); 135 - inner 136 - } else { 137 - None 138 - }; 139 - raw_event_owned = Some(raw.owned); 140 - UnparsedRecord(inner) 141 - } 142 - Err(e) => { 143 - tracing::warn!("Failed to parse event: {}", e); 144 - UnparsedRecord(None) 145 - } 127 + let json = event_with_ack.event; 128 + let raw = match serde_json::from_str::<RawEvent>(json.as_str()) { 129 + Ok(raw) => raw, 130 + Err(e) => { 131 + tracing::warn!("Failed to parse event: {}", e); 132 + continue; 146 133 } 147 - }); 148 - let inner_record = if inner_record.borrow_dependent().0.is_some() { 149 - Some(inner_record) 150 - } else { 151 - None 152 134 }; 153 - if let Some(event) = 154 - raw_event_owned.and_then(|r| r.into_event(raw_record_owned, inner_record)) 155 - { 135 + 136 + if let Some(event) = raw.into_event(json.clone()) { 156 137 let id = event.id(); 157 138 break Ok(ReceivedEvent { 158 139 event,
+46 -63
tapped/src/types.rs
··· 7 7 8 8 use crate::Error; 9 9 10 - self_cell::self_cell!( 11 - pub(crate) struct Record { 12 - owner: Utf8Bytes, 13 - 14 - #[covariant] 15 - dependent: UnparsedRecord, 16 - } 17 - ); 18 - 19 - pub(crate) struct UnparsedRecord<'a>(pub(crate) Option<&'a RawValue>); 20 - 21 10 /// Action performed on a record. 22 11 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 23 12 #[serde(rename_all = "lowercase")] ··· 73 62 pub action: RecordAction, 74 63 /// CID of the record (None on delete). 75 64 pub cid: Option<String>, 76 - /// The record data (None on delete). 77 - pub(crate) record: Option<Record>, 65 + // Inner record JSON pointing into the outer JSON 66 + json: Option<Utf8Bytes>, 67 + record_offset: usize, 68 + record_len: usize, 78 69 } 79 70 80 71 impl RecordEvent { 81 72 /// Get the record's content as a reference to a JSON string 82 73 pub fn record_as_str(&self) -> Option<&str> { 83 - self.record 74 + self.json 84 75 .as_ref() 85 - .and_then(|r| r.borrow_dependent().0) 86 - .map(|rv| rv.get()) 76 + .map(|j| &j.as_str()[self.record_offset..self.record_offset + self.record_len]) 87 77 } 88 78 89 79 /// Parse the record's content to a compatible struct ··· 217 207 218 208 // Internal deserialisation structures for parsing tap's JSON format 219 209 220 - #[derive(Deserialize, Clone)] 210 + #[derive(Deserialize)] 221 211 #[serde(bound(deserialize = "'de: 'a"))] 222 212 pub(crate) struct RawEvent<'a> { 223 - #[serde(flatten)] 224 - pub(crate) owned: RawEventOwned, 225 - pub(crate) record: Option<RawRecordEvent<'a>>, 226 - } 227 - 228 - #[derive(Deserialize, Clone)] 229 - pub(crate) struct RawEventOwned { 230 - pub(crate) id: u64, 213 + pub id: u64, 231 214 #[serde(rename = "type")] 232 - pub(crate) type_: String, 233 - pub(crate) identity: Option<RawIdentityEvent>, 215 + pub type_: String, 216 + pub identity: Option<RawIdentityEvent>, 217 + pub record: Option<RawRecordEvent<'a>>, 234 218 } 235 219 236 - #[derive(Deserialize, Clone)] 220 + #[derive(Deserialize)] 237 221 #[serde(bound(deserialize = "'de: 'a"))] 238 222 pub(crate) struct RawRecordEvent<'a> { 239 - #[serde(flatten)] 240 - pub owned: RawRecordEventOwned, 241 - pub record: Option<&'a RawValue>, 242 - } 243 - 244 - #[derive(Deserialize, Clone)] 245 - pub(crate) struct RawRecordEventOwned { 246 223 pub live: bool, 247 224 pub did: String, 248 225 pub rev: String, ··· 250 227 pub rkey: String, 251 228 pub action: RecordAction, 252 229 pub cid: Option<String>, 230 + pub record: Option<&'a RawValue>, 253 231 } 254 232 255 233 #[derive(Deserialize, Clone)] ··· 261 239 pub status: AccountStatus, 262 240 } 263 241 264 - impl RawEventOwned { 242 + impl RawEvent<'_> { 265 243 /// Convert to the public Event type. 266 - pub fn into_event( 267 - self, 268 - raw_record: Option<RawRecordEventOwned>, 269 - inner_record: Option<Record>, 270 - ) -> Option<Event> { 244 + pub fn into_event(self, json: Utf8Bytes) -> Option<Event> { 271 245 match self.type_.as_str() { 272 246 "record" => { 273 - let r = raw_record?; 247 + let r = self.record?; 248 + let (json, record_offset, record_len) = if let Some(rv) = r.record.as_ref() { 249 + let json_str = json.as_str(); 250 + let rv_str = rv.get(); 251 + let offset = rv_str.as_ptr() as usize - json_str.as_ptr() as usize; 252 + (Some(json), offset, rv_str.len()) 253 + } else { 254 + (None, 0, 0) 255 + }; 274 256 Some(Event::Record(RecordEvent { 275 257 id: self.id, 276 258 live: r.live, ··· 280 262 rkey: r.rkey, 281 263 action: r.action, 282 264 cid: r.cid, 283 - record: inner_record, 265 + json, 266 + record_offset, 267 + record_len, 284 268 })) 285 269 } 286 270 "identity" => { ··· 477 461 }) 478 462 .to_string(); 479 463 480 - let raw: RawEvent = serde_json::from_str(&json).unwrap(); 481 - assert_eq!(raw.owned.id, 12345); 482 - assert_eq!(raw.owned.type_, "record"); 464 + let json: Utf8Bytes = json.into(); 465 + let raw: RawEvent = serde_json::from_str(json.as_str()).unwrap(); 466 + assert_eq!(raw.id, 12345); 467 + assert_eq!(raw.type_, "record"); 483 468 484 - let event = raw 485 - .owned 486 - .into_event(raw.record.map(|r| r.owned), None) 487 - .unwrap(); 469 + let event = raw.into_event(json.clone()).unwrap(); 488 470 match event { 489 471 Event::Record(r) => { 490 472 assert_eq!(r.id, 12345); ··· 499 481 500 482 #[test] 501 483 fn raw_identity_event_deserialize() { 502 - let json = json!({ 484 + let json: Utf8Bytes = json!({ 503 485 "id": 99999, 504 486 "type": "identity", 505 487 "identity": { ··· 509 491 "status": "active" 510 492 } 511 493 }) 512 - .to_string(); 494 + .to_string() 495 + .into(); 513 496 514 - let raw: RawEvent = serde_json::from_str(&json).unwrap(); 515 - let event = raw.owned.into_event(None, None).unwrap(); 497 + let raw: RawEvent = serde_json::from_str(json.as_str()).unwrap(); 498 + let event = raw.into_event(json.clone()).unwrap(); 516 499 517 500 match event { 518 501 Event::Identity(i) => { ··· 528 511 529 512 #[test] 530 513 fn raw_delete_event_no_record() { 531 - let json = json!({ 514 + let json: Utf8Bytes = json!({ 532 515 "id": 55555, 533 516 "type": "record", 534 517 "record": { ··· 542 525 "record": null 543 526 } 544 527 }) 545 - .to_string(); 528 + .to_string() 529 + .into(); 546 530 547 - let raw: RawEvent = serde_json::from_str(&json).unwrap(); 548 - let event = raw 549 - .owned 550 - .into_event(raw.record.map(|r| r.owned), None) 551 - .unwrap(); 531 + let raw: RawEvent = serde_json::from_str(json.as_str()).unwrap(); 532 + let event = raw.into_event(json.clone()).unwrap(); 552 533 553 534 match event { 554 535 Event::Record(r) => { 555 536 assert_eq!(r.action, RecordAction::Delete); 556 537 assert!(r.cid.is_none()); 557 - assert!(r.record.is_none()); 538 + assert!(r.record_as_str().is_none()); 558 539 } 559 540 _ => panic!("Expected Record event"), 560 541 } ··· 571 552 rkey: "key".to_string(), 572 553 action: RecordAction::Create, 573 554 cid: None, 574 - record: None, 555 + json: None, 556 + record_offset: 0, 557 + record_len: 0, 575 558 }); 576 559 577 560 assert_eq!(record_event.id(), 123);