···2626 let dest = dest.clone();
2727 let source = source.clone();
2828 workers.spawn(async move {
2929- log::info!("about to get weeks...");
2929+ log::trace!("about to get weeks...");
30303131 while let Some(week) = weeks.lock().await.pop() {
3232- log::info!(
3232+ log::trace!(
3333 "worker {w}: fetching week {} (-{})",
3434 Into::<Dt>::into(week).to_rfc3339(),
3535 week.n_ago(),
3636 );
3737 week_to_pages(source.clone(), week, dest.clone()).await?;
3838- log::info!("done a week");
3838+ log::trace!("week {}", Into::<Dt>::into(week).to_rfc3339());
3939 }
4040 log::info!("done with the weeks ig");
4141 Ok(())
+7-1
src/bin/allegedly.rs
···3737 /// Pass a postgres connection url like "postgresql://localhost:5432"
3838 #[arg(long)]
3939 to_postgres: Option<Url>,
4040+ /// Delete all operations from the postgres db before starting
4141+ ///
4242+ /// only used if `--to-postgres` is present
4343+ #[arg(long, action)]
4444+ postgres_reset: bool,
4045 /// Stop at the week ending before this date
4146 #[arg(long)]
4247 until: Option<Dt>,
···9196 dir,
9297 source_workers,
9398 to_postgres,
9999+ postgres_reset,
94100 until,
95101 } => {
96102 let (tx, rx) = flume::bounded(32); // big pages
···109115 });
110116 if let Some(url) = to_postgres {
111117 let db = Db::new(url.as_str()).await.unwrap();
112112- pages_to_pg(db, rx).await.unwrap();
118118+ pages_to_pg(db, rx, postgres_reset).await.unwrap();
113119 } else {
114120 pages_to_stdout(rx).await.unwrap();
115121 }
+80-55
src/plc_pg.rs
···7171 }
7272}
73737474-pub async fn write_bulk(db: Db, pages: flume::Receiver<ExportPage>) -> Result<(), PgError> {
7474+/// Dump rows into an empty operations table quickly
7575+///
7676+/// you must run this after initializing the db with kysely migrations from the
7777+/// typescript app, but before inserting any content.
7878+///
7979+/// it's an invasive process: it will drop the indexes that kysely created (and
8080+/// restore them after) in order to get the backfill in as quickly as possible.
8181+///
8282+/// fails: if the backfill data violates the primary key constraint (unique did*cid)
8383+///
8484+/// panics: if the operations or dids tables are not empty, unless reset is true
8585+///
8686+/// recommended postgres setting: `max_wal_size=4GB` (or more)
8787+pub async fn write_bulk(
8888+ db: Db,
8989+ pages: flume::Receiver<ExportPage>,
9090+ reset: bool,
9191+) -> Result<(), PgError> {
7592 let mut client = db.connect().await?;
76937777- // TODO: maybe we want to be more cautious
7878- client
7979- .execute(
8080- r#"
8181- DROP TABLE IF EXISTS allegedly_backfill"#,
8282- &[],
8383- )
9494+ let t0 = Instant::now();
9595+ let tx = client.transaction().await?;
9696+9797+ let t_step = Instant::now();
9898+ for table in ["operations", "dids"] {
9999+ if reset {
100100+ let n = tx.execute(&format!("DELETE FROM {table}"), &[]).await?;
101101+ if n > 0 {
102102+ log::warn!("postgres reset: deleted {n} from {table}");
103103+ }
104104+ } else {
105105+ let n: i64 = tx
106106+ .query_one(&format!("SELECT count(*) FROM {table}"), &[])
107107+ .await?
108108+ .get(0);
109109+ if n > 0 {
110110+ panic!("postgres: {table} table was not empty and `reset` not requested");
111111+ }
112112+ }
113113+ }
114114+ log::trace!("tables clean: {:?}", t_step.elapsed());
115115+116116+ let t_step = Instant::now();
117117+ tx.execute("ALTER TABLE operations SET UNLOGGED", &[])
84118 .await?;
119119+ tx.execute("ALTER TABLE dids SET UNLOGGED", &[]).await?;
120120+ log::trace!("set tables unlogged: {:?}", t_step.elapsed());
851218686- let tx = client.transaction().await?;
122122+ let t_step = Instant::now();
123123+ tx.execute(r#"DROP INDEX "operations_createdAt_index""#, &[])
124124+ .await?;
125125+ tx.execute("DROP INDEX operations_did_createdat_idx", &[])
126126+ .await?;
127127+ log::trace!("indexes dropped: {:?}", t_step.elapsed());
871288888- tx.execute(
8989- r#"
9090- CREATE UNLOGGED TABLE allegedly_backfill (
9191- did text not null,
9292- cid text not null,
9393- operation jsonb not null,
9494- nullified boolean not null,
9595- "createdAt" timestamptz not null
9696- )"#,
9797- &[],
9898- )
9999- .await?;
100100-129129+ let t_step = Instant::now();
130130+ log::trace!("starting binary COPY IN...");
101131 let types = &[
102102- Type::TEXT,
103132 Type::TEXT,
104133 Type::JSONB,
134134+ Type::TEXT,
105135 Type::BOOL,
106136 Type::TIMESTAMPTZ,
107137 ];
108108- let t0 = Instant::now();
109109-110138 let sync = tx
111111- .copy_in("COPY allegedly_backfill FROM STDIN BINARY")
139139+ .copy_in(
140140+ r#"COPY operations (did, operation, cid, nullified, "createdAt") FROM STDIN BINARY"#,
141141+ )
112142 .await?;
113143 let mut writer = pin!(BinaryCopyInWriter::new(sync, types));
114114-115144 while let Ok(page) = pages.recv_async().await {
116145 for s in page.ops {
117146 let Ok(op) = serde_json::from_str::<Op>(&s) else {
···122151 .as_mut()
123152 .write(&[
124153 &op.did,
125125- &op.cid,
126154 &Json(op.operation),
155155+ &op.cid,
127156 &op.nullified,
128157 &op.created_at,
129158 ])
130159 .await?;
131160 }
132161 }
133133-134162 let n = writer.as_mut().finish().await?;
135135- log::info!("copied in {n} rows");
163163+ log::trace!("COPY IN wrote {n} ops: {:?}", t_step.elapsed());
136164137137- tx.commit().await?;
138138- log::info!("copy in time: {:?}", t0.elapsed());
165165+ // CAUTION: these indexes MUST match up exactly with the kysely ones we dropped
166166+ let t_step = Instant::now();
167167+ tx.execute(
168168+ r#"CREATE INDEX operations_did_createdat_idx ON operations (did, "createdAt")"#,
169169+ &[],
170170+ )
171171+ .await?;
172172+ tx.execute(
173173+ r#"CREATE INDEX "operations_createdAt_index" ON operations ("createdAt")"#,
174174+ &[],
175175+ )
176176+ .await?;
177177+ log::trace!("indexes recreated: {:?}", t_step.elapsed());
139178140140- log::info!("copying dids into plc table...");
141141- let n = client
179179+ let t_step = Instant::now();
180180+ let n = tx
142181 .execute(
143143- r#"
144144- INSERT INTO dids
145145- SELECT distinct did FROM allegedly_backfill
146146- ON CONFLICT do nothing"#,
182182+ r#"INSERT INTO dids SELECT distinct did FROM operations"#,
147183 &[],
148184 )
149185 .await?;
150150- log::info!("{n} inserted; elapsed: {:?}", t0.elapsed());
186186+ log::trace!("INSERT wrote {n} dids: {:?}", t_step.elapsed());
151187152152- log::info!("copying ops into plc table...");
153153- let n = client
154154- .execute(
155155- r#"
156156- INSERT INTO operations (did, cid, operation, nullified, "createdAt")
157157- SELECT did, cid, operation, nullified, "createdAt" FROM allegedly_backfill
158158- ON CONFLICT do nothing"#,
159159- &[],
160160- )
161161- .await?;
162162- log::info!("{n} inserted; elapsed: {:?}", t0.elapsed());
188188+ let t_step = Instant::now();
189189+ tx.execute("ALTER TABLE dids SET LOGGED", &[]).await?;
190190+ tx.execute("ALTER TABLE operations SET LOGGED", &[]).await?;
191191+ log::trace!("set tables LOGGED: {:?}", t_step.elapsed());
163192164164- log::info!("clean up backfill table...");
165165- client
166166- .execute(r#"DROP TABLE allegedly_backfill"#, &[])
167167- .await?;
168168-193193+ tx.commit().await?;
169194 log::info!("total backfill time: {:?}", t0.elapsed());
170195171196 Ok(())