this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: Using i64 values from peer feedback.

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+40 -97
-1
README.md
··· 72 72 # TODO 73 73 74 74 * use i64, it's fine 75 - * look up keys on startup 76 75 * possible scoring function for queries 77 76 * add likes 78 77 * support deletes
+1 -1
migrations/20241103180245_init.down.sql
··· 2 2 3 3 DROP TABLE feed_content; 4 4 DROP TABLE consumer_control; 5 - 5 + DROP TABLE verification_method_cache;
+2 -3
migrations/20241103180245_init.up.sql
··· 4 4 feed_id TEXT NOT NULL, 5 5 uri TEXT NOT NULL, 6 6 indexed_at INTEGER NOT NULL, 7 - indexed_at_more INTEGER NOT NULL, 8 7 cid TEXT NOT NULL, 9 8 updated_at DATETIME NOT NULL DEFAULT (datetime('now')), 10 9 PRIMARY KEY (feed_id, uri) 11 10 ); 12 11 13 - CREATE INDEX feed_content_idx_feed ON feed_content(feed_id, indexed_at DESC, indexed_at_more DESC, cid DESC); 12 + CREATE INDEX feed_content_idx_feed ON feed_content(feed_id, indexed_at DESC, cid DESC); 14 13 15 14 CREATE TABLE consumer_control ( 16 15 source TEXT NOT NULL, 17 - time_us VARCHAR NOT NULL, 16 + time_us INTEGER NOT NULL, 18 17 updated_at DATETIME NOT NULL DEFAULT (datetime('now')), 19 18 PRIMARY KEY (source) 20 19 );
+10 -5
src/consumer.rs
··· 96 96 let sleeper = sleep(interval); 97 97 tokio::pin!(sleeper); 98 98 99 - let mut time_usec = 0u64; 99 + let mut time_usec = 0i64; 100 100 101 101 loop { 102 102 tokio::select! { ··· 104 104 break; 105 105 }, 106 106 () = &mut sleeper => { 107 - consumer_control_insert(&self.pool, &self.config.jetstream_hostname, &time_usec.to_string()).await?; 107 + consumer_control_insert(&self.pool, &self.config.jetstream_hostname, time_usec).await?; 108 108 sleeper.as_mut().reset(Instant::now() + interval); 109 109 }, 110 110 item = client.next() => { ··· 164 164 if feed_matcher.matches(&event_value) { 165 165 tracing::debug!(feed_id = ?feed_matcher.feed, "matched event"); 166 166 if let Some((uri, cid)) = model::to_post_strong_ref(&event) { 167 - let feed_content = storage::model::FeedContent::new(feed_matcher.feed.clone(), uri, event.clone().time_us, cid); 167 + let feed_content = storage::model::FeedContent{ 168 + feed_id: feed_matcher.feed.clone(), 169 + uri, 170 + indexed_at: event.clone().time_us, 171 + cid, 172 + }; 168 173 feed_content_insert(&self.pool, &feed_content).await?; 169 174 } 170 175 } ··· 200 205 max_message_size_bytes: u64, 201 206 202 207 #[serde(skip_serializing_if = "Option::is_none")] 203 - cursor: Option<u64>, 208 + cursor: Option<i64>, 204 209 }, 205 210 } 206 211 ··· 273 278 pub(crate) struct Event { 274 279 pub(crate) did: String, 275 280 pub(crate) kind: String, 276 - pub(crate) time_us: u64, 281 + pub(crate) time_us: i64, 277 282 pub(crate) commit: Option<CommitOp>, 278 283 } 279 284
+5 -18
src/http/handle_get_feed_skeleton.rs
··· 58 58 59 59 let feed_control = feed_control.unwrap(); 60 60 61 - if feed_control.allowed.len() > 0 { 61 + if !feed_control.allowed.is_empty() { 62 62 let authorization = headers.get("Authorization").and_then(|value| { 63 63 value 64 64 .to_str() ··· 104 104 let cursor = feed_items 105 105 .iter() 106 106 .last() 107 - .map(|last_feed_item| format!("{},{}", last_feed_item.time_us(), last_feed_item.cid)); 107 + .map(|last_feed_item| format!("{},{}", last_feed_item.indexed_at, last_feed_item.cid)); 108 108 109 109 let feed_item_views = feed_items 110 110 .iter() ··· 197 197 Ok(claims.iss) 198 198 } 199 199 200 - fn parse_cursor(value: Option<String>) -> Option<(u64, u32, u32, String)> { 200 + fn parse_cursor(value: Option<String>) -> Option<(i64, String)> { 201 201 let value = value.as_ref()?; 202 202 203 203 let parts = value.split(",").collect::<Vec<&str>>(); ··· 205 205 return None; 206 206 } 207 207 208 - let time_us = parts[0].parse::<u64>(); 208 + let time_us = parts[0].parse::<i64>(); 209 209 if time_us.is_err() { 210 210 return None; 211 211 } 212 212 let time_us = time_us.unwrap(); 213 213 214 - let time_us_bytes = time_us.to_be_bytes(); 215 - let indexed_at = u32::from_be_bytes([ 216 - time_us_bytes[0], 217 - time_us_bytes[1], 218 - time_us_bytes[2], 219 - time_us_bytes[3], 220 - ]); 221 - let indexed_at_more = u32::from_be_bytes([ 222 - time_us_bytes[4], 223 - time_us_bytes[5], 224 - time_us_bytes[6], 225 - time_us_bytes[7], 226 - ]); 227 - Some((time_us, indexed_at, indexed_at_more, parts[1].to_string())) 214 + Some((time_us, parts[1].to_string())) 228 215 }
+22 -69
src/storage.rs
··· 14 14 pub struct FeedContent { 15 15 pub feed_id: String, 16 16 pub uri: String, 17 - pub indexed_at: u32, 18 - pub indexed_at_more: u32, 17 + pub indexed_at: i64, 19 18 pub cid: String, 20 19 } 21 - 22 - impl FeedContent { 23 - pub fn new(feed_id: String, uri: String, time_us: u64, cid: String) -> Self { 24 - // Are their better ways to do this? Probably. 25 - let time_us_bytes = time_us.to_be_bytes(); 26 - let indexed_at = u32::from_be_bytes([ 27 - time_us_bytes[0], 28 - time_us_bytes[1], 29 - time_us_bytes[2], 30 - time_us_bytes[3], 31 - ]); 32 - let indexed_at_more = u32::from_be_bytes([ 33 - time_us_bytes[4], 34 - time_us_bytes[5], 35 - time_us_bytes[6], 36 - time_us_bytes[7], 37 - ]); 38 - 39 - Self { 40 - feed_id, 41 - uri, 42 - indexed_at, 43 - indexed_at_more, 44 - cid, 45 - } 46 - } 47 - pub fn time_us(&self) -> u64 { 48 - let indexed_at_bytes = self.indexed_at.to_be_bytes(); 49 - let indexed_at_more_bytes = self.indexed_at_more.to_be_bytes(); 50 - u64::from_be_bytes([ 51 - indexed_at_bytes[0], 52 - indexed_at_bytes[1], 53 - indexed_at_bytes[2], 54 - indexed_at_bytes[3], 55 - indexed_at_more_bytes[0], 56 - indexed_at_more_bytes[1], 57 - indexed_at_more_bytes[2], 58 - indexed_at_more_bytes[3], 59 - ]) 60 - } 61 - } 62 20 } 63 21 64 22 pub async fn feed_content_insert( ··· 68 26 let mut tx = pool.begin().await.context("failed to begin transaction")?; 69 27 70 28 let now = Utc::now(); 71 - sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, indexed_at_more, cid, updated_at) VALUES (?, ?, ?, ?, ?, ?)") 29 + sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, cid, updated_at) VALUES (?, ?, ?, ?, ?)") 72 30 .bind(&feed_content.feed_id) 73 31 .bind(&feed_content.uri) 74 32 .bind(feed_content.indexed_at) 75 - .bind(feed_content.indexed_at_more) 76 33 .bind(&feed_content.cid) 77 34 .bind(now) 78 35 .execute(tx.as_mut()) ··· 85 42 pool: &StoragePool, 86 43 feed_uri: &str, 87 44 limit: Option<u16>, 88 - cursor: Option<(u64, u32, u32, String)>, 45 + cursor: Option<(i64, String)>, 89 46 ) -> Result<Vec<FeedContent>> { 90 47 let mut tx = pool.begin().await.context("failed to begin transaction")?; 91 48 92 49 let limit = limit.unwrap_or(20).clamp(1, 100); 93 50 94 - let results = if let Some((_time_us, indexed_at, indexed_at_more, cid)) = cursor { 95 - let query = "SELECT * FROM feed_content WHERE feed_id = ? AND (indexed_at, indexed_at_more, cid) < (?, ?, ?) ORDER BY indexed_at DESC, indexed_at_more DESC, cid DESC LIMIT ?"; 51 + let results = if let Some((indexed_at, cid)) = cursor { 52 + let query = "SELECT * FROM feed_content WHERE feed_id = ? AND (indexed_at, cid) < (?, ?) ORDER BY indexed_at DESC, cid DESC LIMIT ?"; 96 53 97 54 sqlx::query_as::<_, FeedContent>(query) 98 55 .bind(feed_uri) 99 56 .bind(indexed_at) 100 - .bind(indexed_at_more) 101 57 .bind(cid) 102 58 .bind(limit) 103 59 .fetch_all(tx.as_mut()) 104 60 .await? 105 61 } else { 106 - let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC, indexed_at_more DESC, cid DESC LIMIT ?"; 62 + let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC, cid DESC LIMIT ?"; 107 63 108 64 sqlx::query_as::<_, FeedContent>(query) 109 65 .bind(feed_uri) ··· 117 73 Ok(results) 118 74 } 119 75 120 - pub async fn consumer_control_insert( 121 - pool: &StoragePool, 122 - source: &str, 123 - time_us: &str, 124 - ) -> Result<()> { 76 + pub async fn consumer_control_insert(pool: &StoragePool, source: &str, time_us: i64) -> Result<()> { 125 77 let mut tx = pool.begin().await.context("failed to begin transaction")?; 126 78 127 79 let now = Utc::now(); ··· 137 89 tx.commit().await.context("failed to commit transaction") 138 90 } 139 91 140 - pub async fn consumer_control_get(pool: &StoragePool, source: &str) -> Result<Option<u64>> { 92 + pub async fn consumer_control_get(pool: &StoragePool, source: &str) -> Result<Option<i64>> { 141 93 let mut tx = pool.begin().await.context("failed to begin transaction")?; 142 94 143 95 let result = 144 - sqlx::query_scalar::<_, String>("SELECT time_us FROM consumer_control WHERE source = ?") 96 + sqlx::query_scalar::<_, i64>("SELECT time_us FROM consumer_control WHERE source = ?") 145 97 .bind(source) 146 98 .fetch_optional(tx.as_mut()) 147 99 .await ··· 149 101 150 102 tx.commit().await.context("failed to commit transaction")?; 151 103 152 - Ok(result.and_then(|value| value.parse::<u64>().ok())) 104 + Ok(result) 153 105 } 154 106 155 107 pub async fn verifcation_method_insert( ··· 226 178 227 179 #[sqlx::test] 228 180 async fn record_feed_content(pool: SqlitePool) -> sqlx::Result<()> { 229 - let record = super::model::FeedContent::new( 230 - "feed".to_string(), 231 - "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n".to_string(), 232 - 1730673934229172_u64, 233 - "bafyreih74qdc6zskq7yarqi3xm634vnubf4g3ac5ieegbvakprxpjnsj74".to_string(), 234 - ); 181 + let record = super::model::FeedContent { 182 + feed_id: "feed".to_string(), 183 + uri: "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 184 + .to_string(), 185 + indexed_at: 1730673934229172_i64, 186 + cid: "bafyreih74qdc6zskq7yarqi3xm634vnubf4g3ac5ieegbvakprxpjnsj74".to_string(), 187 + }; 235 188 super::feed_content_insert(&pool, &record) 236 189 .await 237 190 .expect("failed to insert record"); ··· 246 199 records[0].uri, 247 200 "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 248 201 ); 249 - assert_eq!(records[0].time_us(), 1730673934229172_u64); 202 + assert_eq!(records[0].indexed_at, 1730673934229172_i64); 250 203 251 204 Ok(()) 252 205 } 253 206 254 207 #[sqlx::test] 255 208 async fn consumer_control(pool: SqlitePool) -> sqlx::Result<()> { 256 - super::consumer_control_insert(&pool, "foo", "1730673934229172") 209 + super::consumer_control_insert(&pool, "foo", 1730673934229172_i64) 257 210 .await 258 211 .expect("failed to insert record"); 259 212 ··· 261 214 super::consumer_control_get(&pool, "foo") 262 215 .await 263 216 .expect("failed to get record"), 264 - Some(1730673934229172_u64) 217 + Some(1730673934229172_i64) 265 218 ); 266 219 267 - super::consumer_control_insert(&pool, "foo", "1730673934229173") 220 + super::consumer_control_insert(&pool, "foo", 1730673934229173_i64) 268 221 .await 269 222 .expect("failed to insert record"); 270 223 ··· 272 225 super::consumer_control_get(&pool, "foo") 273 226 .await 274 227 .expect("failed to get record"), 275 - Some(1730673934229173_u64) 228 + Some(1730673934229173_i64) 276 229 ); 277 230 278 231 Ok(())