Very (very) fast point-in-polyon queries, with Parquet ingestion
0
fast_point_in_polygon.rs
326 lines 13 kB view raw
1//! The reusable piece is [`MultiPolygonIndex`]: hand it a slice 2//! of `MultiPolygon<f64>` and it lets you ask which polygons contain a given 3//! `Point<f64>`. It combines an `rstar::RTree` of bounding rectangles 4//! (coarse candidate filter) with one `IntervalTreeMultiPolygon` per polygon 5//! (the actual point-in-polygon predicate). Build once; query many times 6//! from any number of threads: `MultiPolygonIndex` is `Send + Sync`. 7//! 8//! The rest of the file is the parquet driver: it loads 33k zipcode polygons 9//! from a GeoParquet file, builds the index, then streams ~130M building 10//! centroids from a plain Parquet file (one row group per rayon worker) and 11//! aggregates hits into a `Vec<AtomicU64>`. 12//! 13//! Assumes `data/us-zip-codes.parquet` and 14//! `data/microsoft-buildings_point.parquet` already exist. 15//! 16//! Run with `cargo run --release --example zipcode_join`. 17//! To give you an idea of what is meant by "fast" in this context: the calculation runs in around 4.3 s 18 19use std::fs::File; 20use std::sync::Arc; 21use std::sync::atomic::{AtomicU64, Ordering}; 22use std::time::Instant; 23 24use anyhow::Result; 25use arrow_array::{Array, BinaryArray, RecordBatch, StringArray}; 26use geo::algorithm::indexed::IntervalTreeMultiPolygon; 27use geo::algorithm::{BoundingRect, Contains}; 28use geo_traits::to_geo::ToGeoMultiPolygon; 29use geo_types::{MultiPolygon, Point, Rect}; 30use geoarrow::array::{AsGeoArrowArray, GeoArrowArray, GeoArrowArrayAccessor, from_arrow_array}; 31use geoparquet::reader::{GeoParquetReaderBuilder, GeoParquetRecordBatchReader}; 32use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; 33use rayon::prelude::*; 34use rstar::primitives::{GeomWithData, Rectangle}; 35use rstar::{AABB, RTree}; 36 37// ============================================================================= 38// Reusable spatial index 39// ============================================================================= 40 41/// Spatial index for repeated point-in-polygon queries over many 42/// `MultiPolygon`s. 43/// 44/// Built from a slice of polygons; subsequent queries return the *position* 45/// of each containing polygon in that slice. Threads can share one 46/// `Arc<MultiPolygonIndex>`: both inner structures are `Send + Sync`. 47pub struct MultiPolygonIndex { 48 // rstar's bare `AABB` doesn't implement `RTreeObject`; `Rectangle` does. 49 // `GeomWithData` pairs each rectangle with its polygon's slice index. 50 rtree: RTree<GeomWithData<Rectangle<(f64, f64)>, usize>>, 51 // One per polygon, in the same order. `IntervalTreeMultiPolygon` is 52 // `geo`'s purpose-built point-in-multipolygon index: a Y-interval tree of 53 // polygon edges plus a winding-number test on the candidates. 54 trees: Vec<IntervalTreeMultiPolygon<f64>>, 55} 56 57impl MultiPolygonIndex { 58 /// Build both indices over `polys`. The expensive piece is the per-polygon 59 /// edge tree, so we build them in parallel. 60 pub fn new(polys: &[MultiPolygon<f64>]) -> Self { 61 let aabbs: Vec<_> = polys 62 .iter() 63 .enumerate() 64 .filter_map(|(i, mp)| { 65 let r: Rect<f64> = mp.bounding_rect()?; 66 let rect = Rectangle::from_corners((r.min().x, r.min().y), (r.max().x, r.max().y)); 67 Some(GeomWithData::new(rect, i)) 68 }) 69 .collect(); 70 let trees = polys 71 .par_iter() 72 .map(IntervalTreeMultiPolygon::new) 73 .collect(); 74 Self { 75 rtree: RTree::bulk_load(aabbs), 76 trees, 77 } 78 } 79 80 /// Number of polygons the index was built over. 81 pub fn len(&self) -> usize { 82 self.trees.len() 83 } 84 85 pub fn is_empty(&self) -> bool { 86 self.trees.is_empty() 87 } 88 89 /// Call `f(i)` for every polygon `i` that contains `point`. 90 /// 91 /// Allocation-free; this is the inner-loop primitive both bulk and 92 /// streaming usage build on. 93 pub fn for_each_containing<F: FnMut(usize)>(&self, point: Point<f64>, mut f: F) { 94 let envelope = AABB::from_point((point.x(), point.y())); 95 // RTree narrows ~33k polygons down to a handful of bbox candidates; 96 // the IntervalTree then runs the precise predicate on each. 97 for hit in self.rtree.locate_in_envelope_intersecting(&envelope) { 98 if self.trees[hit.data].contains(&point) { 99 f(hit.data); 100 } 101 } 102 } 103 104 /// For each polygon, count how many of `points` lie inside it. Returns a 105 /// `Vec<u64>` of length [`len`](Self::len), indexed by polygon position. 106 /// Parallelises across `points` via rayon. 107 pub fn count_points_par(&self, points: &[Point<f64>]) -> Vec<u64> { 108 let counts: Vec<AtomicU64> = (0..self.len()).map(|_| AtomicU64::new(0)).collect(); 109 points.par_iter().for_each(|p| { 110 self.for_each_containing(*p, |i| { 111 counts[i].fetch_add(1, Ordering::Relaxed); 112 }); 113 }); 114 counts.into_iter().map(AtomicU64::into_inner).collect() 115 } 116} 117 118// ============================================================================= 119// Parquet driver 120// ============================================================================= 121 122// ZIPCODES is a GeoParquet file (geometry stored as GeoArrow MultiPolygon). 123// BUILDINGS is plain Parquet — geometry is a `geometry` column of WKB bytes. 124const ZIPCODES: &str = "data/us-zip-codes.parquet"; 125const BUILDINGS: &str = "data/microsoft-buildings_point.parquet"; 126 127fn main() -> Result<()> { 128 let t0 = Instant::now(); 129 130 let (codes, polys) = load_zipcodes()?; 131 let index = Arc::new(MultiPolygonIndex::new(&polys)); 132 133 // One counter per zipcode, indexed by position in `polys`. Dense indices 134 // beat a HashMap here: no hashing, no per-thread map to merge. 135 let counts: Arc<Vec<AtomicU64>> = 136 Arc::new((0..index.len()).map(|_| AtomicU64::new(0)).collect()); 137 138 let total = join_from_parquet(&index, &counts)?; 139 140 let dt = t0.elapsed().as_secs_f64(); 141 println!( 142 "{total} points joined in {dt:.2}s ({:.1}M pts/s)", 143 (total as f64 / 1.0e6) / dt 144 ); 145 print_top5(&counts, &codes); 146 Ok(()) 147} 148 149fn load_zipcodes() -> Result<(Vec<String>, Vec<MultiPolygon<f64>>)> { 150 let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(ZIPCODES)?)?; 151 let geo_meta = builder.geoparquet_metadata().unwrap()?; 152 let schema = builder.geoarrow_schema(&geo_meta, true, Default::default())?; 153 let geom_col = geo_meta.primary_column.clone(); 154 let reader = GeoParquetRecordBatchReader::try_new(builder.build()?, schema)?; 155 156 let mut codes = Vec::new(); 157 let mut polys = Vec::new(); 158 for batch in reader { 159 let batch: RecordBatch = batch?; 160 let zip_idx = batch.schema().index_of("zipcode")?; 161 let geom_idx = batch.schema().index_of(&geom_col)?; 162 let zips = batch 163 .column(zip_idx) 164 .as_any() 165 .downcast_ref::<StringArray>() 166 .unwrap(); 167 let geoms = from_arrow_array( 168 batch.column(geom_idx).as_ref(), 169 batch.schema().field(geom_idx), 170 )?; 171 let mp = geoms.as_multi_polygon_opt().unwrap(); 172 for i in 0..batch.num_rows() { 173 if zips.is_null(i) || mp.is_null(i) { 174 continue; 175 } 176 polys.push(mp.value(i)?.to_multi_polygon()); 177 codes.push(zips.value(i).to_string()); 178 } 179 } 180 Ok((codes, polys)) 181} 182 183fn join_from_parquet(index: &MultiPolygonIndex, counts: &[AtomicU64]) -> Result<u64> { 184 // One probe to get the row-group count and confirm the column exists; 185 // the actual data is read by the per-row-group workers below. 186 let probe = ParquetRecordBatchReaderBuilder::try_new(File::open(BUILDINGS)?)?; 187 let n_groups = probe.metadata().num_row_groups(); 188 let total = probe.metadata().file_metadata().num_rows() as u64; 189 let geom_idx = probe.schema().index_of("geometry")?; 190 drop(probe); 191 192 // Dispatch one row group per rayon worker. Each worker opens its own 193 // `File` handle (parquet readers aren't `Sync`) and reads only its 194 // assigned row group via `with_row_groups`. With ~hundreds of row groups 195 // and a small thread pool, this saturates all cores trivially. 196 (0..n_groups) 197 .into_par_iter() 198 .try_for_each(|rg| -> Result<()> { 199 let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(BUILDINGS)?)? 200 .with_row_groups(vec![rg]) 201 .with_batch_size(65_536) 202 .build()?; 203 for batch in reader { 204 let batch: RecordBatch = batch?; 205 let wkb = batch 206 .column(geom_idx) 207 .as_any() 208 .downcast_ref::<BinaryArray>() 209 .unwrap(); 210 for v in wkb.iter().flatten() { 211 let Some((x, y)) = decode_wkb_point(v) else { 212 continue; 213 }; 214 // Same primitive a non-parquet caller would use; the 215 // Relaxed fetch_add is safe because counts don't 216 // synchronise other memory and rayon's join provides 217 // the final happens-before edge before we read them. 218 index.for_each_containing(Point::new(x, y), |i| { 219 counts[i].fetch_add(1, Ordering::Relaxed); 220 }); 221 } 222 } 223 Ok(()) 224 })?; 225 Ok(total) 226} 227 228// WKB Point layout: byte 0 is the endianness flag (1 = little-endian), 229// bytes 1..5 are the geometry-type tag (1 = Point — we don't bother to 230// check), bytes 5..13 are x, bytes 13..21 are y. Calling this once per 231// point is the inner loop, hence inlining and the hand-rolled byte parse. 232#[inline(always)] 233fn decode_wkb_point(buf: &[u8]) -> Option<(f64, f64)> { 234 if buf.len() < 21 { 235 return None; 236 } 237 let xs: [u8; 8] = buf[5..13].try_into().unwrap(); 238 let ys: [u8; 8] = buf[13..21].try_into().unwrap(); 239 Some(if buf[0] == 1 { 240 (f64::from_le_bytes(xs), f64::from_le_bytes(ys)) 241 } else { 242 (f64::from_be_bytes(xs), f64::from_be_bytes(ys)) 243 }) 244} 245 246fn print_top5(counts: &[AtomicU64], codes: &[String]) { 247 let mut by_count: Vec<(&str, u64)> = counts 248 .iter() 249 .enumerate() 250 .map(|(i, n)| (codes[i].as_str(), n.load(Ordering::Relaxed))) 251 .filter(|(_, n)| *n > 0) 252 .collect(); 253 by_count.sort_by_key(|(_, n)| std::cmp::Reverse(*n)); 254 for (zip, n) in by_count.iter().take(5) { 255 println!("{zip:<8} {n}"); 256 } 257} 258 259// ============================================================================= 260// API tests — show how MultiPolygonIndex is used without the parquet path. 261// Run with `cargo test --release --example zipcode_join`. 262// ============================================================================= 263 264#[cfg(test)] 265mod tests { 266 use super::*; 267 use geo_types::{Coord, LineString, Polygon}; 268 269 fn unit_square(min_x: f64, min_y: f64) -> MultiPolygon<f64> { 270 let exterior = LineString::new(vec![ 271 Coord { x: min_x, y: min_y }, 272 Coord { 273 x: min_x + 1.0, 274 y: min_y, 275 }, 276 Coord { 277 x: min_x + 1.0, 278 y: min_y + 1.0, 279 }, 280 Coord { 281 x: min_x, 282 y: min_y + 1.0, 283 }, 284 Coord { x: min_x, y: min_y }, 285 ]); 286 MultiPolygon(vec![Polygon::new(exterior, vec![])]) 287 } 288 289 #[test] 290 fn for_each_containing_visits_each_polygon_at_most_once() { 291 // Two adjacent unit squares sharing the edge x=1. 292 let polys = vec![unit_square(0.0, 0.0), unit_square(1.0, 0.0)]; 293 let index = MultiPolygonIndex::new(&polys); 294 295 let mut hits = Vec::new(); 296 index.for_each_containing(Point::new(0.5, 0.5), |i| hits.push(i)); 297 assert_eq!(hits, vec![0]); 298 299 let mut hits = Vec::new(); 300 index.for_each_containing(Point::new(1.5, 0.5), |i| hits.push(i)); 301 assert_eq!(hits, vec![1]); 302 } 303 304 #[test] 305 fn count_points_par_returns_one_count_per_polygon() { 306 let polys = vec![ 307 unit_square(0.0, 0.0), 308 unit_square(1.0, 0.0), 309 unit_square(0.0, 1.0), 310 unit_square(1.0, 1.0), 311 ]; 312 let index = MultiPolygonIndex::new(&polys); 313 let points = vec![ 314 Point::new(0.5, 0.5), // poly 0 315 Point::new(0.1, 0.1), // poly 0 316 Point::new(1.5, 0.5), // poly 1 317 Point::new(0.5, 1.5), // poly 2 318 Point::new(1.5, 1.5), // poly 3 319 Point::new(1.5, 1.5), // poly 3 320 Point::new(5.0, 5.0), // outside everything 321 ]; 322 323 let counts = index.count_points_par(&points); 324 assert_eq!(counts, vec![2, 1, 1, 2]); 325 } 326}