don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(jetstream): improve deserialization of jetstream types

Signed-off-by: tjh <x@tjh.dev>

tjh 726430ee 8cc24c31

+136 -115
+136 -114
crates/gordian-jetstream/src/de.rs
··· 1 + use core::fmt; 2 + 1 3 use gordian_types::Did; 2 4 use serde::Deserialize; 3 5 use serde::Serialize; 4 - use serde::de::Visitor; 5 6 use serde_json::value::RawValue; 6 7 use time::OffsetDateTime; 7 8 ··· 14 13 // To get around this limitation we first deserialize into a less ideal type, 15 14 // then manually transform into the types we want. 16 15 17 - #[derive(Debug, Deserialize, Serialize)] 16 + #[derive(Deserialize, Serialize)] 17 + #[serde(deny_unknown_fields)] 18 18 struct InnerEvent<'a> { 19 19 #[serde(borrow)] 20 20 did: &'a Did, ··· 26 24 account: Option<InnerAccount<'a>>, 27 25 } 28 26 29 - #[derive(Debug, Deserialize, Serialize)] 27 + #[derive(Deserialize, Serialize)] 28 + struct InnerAccount<'a> { 29 + active: bool, 30 + #[serde(borrow)] 31 + did: &'a Did, 32 + seq: i64, 33 + #[serde(skip_serializing_if = "Option::is_none")] 34 + status: Option<InnerAccountStatus>, 35 + #[serde(with = "time::serde::rfc3339")] 36 + time: OffsetDateTime, 37 + } 38 + 39 + // https://github.com/bluesky-social/jetstream/blob/f4e39a4b5bbc98cdb24f1929e7631751c4f681ae/pkg/models/models.go#L23 40 + #[derive(Deserialize, Serialize)] 30 41 struct InnerCommit<'a> { 31 42 #[serde(borrow)] 32 43 rev: &'a str, 33 44 operation: &'a str, 34 45 collection: &'a str, 35 46 rkey: &'a str, 36 - cid: Option<&'a str>, 37 47 record: Option<&'a RawValue>, 48 + cid: Option<&'a str>, 38 49 } 39 50 40 - #[derive(Debug, Deserialize, Serialize)] 51 + #[derive(Deserialize, Serialize)] 41 52 struct InnerIdentity<'a> { 42 53 #[serde(borrow)] 43 54 did: &'a Did, ··· 60 45 time: OffsetDateTime, 61 46 } 62 47 63 - #[derive(Debug, Deserialize, Serialize)] 64 - pub struct InnerAccount<'a> { 65 - #[serde(default)] 66 - pub active: bool, 67 - #[serde(borrow)] 68 - pub did: &'a Did, 69 - pub handle: Option<&'a str>, 70 - #[serde(default)] 71 - pub status: AccountStatus, 72 - pub seq: i64, 73 - #[serde(with = "time::serde::rfc3339")] 74 - pub time: OffsetDateTime, 75 - } 76 - 77 - #[derive(Debug, Default)] 78 - pub enum AccountStatus { 79 - Active, 80 - Takendown, 81 - Suspended, 82 - Deleted, 48 + // For known values see: 49 + // https://github.com/bluesky-social/indigo/blob/bf41e2ee75ab75997bf8cdd92b063c0a96db4aaf/lexicons/com/atproto/sync/subscribeRepos.json#L169 50 + #[derive(PartialEq, Eq, Deserialize, Serialize)] 51 + #[serde(rename_all = "lowercase")] 52 + enum InnerAccountStatus { 83 53 Deactivated, 54 + Deleted, 84 55 Desynchronized, 85 - Other(Box<str>), 86 - #[default] 87 - None, 56 + Suspended, 57 + Takendown, 58 + Throttled, 59 + #[serde(untagged)] 60 + Unknown(Box<str>), 88 61 } 89 62 90 - impl<'de> Deserialize<'de> for AccountStatus { 91 - fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 92 - where 93 - D: serde::Deserializer<'de>, 94 - { 95 - struct StatusVisitor; 96 - 97 - impl Visitor<'_> for StatusVisitor { 98 - type Value = AccountStatus; 99 - 100 - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { 101 - formatter.write_str("Account Status") 102 - } 103 - 104 - fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> 105 - where 106 - E: serde::de::Error, 107 - { 108 - match v { 109 - "active" => Ok(AccountStatus::Active), 110 - "takendown" => Ok(AccountStatus::Takendown), 111 - "suspened" => Ok(AccountStatus::Suspended), 112 - "deleted" => Ok(AccountStatus::Deleted), 113 - "deactivated" => Ok(AccountStatus::Deactivated), 114 - "desynchronized" => Ok(AccountStatus::Desynchronized), 115 - status => { 116 - tracing::warn!(?status, "unexpected account status"); 117 - Ok(AccountStatus::Other(v.into())) 118 - } 119 - } 120 - } 121 - 122 - fn visit_none<E>(self) -> Result<Self::Value, E> 123 - where 124 - E: serde::de::Error, 125 - { 126 - Ok(AccountStatus::None) 127 - } 128 - 129 - fn visit_unit<E>(self) -> Result<Self::Value, E> 130 - where 131 - E: serde::de::Error, 132 - { 133 - Ok(AccountStatus::None) 134 - } 135 - } 136 - 137 - deserializer.deserialize_any(StatusVisitor) 63 + impl fmt::Display for InnerAccountStatus { 64 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 65 + f.write_str(match self { 66 + Self::Deactivated => "deactivated", 67 + Self::Deleted => "deleted", 68 + Self::Desynchronized => "desynchronized", 69 + Self::Suspended => "suspended", 70 + Self::Takendown => "takendown", 71 + Self::Throttled => "throttled", 72 + Self::Unknown(value) => &value, 73 + }) 138 74 } 139 75 } 140 76 141 - impl Serialize for AccountStatus { 142 - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 143 - where 144 - S: serde::Serializer, 145 - { 146 - match self { 147 - Self::Active => serializer.serialize_str("active"), 148 - Self::Takendown => serializer.serialize_str("takendown"), 149 - Self::Suspended => serializer.serialize_str("suspended"), 150 - Self::Deleted => serializer.serialize_str("deleted"), 151 - Self::Deactivated => serializer.serialize_str("deactivated"), 152 - Self::Desynchronized => serializer.serialize_str("desynchronized"), 153 - Self::Other(value) => serializer.serialize_str(value), 154 - Self::None => serializer.serialize_none(), 77 + // Identical to InnerAccountStatus but with the addition of `Active`. This 78 + // allows us to drop the `active` field from `Account` saving ourselves one 79 + // whole boolean. 80 + #[derive(Debug, PartialEq, Eq)] 81 + pub enum AccountStatus { 82 + Active, 83 + Deactivated, 84 + Deleted, 85 + Desynchronized, 86 + Suspended, 87 + Takendown, 88 + Throttled, 89 + Unknown(Box<str>), 90 + } 91 + 92 + impl AccountStatus { 93 + pub const fn is_active(&self) -> bool { 94 + matches!(self, Self::Active) 95 + } 96 + } 97 + 98 + impl From<Option<InnerAccountStatus>> for AccountStatus { 99 + fn from(value: Option<InnerAccountStatus>) -> Self { 100 + match value { 101 + None => Self::Active, 102 + Some(InnerAccountStatus::Deactivated) => Self::Deactivated, 103 + Some(InnerAccountStatus::Deleted) => Self::Deleted, 104 + Some(InnerAccountStatus::Desynchronized) => Self::Desynchronized, 105 + Some(InnerAccountStatus::Suspended) => Self::Suspended, 106 + Some(InnerAccountStatus::Takendown) => Self::Takendown, 107 + Some(InnerAccountStatus::Throttled) => Self::Throttled, 108 + Some(InnerAccountStatus::Unknown(value)) => Self::Unknown(value), 155 109 } 156 110 } 157 111 } ··· 177 193 } 178 194 } 179 195 180 - #[inline] 181 196 #[must_use] 182 197 pub const fn did_str(&'a self) -> &'a str { 183 198 self.did().as_str() ··· 231 248 #[derive(Debug)] 232 249 pub struct Account<'a> { 233 250 pub ts: OffsetDateTime, 234 - pub active: bool, 235 251 pub did: &'a Did, 236 - pub handle: Option<&'a str>, 237 252 pub status: AccountStatus, 238 253 pub seq: i64, 239 254 pub time: OffsetDateTime, 255 + } 256 + 257 + impl Account<'_> { 258 + pub const fn is_active(&self) -> bool { 259 + self.status.is_active() 260 + } 240 261 } 241 262 242 263 #[derive(Debug)] ··· 252 265 pub time: OffsetDateTime, 253 266 } 254 267 268 + #[derive(Debug, thiserror::Error)] 269 + pub enum Error { 270 + #[error("event timestamp: {0}")] 271 + Timestamp(#[from] time::error::ComponentRange), 272 + #[error("unknown event kind '{0}'")] 273 + EventKind(String), 274 + #[error("unknown commit operation '{0}'")] 275 + CommitOperation(String), 276 + #[error("required field \"{field}\" for commit operation \"{op}\" missing")] 277 + MissingField { 278 + field: &'static str, 279 + op: &'static str, 280 + }, 281 + #[error("account status mismatch; active = {0}, but status = \"{1:?}\"")] 282 + AccountStatus(bool, Option<String>), 283 + } 284 + 255 285 impl<'a> TryFrom<InnerEvent<'a>> for Event<'a> { 256 - type Error = &'static str; 286 + type Error = Error; 257 287 258 288 fn try_from(value: InnerEvent<'a>) -> Result<Self, Self::Error> { 259 - let ts = OffsetDateTime::from_unix_timestamp_nanos(i128::from(value.time_us) * 1000) 260 - .map_err(|_| "Failed to parse timestamp")?; 289 + let ts = OffsetDateTime::from_unix_timestamp_nanos(i128::from(value.time_us) * 1000)?; 290 + 261 291 match (value.kind, value.commit, value.account, value.identity) { 262 292 ("commit", Some(commit), None, None) => { 263 293 match (commit.operation, commit.cid, commit.record) { ··· 285 281 collection: commit.collection, 286 282 rkey: commit.rkey, 287 283 rev: commit.rev, 288 - cid, 289 284 record, 285 + cid, 290 286 }))) 291 287 } 292 288 293 - ("create", None, _) => Err("missing 'cid' field in commit create"), 294 - ("create", _, None) => Err("missing 'record' field in commit create"), 289 + ("create", None, _) => Err(Error::MissingField { 290 + field: "cid", 291 + op: "create", 292 + }), 293 + ("create", _, None) => Err(Error::MissingField { 294 + field: "record", 295 + op: "create", 296 + }), 295 297 ("update", Some(cid), Some(record)) => { 296 298 Ok(Self::Commit(CommitEvent::Update(Commit { 297 299 ts, ··· 305 295 collection: commit.collection, 306 296 rkey: commit.rkey, 307 297 rev: commit.rev, 308 - cid, 309 298 record, 299 + cid, 310 300 }))) 311 301 } 312 - ("update", None, _) => Err("missing 'cid' field in commit update"), 313 - ("update", _, None) => Err("missing 'record' field in commit update"), 302 + ("update", None, _) => Err(Error::MissingField { 303 + field: "cid", 304 + op: "update", 305 + }), 306 + ("update", _, None) => Err(Error::MissingField { 307 + field: "record", 308 + op: "update", 309 + }), 314 310 ("delete", None, None) => Ok(Self::Commit(CommitEvent::Delete(Delete { 315 311 ts, 316 312 did: value.did, ··· 324 308 rkey: commit.rkey, 325 309 rev: commit.rev, 326 310 }))), 327 - _ => Err("unexpected operation"), 311 + (op, _, _) => Err(Error::CommitOperation(op.to_string())), 312 + } 313 + } 314 + ("account", None, Some(account), None) => { 315 + if account.active ^ account.status.is_none() { 316 + return Err(Error::AccountStatus( 317 + account.active, 318 + account.status.map(|status| status.to_string()), 319 + )); 328 320 } 329 321 330 - // 322 + Ok(Self::Account(Account { 323 + ts, 324 + did: account.did, 325 + status: account.status.into(), 326 + seq: account.seq, 327 + time: account.time, 328 + })) 331 329 } 332 - ("account", None, Some(account), None) => Ok(Self::Account(Account { 333 - ts, 334 - active: account.active, 335 - did: account.did, 336 - handle: account.handle, 337 - status: account.status, 338 - seq: account.seq, 339 - time: account.time, 340 - })), 330 + 341 331 ("identity", None, None, Some(identity)) => Ok(Self::Identity(Identity { 342 332 ts, 343 333 did: identity.did, ··· 351 329 seq: identity.seq, 352 330 time: identity.time, 353 331 })), 354 - _ => Err("unexpected event kind"), 332 + (kind, _, _, _) => Err(Error::EventKind(kind.to_string())), 355 333 } 356 334 } 357 335 }
-1
crates/gordian-jetstream/src/lib.rs
··· 17 17 pub use de::Delete; 18 18 pub use de::Event; 19 19 pub use de::Identity; 20 - pub use de::InnerAccount; 21 20 pub use gordian_types::Did; 22 21 pub use gordian_types::Nsid; 23 22 pub use serde_json::Value;