Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

*: an incredibly sad attempt at adding rocksdb support

geesawra d9b0ecb9 d66bb7f3

+417 -58
+77
Cargo.lock
··· 32 32 dependencies = [ 33 33 "anyhow", 34 34 "async-compression", 35 + "bytevec", 35 36 "chrono", 36 37 "clap", 37 38 "futures", ··· 44 45 "reqwest", 45 46 "reqwest-middleware", 46 47 "reqwest-retry", 48 + "rocksdb", 47 49 "rustls", 48 50 "serde", 49 51 "serde_json", ··· 53 55 "tokio-stream", 54 56 "tokio-util", 55 57 "tracing-subscriber", 58 + "trait-variant", 56 59 ] 57 60 58 61 [[package]] ··· 340 343 version = "1.10.1" 341 344 source = "registry+https://github.com/rust-lang/crates.io-index" 342 345 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 346 + 347 + [[package]] 348 + name = "bytevec" 349 + version = "0.2.0" 350 + source = "registry+https://github.com/rust-lang/crates.io-index" 351 + checksum = "0d1ee6ad2d80504e769c077bd18fc6b11409518c3f20e4918a5c286d693c2b43" 352 + 353 + [[package]] 354 + name = "bzip2-sys" 355 + version = "0.1.13+1.0.8" 356 + source = "registry+https://github.com/rust-lang/crates.io-index" 357 + checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" 358 + dependencies = [ 359 + "cc", 360 + "pkg-config", 361 + ] 343 362 344 363 [[package]] 345 364 name = "cc" ··· 1331 1350 ] 1332 1351 1333 1352 [[package]] 1353 + name = "librocksdb-sys" 1354 + version = "0.17.3+10.4.2" 1355 + source = "registry+https://github.com/rust-lang/crates.io-index" 1356 + checksum = "cef2a00ee60fe526157c9023edab23943fae1ce2ab6f4abb2a807c1746835de9" 1357 + dependencies = [ 1358 + "bindgen", 1359 + "bzip2-sys", 1360 + "cc", 1361 + "libc", 1362 + "libz-sys", 1363 + "lz4-sys", 1364 + "zstd-sys", 1365 + ] 1366 + 1367 + [[package]] 1368 + name = "libz-sys" 1369 + version = "1.1.22" 1370 + source = "registry+https://github.com/rust-lang/crates.io-index" 1371 + checksum = "8b70e7a7df205e92a1a4cd9aaae7898dac0aa555503cc0a649494d0d60e7651d" 1372 + dependencies = [ 1373 + "cc", 1374 + "pkg-config", 1375 + "vcpkg", 1376 + ] 1377 + 1378 + [[package]] 1334 1379 name = "linux-raw-sys" 1335 1380 version = "0.11.0" 1336 1381 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1363 1408 version = "0.1.2" 1364 1409 source = "registry+https://github.com/rust-lang/crates.io-index" 1365 1410 checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 1411 + 1412 + [[package]] 1413 + name = "lz4-sys" 1414 + version = "1.11.1+lz4-1.10.0" 1415 + source = "registry+https://github.com/rust-lang/crates.io-index" 1416 + checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" 1417 + dependencies = [ 1418 + "cc", 1419 + "libc", 1420 + ] 1366 1421 1367 1422 [[package]] 1368 1423 name = "matchers" ··· 2169 2224 ] 2170 2225 2171 2226 [[package]] 2227 + name = "rocksdb" 2228 + version = "0.24.0" 2229 + source = "registry+https://github.com/rust-lang/crates.io-index" 2230 + checksum = "ddb7af00d2b17dbd07d82c0063e25411959748ff03e8d4f96134c2ff41fce34f" 2231 + dependencies = [ 2232 + "libc", 2233 + "librocksdb-sys", 2234 + "serde", 2235 + ] 2236 + 2237 + [[package]] 2172 2238 name = "rustc-demangle" 2173 2239 version = "0.1.26" 2174 2240 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2906 2972 "tracing", 2907 2973 "tracing-core", 2908 2974 "tracing-log", 2975 + ] 2976 + 2977 + [[package]] 2978 + name = "trait-variant" 2979 + version = "0.1.2" 2980 + source = "registry+https://github.com/rust-lang/crates.io-index" 2981 + checksum = "70977707304198400eb4835a78f6a9f928bf41bba420deb8fdb175cd965d77a7" 2982 + dependencies = [ 2983 + "proc-macro2", 2984 + "quote", 2985 + "syn", 2909 2986 ] 2910 2987 2911 2988 [[package]]
+3
Cargo.toml
··· 9 9 [dependencies] 10 10 anyhow = "1.0.99" 11 11 async-compression = { version = "0.4.30", features = ["futures-io", "tokio", "gzip"] } 12 + bytevec = "0.2.0" 12 13 chrono = { version = "0.4.42", features = ["serde"] } 13 14 clap = { version = "4.5.47", features = ["derive", "env"] } 14 15 futures = "0.3.31" ··· 21 22 reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } 22 23 reqwest-middleware = "0.4.2" 23 24 reqwest-retry = "0.7.0" 25 + rocksdb = { version = "0.24.0", features = ["multi-threaded-cf", "serde"] } 24 26 rustls = "0.23.32" 25 27 serde = "1.0.219" 26 28 serde_json = { version = "1.0.143", features = ["raw_value"] } ··· 30 32 tokio-stream = { version = "0.1.17", features = ["io-util"] } 31 33 tokio-util = { version = "0.7.16", features = ["compat"] } 32 34 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } 35 + trait-variant = "0.1.2"
+31 -3
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 2 + Db, Dt, ExportPage, FolderSource, HttpSource, RocksDatastore, backfill, backfill_to_pg, 3 + backfill_to_rocksdb, bin::GlobalArgs, bin_init, datastore::Datastore, full_pages, 4 + pages_to_stdout, poll_upstream, 4 5 }; 6 + use anyhow::anyhow; 5 7 use clap::Parser; 6 8 use reqwest::Url; 7 9 use std::{path::PathBuf, time::Duration}; ··· 44 46 /// only used if `--to-postgres` is present 45 47 #[arg(long, action)] 46 48 postgres_reset: bool, 49 + /// Path to the database for RocksDB. 50 + /// Will be created if it doesn't exist. 51 + /// Cannot be used alongside any postgres-related flag. 52 + #[arg(long, action)] 53 + rocksdb_db_path: Option<PathBuf>, 47 54 /// Stop at the week ending before this date 48 55 #[arg(long)] 49 56 until: Option<Dt>, ··· 67 74 postgres_reset, 68 75 until, 69 76 catch_up, 77 + rocksdb_db_path, 70 78 }: Args, 71 79 ) -> anyhow::Result<()> { 72 80 let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new(); ··· 154 162 found_last_tx, 155 163 )); 156 164 if catch_up { 157 - tasks.spawn(pages_to_pg(db, full_out)); 165 + tasks.spawn(async move { 166 + let mut db_conn = db.datastore().await?; 167 + match db_conn.observe_pages(full_out).await { 168 + Ok(_) => Ok("observe_pages"), 169 + Err(e) => Err(anyhow!(e)), 170 + } 171 + }); 172 + } 173 + } else if let Some(rocksdb_path) = rocksdb_db_path { 174 + log::trace!("opening rocksdb database..."); 175 + let mut db = RocksDatastore::new(rocksdb_path)?; 176 + log::trace!("opened!"); 177 + 178 + tasks.spawn(backfill_to_rocksdb(db.clone(), bulk_out, found_last_tx)); 179 + if catch_up { 180 + tasks.spawn(async move { 181 + match db.observe_pages(full_out).await { 182 + Ok(_) => Ok("observe_pages"), 183 + Err(e) => Err(anyhow!(e)), 184 + } 185 + }); 158 186 } 159 187 } else { 160 188 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
+13 -3
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 - Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve, 2 + Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, datastore::Datastore, 3 + poll_upstream, serve, 3 4 }; 5 + use anyhow::anyhow; 4 6 use clap::Parser; 5 7 use reqwest::Url; 6 8 use std::{net::SocketAddr, path::PathBuf, time::Duration}; ··· 108 110 let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 109 111 "a wrapped reference postgres must be provided to sync" 110 112 ))?; 111 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 113 + let db = Db::new(wrap_pg.clone().as_str(), wrap_pg_cert.clone()).await?; 112 114 113 115 // TODO: allow starting up with polling backfill from beginning? 114 116 log::debug!("getting the latest op from the db..."); ··· 124 126 let throttle = Duration::from_millis(upstream_throttle_ms); 125 127 126 128 tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 127 - tasks.spawn(pages_to_pg(db.clone(), recv_page)); 129 + 130 + tasks.spawn(async move { 131 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 132 + let mut db_conn = db.datastore().await?; 133 + match db_conn.observe_pages(recv_page).await { 134 + Ok(_) => Ok("observe_pages"), 135 + Err(e) => Err(anyhow!(e)), 136 + } 137 + }); 128 138 Some(db) 129 139 } else { 130 140 None
+30
src/datastore.rs
··· 1 + use tokio::sync::mpsc; 2 + 3 + use crate::{Dt, ExportPage, Op}; 4 + 5 + #[derive(Debug)] 6 + pub enum Error { 7 + GetLatest(String), 8 + InsertOperation(String), 9 + } 10 + 11 + impl std::error::Error for Error {} 12 + 13 + impl std::fmt::Display for Error { 14 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 15 + todo!() 16 + } 17 + } 18 + 19 + pub struct InsertResult { 20 + pub op: u64, 21 + pub did: u64, 22 + } 23 + 24 + // #[allow(async_fn_in_trait)] 25 + #[trait_variant::make(SendDatastore: Send)] 26 + pub trait Datastore { 27 + async fn get_latest_timestamp(&self) -> Result<Option<Dt>, Error>; 28 + async fn insert_operation(&mut self, op: Op) -> Result<InsertResult, Error>; 29 + async fn observe_pages(&mut self, pages: mpsc::Receiver<ExportPage>) -> Result<(), Error>; 30 + }
+4 -1
src/lib.rs
··· 4 4 mod backfill; 5 5 mod cached_value; 6 6 mod client; 7 + pub mod datastore; 7 8 mod mirror; 8 9 mod plc_pg; 10 + mod plc_rocksdb; 9 11 mod poll; 10 12 mod ratelimit; 11 13 mod weekly; ··· 16 18 pub use cached_value::{CachedValue, Fetcher}; 17 19 pub use client::{CLIENT, UA}; 18 20 pub use mirror::{ExperimentalConf, ListenConf, serve}; 19 - pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 21 + pub use plc_pg::{Db, backfill_to_pg}; 22 + pub use plc_rocksdb::{RocksDatastore, backfill_to_rocksdb}; 20 23 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 21 24 pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 22 25 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
+104 -45
src/plc_pg.rs
··· 1 - use crate::{Dt, ExportPage, PageBoundaryState}; 1 + use crate::{ 2 + Dt, ExportPage, PageBoundaryState, 3 + datastore::{self, InsertResult, SendDatastore}, 4 + }; 2 5 use native_tls::{Certificate, TlsConnector}; 3 6 use postgres_native_tls::MakeTlsConnector; 4 7 use std::path::PathBuf; ··· 109 112 drop(task); 110 113 Ok(dt) 111 114 } 115 + 116 + pub async fn datastore(&self) -> Result<impl SendDatastore, PgError> { 117 + let (client, task) = self.connect().await?; 118 + 119 + Ok(DbDatastore { client, task }) 120 + } 112 121 } 113 122 114 - pub async fn pages_to_pg( 115 - db: Db, 116 - mut pages: mpsc::Receiver<ExportPage>, 117 - ) -> anyhow::Result<&'static str> { 118 - log::info!("starting pages_to_pg writer..."); 123 + pub struct DbDatastore { 124 + client: Client, 125 + task: JoinHandle<Result<(), PgError>>, 126 + } 127 + 128 + impl SendDatastore for DbDatastore { 129 + async fn get_latest_timestamp(&self) -> Result<Option<Dt>, crate::datastore::Error> { 130 + match self 131 + .client 132 + .query_opt( 133 + r#"SELECT "createdAt" 134 + FROM operations 135 + ORDER BY "createdAt" DESC 136 + LIMIT 1"#, 137 + &[], 138 + ) 139 + .await 140 + { 141 + Ok(dt) => Ok(dt.map(|row| row.get(0))), 142 + Err(e) => Err(datastore::Error::GetLatest(e.to_string())), 143 + } 144 + } 145 + 146 + async fn insert_operation( 147 + &mut self, 148 + op: crate::Op, 149 + ) -> Result<datastore::InsertResult, crate::datastore::Error> { 150 + let ops_stmt = self 151 + .client 152 + .prepare( 153 + r#"INSERT INTO operations (did, operation, cid, nullified, "createdAt") 154 + VALUES ($1, $2, $3, $4, $5) 155 + ON CONFLICT do nothing"#, 156 + ) 157 + .await 158 + .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 159 + let did_stmt = self 160 + .client 161 + .prepare(r#"INSERT INTO dids (did) VALUES ($1) ON CONFLICT do nothing"#) 162 + .await 163 + .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 164 + 165 + let tx = self 166 + .client 167 + .transaction() 168 + .await 169 + .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 170 + let op_res = tx 171 + .execute( 172 + &ops_stmt, 173 + &[ 174 + &op.did, 175 + &Json(op.operation), 176 + &op.cid, 177 + &op.nullified, 178 + &op.created_at, 179 + ], 180 + ) 181 + .await 182 + .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 183 + let did_res = tx 184 + .execute(&did_stmt, &[&op.did]) 185 + .await 186 + .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 187 + tx.commit() 188 + .await 189 + .map_err(|e| datastore::Error::InsertOperation(e.to_string()))?; 119 190 120 - let (mut client, task) = db.connect().await?; 191 + Ok(InsertResult { 192 + op: op_res, 193 + did: did_res, 194 + }) 195 + } 121 196 122 - let ops_stmt = client 123 - .prepare( 124 - r#"INSERT INTO operations (did, operation, cid, nullified, "createdAt") 125 - VALUES ($1, $2, $3, $4, $5) 126 - ON CONFLICT do nothing"#, 127 - ) 128 - .await?; 129 - let did_stmt = client 130 - .prepare(r#"INSERT INTO dids (did) VALUES ($1) ON CONFLICT do nothing"#) 131 - .await?; 197 + async fn observe_pages( 198 + &mut self, 199 + mut pages: mpsc::Receiver<ExportPage>, 200 + ) -> Result<(), datastore::Error> { 201 + log::info!("starting pages_to_pg writer..."); 132 202 133 - let t0 = Instant::now(); 134 - let mut ops_inserted = 0; 135 - let mut dids_inserted = 0; 203 + let t0 = Instant::now(); 204 + let mut ops_inserted = 0; 205 + let mut dids_inserted = 0; 136 206 137 - while let Some(page) = pages.recv().await { 138 - log::trace!("writing page with {} ops", page.ops.len()); 139 - let tx = client.transaction().await?; 140 - for op in page.ops { 141 - ops_inserted += tx 142 - .execute( 143 - &ops_stmt, 144 - &[ 145 - &op.did, 146 - &Json(op.operation), 147 - &op.cid, 148 - &op.nullified, 149 - &op.created_at, 150 - ], 151 - ) 152 - .await?; 153 - dids_inserted += tx.execute(&did_stmt, &[&op.did]).await?; 207 + while let Some(page) = pages.recv().await { 208 + log::trace!("writing page with {} ops", page.ops.len()); 209 + for op in page.ops { 210 + let res = SendDatastore::insert_operation(self, op).await?; 211 + ops_inserted += res.op; 212 + dids_inserted += res.did; 213 + } 154 214 } 155 - tx.commit().await?; 156 - } 157 - drop(task); 158 215 159 - log::info!( 160 - "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 161 - t0.elapsed() 162 - ); 163 - Ok("pages_to_pg") 216 + log::info!( 217 + "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 218 + t0.elapsed() 219 + ); 220 + 221 + Ok(()) 222 + } 164 223 } 165 224 166 225 /// Dump rows into an empty operations table quickly
+126
src/plc_rocksdb.rs
··· 1 + use std::{path::PathBuf, sync::Arc, time::Instant}; 2 + use tokio::sync::{Mutex, mpsc, oneshot}; 3 + 4 + use crate::{ 5 + Dt, ExportPage, Op, PageBoundaryState, 6 + datastore::{Error, InsertResult, SendDatastore}, 7 + }; 8 + 9 + #[derive(Clone)] 10 + pub struct RocksDatastore { 11 + db: Arc<Mutex<rocksdb::DB>>, 12 + } 13 + 14 + impl RocksDatastore { 15 + pub fn new(path: PathBuf) -> Result<Self, rocksdb::Error> { 16 + let rdb = rocksdb::DB::open_default(path)?; 17 + 18 + Ok(RocksDatastore { 19 + db: Arc::new(Mutex::new(rdb)), 20 + }) 21 + } 22 + 23 + async fn latest_timestamp(&self) -> Result<Option<Dt>, rocksdb::Error> { 24 + let db = self.db.lock().await; 25 + let res = db.get(LATEST_TIMESTAMP_KEY)?; 26 + 27 + match res { 28 + None => Ok(None), 29 + Some(ts) => { 30 + let ts_str = String::from_utf8(ts).unwrap(); 31 + Ok(Dt::from_timestamp_micros(ts_str.parse().unwrap())) 32 + } 33 + } 34 + } 35 + 36 + async fn set_latest_timestamp(&self, ts: Dt) -> Result<(), rocksdb::Error> { 37 + let db = self.db.lock().await; 38 + db.put(LATEST_TIMESTAMP_KEY, ts.timestamp_micros().to_string()) 39 + .map(|_| Ok(()))? 40 + } 41 + 42 + async fn insert_op(&self, op: &Op) -> Result<(), rocksdb::Error> { 43 + { 44 + let db = self.db.lock().await; 45 + db.put(op_key(&op), serde_json::to_vec(&op).unwrap())?; 46 + } 47 + 48 + self.set_latest_timestamp(op.created_at).await?; 49 + 50 + Ok(()) 51 + } 52 + } 53 + 54 + fn op_key(op: &Op) -> String { 55 + format!("{}_{}", op.did, op.cid) 56 + } 57 + 58 + const LATEST_TIMESTAMP_KEY: &str = "latest_timestamp"; 59 + const OPERATIONS_KEY: &str = "operations"; 60 + const DIDS_KEY: &str = "dids"; 61 + 62 + impl SendDatastore for RocksDatastore { 63 + async fn get_latest_timestamp(&self) -> Result<Option<Dt>, Error> { 64 + self.latest_timestamp() 65 + .await 66 + .map_err(|e| Error::GetLatest(e.to_string())) 67 + } 68 + 69 + async fn insert_operation(&mut self, op: Op) -> Result<InsertResult, Error> { 70 + self.insert_op(&op) 71 + .await 72 + .map_err(|e| Error::InsertOperation(e.to_string())) 73 + .map(|_| InsertResult { op: 1, did: 1 }) 74 + } 75 + 76 + async fn observe_pages(&mut self, mut pages: mpsc::Receiver<ExportPage>) -> Result<(), Error> { 77 + log::info!("starting pages_to_rocksdb writer..."); 78 + 79 + let t0 = Instant::now(); 80 + let mut ops_inserted = 0; 81 + let mut dids_inserted = 0; 82 + 83 + while let Some(page) = pages.recv().await { 84 + log::trace!("writing page with {} ops", page.ops.len()); 85 + for op in page.ops { 86 + let res = self.insert_operation(op).await?; 87 + ops_inserted += res.op; 88 + dids_inserted += res.did; 89 + } 90 + } 91 + 92 + log::info!( 93 + "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 94 + t0.elapsed() 95 + ); 96 + 97 + Ok(()) 98 + } 99 + } 100 + 101 + pub async fn backfill_to_rocksdb( 102 + db: RocksDatastore, 103 + mut pages: mpsc::Receiver<ExportPage>, 104 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 105 + ) -> anyhow::Result<&'static str> { 106 + let mut last_at = None; 107 + while let Some(page) = pages.recv().await { 108 + for op in &page.ops { 109 + db.insert_op(op).await?; 110 + } 111 + if notify_last_at.is_some() 112 + && let Some(s) = PageBoundaryState::new(&page) 113 + { 114 + last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 115 + } 116 + } 117 + log::debug!("finished receiving bulk pages"); 118 + 119 + if let Some(notify) = notify_last_at { 120 + log::trace!("notifying last_at: {last_at:?}"); 121 + if notify.send(last_at).is_err() { 122 + log::error!("receiver for last_at dropped, can't notify"); 123 + }; 124 + } 125 + Ok("backfill_to_rocksdb") 126 + }
+29 -6
src/weekly.rs
··· 200 200 let reader = source 201 201 .reader_for(week) 202 202 .await 203 - .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?; 203 + .inspect_err(|e| log::error!("week_to_pages reader failed: {e}")); 204 + let reader = match reader { 205 + Ok(r) => Ok(r), 206 + Err(e) => { 207 + if e.to_string().contains("404") { 208 + // i hate anyhow 209 + return Ok(()); 210 + } 211 + 212 + Err(e) 213 + } 214 + }?; 215 + 204 216 let decoder = GzipDecoder::new(BufReader::new(reader)); 205 217 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 206 218 207 - while let Some(chunk) = chunks 208 - .try_next() 209 - .await 210 - .inspect_err(|e| log::error!("failed to get next chunk: {e}"))? 211 - { 219 + loop { 220 + let chunk = chunks.try_next().await; 221 + 222 + let chunk = match chunk { 223 + Ok(c) => c, 224 + Err(e) => match e.1.kind() { 225 + std::io::ErrorKind::UnexpectedEof => break, 226 + _ => return Err(e)?, 227 + }, 228 + }; 229 + 230 + let chunk = match chunk { 231 + Some(c) => c, 232 + None => break, 233 + }; 234 + 212 235 let ops: Vec<Op> = chunk 213 236 .into_iter() 214 237 .filter_map(|s| {