Very (very) fast point-in-polyon queries, with Parquet ingestion
0
fast_point_in_polygon.rs
1//! The reusable piece is [`MultiPolygonIndex`]: hand it a slice
2//! of `MultiPolygon<f64>` and it lets you ask which polygons contain a given
3//! `Point<f64>`. It combines an `rstar::RTree` of bounding rectangles
4//! (coarse candidate filter) with one `IntervalTreeMultiPolygon` per polygon
5//! (the actual point-in-polygon predicate). Build once; query many times
6//! from any number of threads: `MultiPolygonIndex` is `Send + Sync`.
7//!
8//! The rest of the file is the parquet driver: it loads 33k zipcode polygons
9//! from a GeoParquet file, builds the index, then streams ~130M building
10//! centroids from a plain Parquet file (one row group per rayon worker) and
11//! aggregates hits into a `Vec<AtomicU64>`.
12//!
13//! Assumes `data/us-zip-codes.parquet` and
14//! `data/microsoft-buildings_point.parquet` already exist.
15//!
16//! Run with `cargo run --release --example zipcode_join`.
17//! To give you an idea of what is meant by "fast" in this context: the calculation runs in around 4.3 s
18
19use std::fs::File;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::time::Instant;
23
24use anyhow::Result;
25use arrow_array::{Array, BinaryArray, RecordBatch, StringArray};
26use geo::algorithm::indexed::IntervalTreeMultiPolygon;
27use geo::algorithm::{BoundingRect, Contains};
28use geo_traits::to_geo::ToGeoMultiPolygon;
29use geo_types::{MultiPolygon, Point, Rect};
30use geoarrow::array::{AsGeoArrowArray, GeoArrowArray, GeoArrowArrayAccessor, from_arrow_array};
31use geoparquet::reader::{GeoParquetReaderBuilder, GeoParquetRecordBatchReader};
32use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
33use rayon::prelude::*;
34use rstar::primitives::{GeomWithData, Rectangle};
35use rstar::{AABB, RTree};
36
37// =============================================================================
38// Reusable spatial index
39// =============================================================================
40
41/// Spatial index for repeated point-in-polygon queries over many
42/// `MultiPolygon`s.
43///
44/// Built from a slice of polygons; subsequent queries return the *position*
45/// of each containing polygon in that slice. Threads can share one
46/// `Arc<MultiPolygonIndex>`: both inner structures are `Send + Sync`.
47pub struct MultiPolygonIndex {
48 // rstar's bare `AABB` doesn't implement `RTreeObject`; `Rectangle` does.
49 // `GeomWithData` pairs each rectangle with its polygon's slice index.
50 rtree: RTree<GeomWithData<Rectangle<(f64, f64)>, usize>>,
51 // One per polygon, in the same order. `IntervalTreeMultiPolygon` is
52 // `geo`'s purpose-built point-in-multipolygon index: a Y-interval tree of
53 // polygon edges plus a winding-number test on the candidates.
54 trees: Vec<IntervalTreeMultiPolygon<f64>>,
55}
56
57impl MultiPolygonIndex {
58 /// Build both indices over `polys`. The expensive piece is the per-polygon
59 /// edge tree, so we build them in parallel.
60 pub fn new(polys: &[MultiPolygon<f64>]) -> Self {
61 let aabbs: Vec<_> = polys
62 .iter()
63 .enumerate()
64 .filter_map(|(i, mp)| {
65 let r: Rect<f64> = mp.bounding_rect()?;
66 let rect = Rectangle::from_corners((r.min().x, r.min().y), (r.max().x, r.max().y));
67 Some(GeomWithData::new(rect, i))
68 })
69 .collect();
70 let trees = polys
71 .par_iter()
72 .map(IntervalTreeMultiPolygon::new)
73 .collect();
74 Self {
75 rtree: RTree::bulk_load(aabbs),
76 trees,
77 }
78 }
79
80 /// Number of polygons the index was built over.
81 pub fn len(&self) -> usize {
82 self.trees.len()
83 }
84
85 pub fn is_empty(&self) -> bool {
86 self.trees.is_empty()
87 }
88
89 /// Call `f(i)` for every polygon `i` that contains `point`.
90 ///
91 /// Allocation-free; this is the inner-loop primitive both bulk and
92 /// streaming usage build on.
93 pub fn for_each_containing<F: FnMut(usize)>(&self, point: Point<f64>, mut f: F) {
94 let envelope = AABB::from_point((point.x(), point.y()));
95 // RTree narrows ~33k polygons down to a handful of bbox candidates;
96 // the IntervalTree then runs the precise predicate on each.
97 for hit in self.rtree.locate_in_envelope_intersecting(&envelope) {
98 if self.trees[hit.data].contains(&point) {
99 f(hit.data);
100 }
101 }
102 }
103
104 /// For each polygon, count how many of `points` lie inside it. Returns a
105 /// `Vec<u64>` of length [`len`](Self::len), indexed by polygon position.
106 /// Parallelises across `points` via rayon.
107 pub fn count_points_par(&self, points: &[Point<f64>]) -> Vec<u64> {
108 let counts: Vec<AtomicU64> = (0..self.len()).map(|_| AtomicU64::new(0)).collect();
109 points.par_iter().for_each(|p| {
110 self.for_each_containing(*p, |i| {
111 counts[i].fetch_add(1, Ordering::Relaxed);
112 });
113 });
114 counts.into_iter().map(AtomicU64::into_inner).collect()
115 }
116}
117
118// =============================================================================
119// Parquet driver
120// =============================================================================
121
122// ZIPCODES is a GeoParquet file (geometry stored as GeoArrow MultiPolygon).
123// BUILDINGS is plain Parquet — geometry is a `geometry` column of WKB bytes.
124const ZIPCODES: &str = "data/us-zip-codes.parquet";
125const BUILDINGS: &str = "data/microsoft-buildings_point.parquet";
126
127fn main() -> Result<()> {
128 let t0 = Instant::now();
129
130 let (codes, polys) = load_zipcodes()?;
131 let index = Arc::new(MultiPolygonIndex::new(&polys));
132
133 // One counter per zipcode, indexed by position in `polys`. Dense indices
134 // beat a HashMap here: no hashing, no per-thread map to merge.
135 let counts: Arc<Vec<AtomicU64>> =
136 Arc::new((0..index.len()).map(|_| AtomicU64::new(0)).collect());
137
138 let total = join_from_parquet(&index, &counts)?;
139
140 let dt = t0.elapsed().as_secs_f64();
141 println!(
142 "{total} points joined in {dt:.2}s ({:.1}M pts/s)",
143 (total as f64 / 1.0e6) / dt
144 );
145 print_top5(&counts, &codes);
146 Ok(())
147}
148
149fn load_zipcodes() -> Result<(Vec<String>, Vec<MultiPolygon<f64>>)> {
150 let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(ZIPCODES)?)?;
151 let geo_meta = builder.geoparquet_metadata().unwrap()?;
152 let schema = builder.geoarrow_schema(&geo_meta, true, Default::default())?;
153 let geom_col = geo_meta.primary_column.clone();
154 let reader = GeoParquetRecordBatchReader::try_new(builder.build()?, schema)?;
155
156 let mut codes = Vec::new();
157 let mut polys = Vec::new();
158 for batch in reader {
159 let batch: RecordBatch = batch?;
160 let zip_idx = batch.schema().index_of("zipcode")?;
161 let geom_idx = batch.schema().index_of(&geom_col)?;
162 let zips = batch
163 .column(zip_idx)
164 .as_any()
165 .downcast_ref::<StringArray>()
166 .unwrap();
167 let geoms = from_arrow_array(
168 batch.column(geom_idx).as_ref(),
169 batch.schema().field(geom_idx),
170 )?;
171 let mp = geoms.as_multi_polygon_opt().unwrap();
172 for i in 0..batch.num_rows() {
173 if zips.is_null(i) || mp.is_null(i) {
174 continue;
175 }
176 polys.push(mp.value(i)?.to_multi_polygon());
177 codes.push(zips.value(i).to_string());
178 }
179 }
180 Ok((codes, polys))
181}
182
183fn join_from_parquet(index: &MultiPolygonIndex, counts: &[AtomicU64]) -> Result<u64> {
184 // One probe to get the row-group count and confirm the column exists;
185 // the actual data is read by the per-row-group workers below.
186 let probe = ParquetRecordBatchReaderBuilder::try_new(File::open(BUILDINGS)?)?;
187 let n_groups = probe.metadata().num_row_groups();
188 let total = probe.metadata().file_metadata().num_rows() as u64;
189 let geom_idx = probe.schema().index_of("geometry")?;
190 drop(probe);
191
192 // Dispatch one row group per rayon worker. Each worker opens its own
193 // `File` handle (parquet readers aren't `Sync`) and reads only its
194 // assigned row group via `with_row_groups`. With ~hundreds of row groups
195 // and a small thread pool, this saturates all cores trivially.
196 (0..n_groups)
197 .into_par_iter()
198 .try_for_each(|rg| -> Result<()> {
199 let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(BUILDINGS)?)?
200 .with_row_groups(vec![rg])
201 .with_batch_size(65_536)
202 .build()?;
203 for batch in reader {
204 let batch: RecordBatch = batch?;
205 let wkb = batch
206 .column(geom_idx)
207 .as_any()
208 .downcast_ref::<BinaryArray>()
209 .unwrap();
210 for v in wkb.iter().flatten() {
211 let Some((x, y)) = decode_wkb_point(v) else {
212 continue;
213 };
214 // Same primitive a non-parquet caller would use; the
215 // Relaxed fetch_add is safe because counts don't
216 // synchronise other memory and rayon's join provides
217 // the final happens-before edge before we read them.
218 index.for_each_containing(Point::new(x, y), |i| {
219 counts[i].fetch_add(1, Ordering::Relaxed);
220 });
221 }
222 }
223 Ok(())
224 })?;
225 Ok(total)
226}
227
228// WKB Point layout: byte 0 is the endianness flag (1 = little-endian),
229// bytes 1..5 are the geometry-type tag (1 = Point — we don't bother to
230// check), bytes 5..13 are x, bytes 13..21 are y. Calling this once per
231// point is the inner loop, hence inlining and the hand-rolled byte parse.
232#[inline(always)]
233fn decode_wkb_point(buf: &[u8]) -> Option<(f64, f64)> {
234 if buf.len() < 21 {
235 return None;
236 }
237 let xs: [u8; 8] = buf[5..13].try_into().unwrap();
238 let ys: [u8; 8] = buf[13..21].try_into().unwrap();
239 Some(if buf[0] == 1 {
240 (f64::from_le_bytes(xs), f64::from_le_bytes(ys))
241 } else {
242 (f64::from_be_bytes(xs), f64::from_be_bytes(ys))
243 })
244}
245
246fn print_top5(counts: &[AtomicU64], codes: &[String]) {
247 let mut by_count: Vec<(&str, u64)> = counts
248 .iter()
249 .enumerate()
250 .map(|(i, n)| (codes[i].as_str(), n.load(Ordering::Relaxed)))
251 .filter(|(_, n)| *n > 0)
252 .collect();
253 by_count.sort_by_key(|(_, n)| std::cmp::Reverse(*n));
254 for (zip, n) in by_count.iter().take(5) {
255 println!("{zip:<8} {n}");
256 }
257}
258
259// =============================================================================
260// API tests — show how MultiPolygonIndex is used without the parquet path.
261// Run with `cargo test --release --example zipcode_join`.
262// =============================================================================
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use geo_types::{Coord, LineString, Polygon};
268
269 fn unit_square(min_x: f64, min_y: f64) -> MultiPolygon<f64> {
270 let exterior = LineString::new(vec![
271 Coord { x: min_x, y: min_y },
272 Coord {
273 x: min_x + 1.0,
274 y: min_y,
275 },
276 Coord {
277 x: min_x + 1.0,
278 y: min_y + 1.0,
279 },
280 Coord {
281 x: min_x,
282 y: min_y + 1.0,
283 },
284 Coord { x: min_x, y: min_y },
285 ]);
286 MultiPolygon(vec![Polygon::new(exterior, vec![])])
287 }
288
289 #[test]
290 fn for_each_containing_visits_each_polygon_at_most_once() {
291 // Two adjacent unit squares sharing the edge x=1.
292 let polys = vec![unit_square(0.0, 0.0), unit_square(1.0, 0.0)];
293 let index = MultiPolygonIndex::new(&polys);
294
295 let mut hits = Vec::new();
296 index.for_each_containing(Point::new(0.5, 0.5), |i| hits.push(i));
297 assert_eq!(hits, vec![0]);
298
299 let mut hits = Vec::new();
300 index.for_each_containing(Point::new(1.5, 0.5), |i| hits.push(i));
301 assert_eq!(hits, vec![1]);
302 }
303
304 #[test]
305 fn count_points_par_returns_one_count_per_polygon() {
306 let polys = vec![
307 unit_square(0.0, 0.0),
308 unit_square(1.0, 0.0),
309 unit_square(0.0, 1.0),
310 unit_square(1.0, 1.0),
311 ];
312 let index = MultiPolygonIndex::new(&polys);
313 let points = vec![
314 Point::new(0.5, 0.5), // poly 0
315 Point::new(0.1, 0.1), // poly 0
316 Point::new(1.5, 0.5), // poly 1
317 Point::new(0.5, 1.5), // poly 2
318 Point::new(1.5, 1.5), // poly 3
319 Point::new(1.5, 1.5), // poly 3
320 Point::new(5.0, 5.0), // outside everything
321 ];
322
323 let counts = index.count_points_par(&points);
324 assert_eq!(counts, vec![2, 1, 1, 2]);
325 }
326}