···11-use std::sync::Arc;
22-31use super::*;
42use futures::{SinkExt as _, StreamExt as _};
53use poem::IntoResponse;
···306304 Data(FjallState { fjall, .. }): Data<&FjallState>,
307305) -> poem::Result<impl IntoResponse> {
308306 use poem::web::websocket::Message;
309309- use tokio::sync::Notify;
310307311308 let db = fjall.clone();
312309···358355 };
359356360357 Ok(ws.on_upgrade(move |mut socket| async move {
361361- let errored = Arc::new(Notify::new());
362362-363358 loop {
364359 let (tx, mut op_rx) = tokio::sync::mpsc::channel(64);
365360366366- tokio::task::spawn_blocking({
361361+ let read = tokio::task::spawn_blocking({
367362 let db = db.clone();
368368- let errored = errored.clone();
369369- move || {
370370- let iter = match db.export_ops(cursor..) {
371371- Ok(it) => it,
372372- Err(e) => {
373373- log::error!("read failed: {e}");
374374- errored.notify_one();
375375- return;
376376- }
377377- };
378378- for op in iter.flatten() {
363363+ move || -> anyhow::Result<()> {
364364+ for op in db.export_ops(cursor..)?.flatten() {
379365 if tx.blocking_send(op).is_err() {
380380- return;
366366+ return Ok(());
381367 }
382368 }
369369+ Ok(())
383370 }
384371 });
385372···389376 log::warn!("closing export stream: {e}");
390377 return;
391378 }
392392- cursor = op.seq;
379379+ cursor = op.seq + 1;
393380 }
394381395395- tokio::select! {
396396- _ = db.subscribe() => {},
397397- _ = errored.notified() => return,
382382+ match read.await {
383383+ Ok(Err(e)) => {
384384+ log::error!("stream read failed: {e}");
385385+ return;
386386+ }
387387+ Err(e) => {
388388+ log::error!("stream read task panicked: {e}");
389389+ return;
390390+ }
391391+ Ok(Ok(())) => {}
398392 }
393393+394394+ db.subscribe().await;
399395 }
400396 }))
401397}
+1
src/plc_fjall.rs
···841841}
842842843843impl Op {
844844+ // todo: we should probably just have this in Op tbh as a `r#type: SequencedOpType` or something
844845 /// adds the `type` field to the op
845846 pub fn to_sequenced_json(&self) -> serde_json::Value {
846847 let mut val = serde_json::to_value(self).expect("Op is serializable");