Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

plc_rocksdb: wrap operation data in gzip

geesawra 934d915a 20716773

+54 -2
+54 -2
src/plc_rocksdb.rs
··· 1 + use async_compression::tokio::write::{GzipDecoder, GzipEncoder}; 1 2 use rocksdb::{ColumnFamilyDescriptor, Options}; 3 + use serde::{Deserialize, Serialize}; 2 4 use std::{fmt::Display, ops::Add, path::PathBuf, sync::Arc, time::Instant}; 3 - use tokio::sync::{Mutex, mpsc, oneshot}; 5 + use tokio::{ 6 + io::{AsyncWrite, AsyncWriteExt}, 7 + sync::{Mutex, mpsc, oneshot}, 8 + }; 4 9 5 10 use crate::{ 6 11 Dt, ExportPage, Op, PageBoundaryState, ··· 45 50 async fn insert_op(&self, db: &rocksdb::DB, op: &Op) -> Result<(), rocksdb::Error> { 46 51 let cf = db.cf_handle("ops").unwrap(); 47 52 let id = self.store_did(db, op.did.clone()).await?; 48 - db.put_cf(&cf, op_key(&op, id), serde_json::to_vec(&op).unwrap())?; 53 + db.put_cf( 54 + &cf, 55 + op_key(&op, id), 56 + serde_json::to_vec(&RocksOp::from_op(&op).await).unwrap(), 57 + )?; 49 58 self.set_latest_timestamp(&db, op.created_at).await?; 50 59 51 60 Ok(()) ··· 106 115 let op: Op = serde_json::from_slice(&v).unwrap(); 107 116 op 108 117 })) 118 + } 119 + } 120 + 121 + #[derive(Serialize, Deserialize)] 122 + struct RocksOp { 123 + pub did: String, 124 + pub cid: String, 125 + pub created_at: Dt, 126 + pub nullified: bool, 127 + pub operation: Vec<u8>, 128 + } 129 + 130 + impl RocksOp { 131 + async fn from_op(op: &Op) -> Self { 132 + let op_str = op.operation.to_string().into_bytes(); 133 + let mut gz = GzipEncoder::new(op_str); 134 + let op_gz = vec![]; 135 + gz.write_all(&op_gz).await.unwrap(); 136 + gz.flush().await.unwrap(); 137 + 138 + Self { 139 + operation: op_gz, 140 + did: op.did.clone(), 141 + cid: op.cid.clone(), 142 + created_at: op.created_at, 143 + nullified: op.nullified, 144 + } 145 + } 146 + 147 + async fn to_op(&self) -> Op { 148 + let mut dec = GzipDecoder::new(self.operation.clone()); 149 + let dec_op = vec![]; 150 + dec.write_all(&dec_op).await.unwrap(); 151 + dec.flush().await.unwrap(); 152 + let dec_op = String::from_utf8(dec_op).unwrap(); 153 + 154 + Op { 155 + did: self.did.clone(), 156 + cid: self.cid.clone(), 157 + created_at: self.created_at, 158 + nullified: self.nullified, 159 + operation: serde_json::value::RawValue::from_string(dec_op).unwrap(), 160 + } 109 161 } 110 162 } 111 163