Wrangling and joining 130M points with DuckDB + the open source spatial stack

A little over a two years ago I wrote about point-in-polygon joins in R using an example of parking violations in Philedelphia. This example has been written about a few times involving a GPU example, using it as example for tuning PostGIS, a GeoPandas version for comparison, and a version comparing ArcPro, QGIS, PostGIS, and Manifold.

I don’t want to downplay the parking example because it’s a realistic representation of problems on the larger end of the normal GIS realm. For many general-purpose data engineers, though, 9 million parking tickets is a drop in the bucket and the point-in-polygon counting might just be one step in a long pipeline of transformations. In particular, the problem has a crutial simplification: it fits comfortably in memory on just about any recent laptop.

Let’s take the same problem up a notch but make it big enough that the naive approach will exhaust the memory of the average day-to-day laptop. By naive approach, I mean an approach that doesn’t involve any kind of explicit data format choices, chunking, indexing, or parallelism: these are the kinds of games you have to play when the size of your data approaches or gets bigger than memory. Let’s find a pretty big dataset and let the games begin!

The Data

For this post I’ll use the Microsoft U.S. buildings dataset as published in the GeoArrow Data page in FlatGeoBuf format. Roughly, this is 130 million points (the building centroids) from the “lower 48” (i.e., not Alaska or Hawaii). The Postal code tabulation areas are from the U.S. Census Bureau, and there are about 33,000 of them. (At the end or this post there’s a recipe for how to get the data if you’re itching to give any of this a try.)

Basically, we’ll be doing the same thing as the Philedelphia parking violation example: count points by polygon (in this case, make a list of zip codes and how many buildings are contained within each).

The Results

This is a pretty long post, so I’ll put the results up top. If you prefer to be surprised, feel free to skip and scroll back!

First of all, the winners here are really Ibis, DuckDB, Arrow, and GeoParquet. Those tools saved huge quantities of time off all existing “wrangling” times. DuckDB provided a way to automatically paralellize most of the wrangling operations without me having to think about any of it; Ibis provided a way for me not to have to remember all the SQL I needed to do common wrangling tasks; Arrow and GeoParquet were under the hood making sure Geometry stayed Geometry and I didn’t have to think too hard about writing to/reading from a binary column every time. There are some geospatial types and operators that I wish Ibis knew about, and until DuckDB spatial can propagate a CRS it won’t be useful for the general case, but those are minor gripes about a huge amount of work that it took collectively to get to this wonderful place we are now in.

Next, the results. From fastest to slowest on a nice but sort of old laptop (total time, including wrangling): DuckDB Geography (5.5 minutes), GeoPandas the easy way (6.5 minutes), GeoPandas the hard way (11 minutes), PostGIS (13 minutes).

From fastest to slowest on a Windows machine with more cores and 128 GB of RAM: GeoPandas the easy way (2 minutes), DuckDB Geography (2.5 minutes), PostGIS (6.75 minutes), Apache Sedona (23 minutes), GeoPandas the hard way (>30 minutes, something about GDAL/Windows here is funky).

QGIS:

  • Wrangling time: 9 minutes (MacOS M1)
  • Run time: 17 minutes (MacOS M1)
  • Parallelism: Single core only
  • Memory requirement: 200 MB
  • Tweaking required: Pretty much none
  • Bonus points: For the accurate progress bar!

PostGIS:

  • Wrangling time: 7 minutes (MacOS M1), 5 minutes (beefy Windows)
  • Run time: 6 minutes (MacOS M1), 1.75 minutes (beefy Windows)
  • Parallelism: Runtime seems to be ~linear on number of logical cores
  • Memory requirement: Respects a set memory limit (but seems to be much faster on a machine with virtually unconstrained memory). The peak usage of the Docker container running PostGIS seemed to be about 16 GB if unconstrained.
  • Tweaking required: Requires knowing the wrangling, indexing, and global configuration tricks to force the optimal query plan.
  • Bonus points: For not requiring any fancy SQL tricks for the actual query!

GeoPandas (the hard way):

  • Wrangling time: 9 minutes (MacOS M1) (needs a GDAL-readable file format with an index)
  • Run time: 2 minutes (MacOS M1), 20 minutes (Windows)
  • Parallelism: Configurable (can scale to more cores)
  • Memory requirement: 200 MB
  • Tweaking required: Requires knowing the exact query plan you want to execute (and seems to take strangely long on Windows).
  • Bonus points: For the lowest memory requirement!

GeoPandas (the easy way):

  • Wrangling time: 5 minutes (MacOS M1), 1.5 minutes (beefy Windows)
  • Run time: 1.25-1.5 minutes (MacOS M1), 19 seconds (beefy Windows)
  • Parallelism: Configurable (scales to more cores subject to the limitation of a Python thread pool)
  • Memory requirement: 40 GB (but much of this can be virtual memory and it seems to run on a laptop with 16 GB physical memory)
  • Tweaking required: Requires knowing the exact query plan you want to execute
  • Bonus points: For being the fastest runtime with sufficient memory!

DuckDB Geography:

  • Wrangling time: Integrated into the runtime (although only a few seconds of explicit wrangling was required).
  • Run time: 5.5 minutes (MacOS M1), 2.5 minutes (beefy Windows)
  • Parallelism: DuckDB effectively saturated all cores for most of the join, and this probably scales such that it would be correspondingly faster for more cores.
  • Memory requirement: DuckDB respects the memory limit you set (although I am guessing this will take quite a long time to complete if there is little memory available based on observing the temporary storage in the .tmp directory get used a fair amount). On a machine with virtually unconstrained memory, the peak usage from the DuckDB process was about 16 GB.
  • Tweaking required: Requires knowing intimate details of S2 cell math (hopefully we can remove this requirement and do this under the hood in the future!)
  • Bonus points: For requiring zero (explicit) wrangling time!

Apache Sedona:

  • Wrangling time: 30 seconds (MacOS M1), 15 seconds (beefy Windows)
  • Run time: 23 minutes (beefy Windows)
  • Parallelism: Scales to infinity (because Sedona/Spark can use large distributed clusters)
  • Memory requirement: Peak usage seemed to be about 16 GB
  • Tweaking required: Requires knowing some things about Spark clusters if you want this to be fast/scale
  • Bonus points: For the ability to scale to more than one node in a compute cluster!

Ok…read on for the full story!

The Tools

For this post I’ll be using a few tools:

  • DuckDB, for reading/writing various file formats (via the spatial extension, which provides bindings to GDAL)
  • Ibis, so that we can write as little SQL as absolutely necessary
  • The brand new/in-progress DuckDB Geography extension, because I just wrote it and am itching to give it a try on a real problem!
  • PostGIS, which is is one of the go-to tools for when data gets to be unwieldy for a purely in-memory solution.
  • Apache Sedona, another go-to tool for scaling spatial to the next level.
  • GeoPandas, for a test of the purely in-memory solution.
  • QGIS’s command-line tools, which provide an implementation of the operation we’re about to try that doesn’t involve loading the whole dataset into memory.

The Hardware

I’ll discuss timings with respect to two different machines:

  • MacOS M1: My somewhat old day-to-day laptop, a first generation MacBook Air with 16 GB of RAM and 8 cores.
  • “Beefy Windows”: A Windows machine with an AMD Ryzen 5 with 12 physical/24 logical cores and 128 GB of RAM. Mostly this helped identify the engines that relied on loading everything into RAM under the hood.

Wrangling the data

One of the fun parts of dealing with a lot of points is that they work rather badly with spatial file formats that are mostly designed to store arbitrary “geometry” and aren’t optimized for the point use case. The buildings data is provided as a 9.7 GB FlatGeoBuf file (~2 GB zipped); however, many of the tools we’re about to use are faster if we start with Parquet.

First, I’ll set up the Ibis session as our database frontend (to DuckDB and other things later on).

import ibis
from ibis import _

ibis.options.interactive = True
# Because the geography extension isn't published yet and we'll use it!
con = ibis.duckdb.connect(allow_unsigned_extensions=True)
con.load_extension("spatial")

First we’ll prep the zipcodes table in FlatGeoBuf and Parquet. If we had an Arrow-enabled GDAL build here, we could use ogr2ogr to do all of this, but for that to work you would need conda GDAL or homebrew GDAL (i.e., not a run-of-the-mill Linux installed version). DuckDB’s spatial extension can do this for us too, though, so we’ll just use that for now. This is pretty reasonable (takes 1 second, both files are about 100 MB).

con.raw_sql(
    """
    COPY (
        SELECT ZCTA5CE20 as zipcode, geom
        FROM 'us-zip-codes/cb_2020_us_zcta520_500k.shp'
    ) TO 'us-zip-codes.fgb' WITH (FORMAT GDAL, DRIVER FlatGeoBuf)
    """
)

con.raw_sql(
    """
    COPY (
        SELECT ZCTA5CE20 as zipcode, geom
        FROM 'us-zip-codes/cb_2020_us_zcta520_500k.shp'
    ) TO 'us-zip-codes.parquet'
    """
)

! ls -lh us-zip-codes.*
-rw-r--r--  1 deweydunnington  staff    95M Dec  1 22:07 us-zip-codes.fgb
-rw-r--r--  1 deweydunnington  staff    92M Dec  1 22:07 us-zip-codes.parquet

Next, we’ll make a Parquet file from the FlatGeoBuf of the building centroids.

con.raw_sql(
    """
    COPY (
        SELECT geom,
        FROM 'microsoft-buildings-point.fgb'
    ) TO 'microsoft-buildings-point.parquet'
    """
)
<duckdb.duckdb.DuckDBPyConnection at 0x10bcd66f0>

This takes about 2.5 minutes (9 minutes on Windows), in part because how this is wired together seems to only use a single thread to do most of the scanning. The slowness on Windows is I think GDAL-related but I am not entirely sure.

One particularly nice part about the GeoParquet output is that it’s significantly smaller than the FlatGeoBuf!

! ls -lh microsoft-buildings-point.*
-rw-r--r--  1 deweydunnington  staff   9.7G Sep 28  2023 microsoft-buildings-point.fgb
-rw-r--r--  1 deweydunnington  staff   2.0G Nov 28 22:42 microsoft-buildings-point.parquet

We’re not quite done with the wrangling (we’ll need to do some extra wrangling to get the most out of some engines), but for now we’re ready to get going!

QGIS

I’m going to start with the Desktop GIS solution because it’s point-and-shoot: we want to count points in polygons, and QGIS has a “point in polygon” counter. One slight hiccup is that we need to write some kind of a file with a spatial index: the FlatGeoBuf we’ve been given doesn’t have one, and if you try to run the tool without an index on the points layer you’ll get a stern WARNING: No spatial index exists for points layer, performance will be severely degraded. DuckDB spatial can do this for us (so can ogr2ogr using pretty much the same mechanism). Because GDAL’s default layer cration option for SPATIAL_INDEX is YES, we can just read and then write.

con.raw_sql(
    """
    COPY (
        SELECT geom from 'microsoft-buildings-point.parquet'
    ) TO 'microsoft-buildings-point-indexed.fgb' WITH (FORMAT GDAL, DRIVER FlatGeoBuf)
    """
)
<duckdb.duckdb.DuckDBPyConnection at 0x256c1d3f1f0>

This took DuckDB about 9 minutes (~17 minutes on Windows for I think the same GDAL-related reason). One consequence of this is that the FlatGeoBuf output (i.e., with the index included) is 15 GB!

! ls -lh microsoft-buildings-point*
-rw-r--r--  1 deweydunnington  staff    15G Nov 28 23:30 microsoft-buildings-point-indexed.fgb
-rw-r--r--  1 deweydunnington  staff   9.7G Sep 28  2023 microsoft-buildings-point.fgb
-rw-r--r--  1 deweydunnington  staff   2.0G Dec  1 22:12 microsoft-buildings-point.parquet

You can run the tool from the Processing Toolbox too, but for reproducibility’s sake, we’ll use a shell command here:

qgis_process run native:countpointsinpolygon \
  --POLYGONS='us-zip-codes.fgb' \
  --POINTS='microsoft-buildings-point-indexed.fgb' \
  --OUTPUT='qgis/us-zip-codes-counts.fgb'

Let’s spot-check the result for one zip code to compare our later results for general consistency:

con.sql(
    """
    SELECT NUMPOINTS FROM 'qgis/us-zip-codes-counts.fgb' WHERE zipcode = '05753'
    """
)
┏━━━━━━━━━━━┓
┃ NUMPOINTS ┃
┡━━━━━━━━━━━┩
│ float64   │
├───────────┤
│    4954.0 │
└───────────┘
con.sql("""SELECT sum(NUMPOINTS::INTEGER) FROM 'qgis/us-zip-codes-counts.fgb'""")
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ sum(CAST(NUMPOINTS AS INTEGER)) ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ decimal(38, 0)                  │
├─────────────────────────────────┤
│                       120689758 │
└─────────────────────────────────┘

QGIS was able to complete the join in just over 17 minutes. There didn’t seem to be any indication that any more than a single core was being used and the memory usage seemed to hover around 200 MB.

PostGIS

Next, we’ll give PostGIS a shot. I drew heavily on a Paul Ramsay post and an Arthur Lembo post for the configuration (which I may still have wrong here!). We need some wrangling here, too, because PostGIS needs geometries in a database. First we need to start up a PostGIS instance. I’m doing this via Docker (ensuring that my Docker settings allow for unconstrained use of my cores and memory):

docker run -it --rm \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_USER=postgres \
  -p 5432:5432 \
  postgis/postgis:latest

DuckDB does have a PostgreSQL integration but doesn’t have quite enough flexibility to copy GeoParquet directly into a PostgreSQL table (or stream the result of a query into a PostgreSQL table, or at least I can’t figure out how to do it!). GDAL’s ogr2ogr can do this1, but it takes about half an hour to load the buildings file from a FlatGeoBuf and we can do quite a bit better.

The approach I used here was to use DuckDB’s postgres extension to write PostgreSQL COPY binary, and insert it using psql. I like this approach because (1) it’s faster than anything else I tried and (2) it doesn’t load the whole buildings file into memory at any given time (i.e., this solution would scale to something that’s actually bigger than memory). It does use a sneaky trick: the COPY binary format for GEOMETRY is WKB, so while we write a BLOB we copy into GEOMETRY and it gives us what we need. If we didn’t do this, we’d need to issue a separate command to create the GEOMETRY column which for me took an extra few minutes.

con.load_extension("postgres")
con.raw_sql(
    """
    COPY (
        SELECT zipcode, geom.st_aswkb() as geom from 'us-zip-codes.parquet'
    ) TO "zipcodes.bin" WITH (FORMAT POSTGRES_BINARY)
    """
)

! psql \
   "host=127.0.0.1 user=postgres password=password dbname=postgres" \
   -c "DROP TABLE IF EXISTS zipcodes; CREATE TABLE zipcodes (zipcode VARCHAR, geom GEOMETRY)"
! psql \
   "host=127.0.0.1 user=postgres password=password dbname=postgres" \
   -c "\copy zipcodes FROM 'zipcodes.bin' WITH (FORMAT BINARY)"
DROP TABLE
CREATE TABLE
COPY 33791
con.raw_sql(
    """
    COPY (
        SELECT geom.st_aswkb() as geom from 'microsoft-buildings-point.parquet'
    ) TO "buildings.bin" WITH (FORMAT POSTGRES_BINARY)
    """
)

! psql \
   "host=127.0.0.1 user=postgres password=password dbname=postgres" \
   -c "DROP TABLE IF EXISTS buildings; CREATE TABLE buildings (geom GEOMETRY)"
! psql \
   "host=127.0.0.1 user=postgres password=password dbname=postgres" \
   -c "\copy buildings FROM 'buildings.bin' WITH (FORMAT BINARY)"
DROP TABLE
CREATE TABLE


NOTICE:  table "buildings" does not exist, skipping


COPY 129735970

Collectively, these run for me in less than 2 minutes (same on Windows), with the time for the zipcodes table <3 seconds of that.

I’ll use the Ibis PostgreSQL backend to issue queries to the PostgreSQL connection. First, to check if our zipcodes all made it:

pg_conn = ibis.postgres.connect(user="postgres", password="password", host="127.0.0.1")
pg_conn.sql("SELECT zipcode, st_astext(geom) FROM zipcodes").head(5)
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ zipcode  st_astext                                                                        ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ stringstring                                                                           │
├─────────┼──────────────────────────────────────────────────────────────────────────────────┤
│ 15301  POLYGON((-80.368598 40.218953,-80.354929 40.244065,-80.349107 40.248988,-80.350… │
│ 66112  POLYGON((-94.797741 39.116388,-94.797719 39.128597,-94.789669 39.128587,-94.788… │
│ 28645  POLYGON((-81.755792 35.92534,-81.745947 35.929242,-81.737449 35.941565,-81.7406… │
│ 07502  POLYGON((-74.204425 40.918754,-74.202744 40.920531,-74.201612 40.921717,-74.201… │
│ 15658  POLYGON((-79.34323 40.172011,-79.337496 40.177769,-79.331105 40.179871,-79.3329… │
└─────────┴──────────────────────────────────────────────────────────────────────────────────┘

Next, check the buildings table to make sure they all made it:

pg_conn.sql("SELECT st_astext(geom) FROM buildings").head(5)
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ st_astext                                    ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ string                                       │
├──────────────────────────────────────────────┤
│ POINT(-84.95972352431309 32.421832237128726) │
│ POINT(-84.9597329778881 32.420910155065535)  │
│ POINT(-84.9599375 32.2352595)                │
│ POINT(-84.96016955991914 32.422296538057)    │
│ POINT(-84.9616848201395 32.41927086747914)   │
└──────────────────────────────────────────────┘

For this to work we need to build an index on the points just like we did with QGIS, which for me took ~5 minutes (3 minutes on a beefier Windows machine):

pg_conn.raw_sql(
    """
    CREATE INDEX buildings_idx ON buildings USING GIST (geom);
    """
)

We’re good to go! Let’s roll:

pg_conn.raw_sql("""SET min_parallel_table_scan_size = '1kB'""")
pg_conn.raw_sql("""SET max_parallel_workers_per_gather = 12""")
pg_conn.sql(
    """
    SELECT zipcodes.zipcode, count(*) as numpoints
    FROM zipcodes
    JOIN buildings ON ST_Contains(zipcodes.geom, buildings.geom)
    GROUP BY zipcodes.zipcode
    """
).to_csv("postgis/result.csv")

This completes for me in 6 minutes (1 minute 45 seconds on a beefier Windows machine). Even though I set the number of workers to 12, I never saw more than 300% CPU usage (suggesting each worker wasn’t saturating a core due to IO constraints, or that fewer than 12 workers were used). Let’s spot check a zipcode and the total count!

con.read_csv("postgis/result.csv")\
    .filter(_.zipcode == "05753")
┏━━━━━━━━━┳━━━━━━━━━━━┓
┃ zipcode  numpoints ┃
┡━━━━━━━━━╇━━━━━━━━━━━┩
│ stringint64     │
├─────────┼───────────┤
│ 05753  4954 │
└─────────┴───────────┘
con.read_csv("postgis/result.csv")\
    .aggregate(sum=_.numpoints.sum())
┏━━━━━━━━━━━┓
┃ sum       ┃
┡━━━━━━━━━━━┩
│ int64     │
├───────────┤
│ 129603206 │
└───────────┘

Interestingly, we get a slightly different total count. I’m wondering if QGIS uses intersection rather than containment here, which might account for the difference if there were building centroids close enough to vertices/edges of the polygons.

GeoPandas the hard way

I say the hard way here because we’re going to basically reconstruct the nice query plan that PostGIS calculated for us but using GeoPandas intstead to do the execution. The query plan is basically:

  • Split zipcodes into num_workers pieces
  • On each worker, loop through zipcode polygons one at a time, querying the point index for each polygon.
  • After getting getting a list of candidate points, check each of them to make sure they’re actually contained within the polygon (and then count the ones that really were contained).

This is rather specific to this exact task, but if you’re throwing a lot of data at a problem you might need a specialized solution.

This solution shares the requirement that we have a file format with a spatial index that GDAL understands. This is because GeoPandas reads using GDAL/OGR and lets us pass a spatial filter through (that makes use of the index). We’ll reuse microsoft-buildings-point-indexed.fgb but make sure to count the 9 minutes it took to get our data into a format that works here.

The heart of this solution is a single Python function that quite literally counts points for a single polygon. The gymnastics with writing the function to a file and reimporting it is something about the ProcessPoolExecutor and its interaction with Jupyter and probably isn’t necessary in a standalone Python script.

import geopandas

buildings_fun = """
import geopandas
import warnings
warnings.filterwarnings('ignore') # Ignore warnings about "read 0 features"

def count_buildings(zipcode_feat):
    points = geopandas.read_file(
        'microsoft-buildings-point-indexed.fgb',
        bbox=zipcode_feat.bounds
    )

    query = points.geometry.sindex.query(
        zipcode_feat,
        predicate="contains",
        sort=False
    )

    return len(query)
"""

with open("count_buildings.py", "w") as f:
    f.write(buildings_fun)

from count_buildings import count_buildings

zipcodes_df = geopandas.read_file("us-zip-codes.fgb")

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=8) as executor:
    zipcodes_df["building_count"] = list(executor.map(count_buildings, zipcodes_df.geometry))

This finishes for me in 2 minutes (~20 minutes on Windows for some reason) and responds almost perfectly to the number of workers (provided you have enough cores to house them).

Let’s do our spot check:

zipcodes_df["building_count"][zipcodes_df.zipcode == "05753"]
10930    4954
Name: building_count, dtype: int64
zipcodes_df["building_count"].sum()
np.int64(129603206)

This exactly matches PostGIS!

GeoPandas the easy way

I’m calling this one “the easy way” because it’s a bit more point-and-shoot: we’re just going to load the microsoft-buildings-point file straight into memory and build the index there. That said, it’s easy only if you have a huge quantity of memory available to you. This doesn’t happen to crash on my laptop (16 GB physical memory) but I’ve had a slightly bigger example crash, and it does make the computer almost unusable while it happens (because it moves a huge amount of memory to swap/disk).

First, we’ll load the files. I found a slight advantage to GeoParquet vs. FlatGeoBuf on the read (~15% faster) but this might have been due to some of the extreme memory pressure (after the read, the Python process was using 28 GB of memory and there’s on 16 on my laptop).

import geopandas

zipcodes_df = geopandas.read_file("us-zip-codes.fgb")
buildings_df = geopandas.read_parquet("microsoft-buildings-point.parquet")

The read takes 2.5 minutes / 44 seconds on a beefy Windows machine (28 GB memory usage on both).

sindex = buildings_df.geometry.sindex

The index building takes 2.5-3 minutes / 48 seconds on a beefy Windows machine (40 GB memory usage). It’s just a guess but I think this timing depends considerably on how much swapping has to happen to make this work which can be variable if you have less than 28 GB of actual memory.

Next, we’ll set up the spatial index query attempting to use a bit of parallelism. The approach here is to chunk the zipcodes into reasonably large chunks (12 here) and exploit the fact that sindex.query() can take a vector of geometries.

import pandas as pd

zipcode_chunks = []
chunk_size = len(zipcodes_df.geometry) // 11
n_chunks = 11 + 1
for i in range(n_chunks):
    start = i * chunk_size
    end = start + chunk_size
    chunk = zipcodes_df.geometry.iloc[start:end]
    zipcode_chunks.append(chunk)

def count_buildings_chunk(chunk):
    result = sindex.query(chunk, predicate="contains")
    return pd.Index(pd.Series(chunk.index).iloc[result[0]]).value_counts(sort=False)

Finally, we can set up the executor and run the join.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=8) as executor:
    chunk_results = list(executor.map(count_buildings_chunk, zipcode_chunks))
    zipcodes_df["building_count"] = pd.concat(chunk_results)

This finishes for me in 1.25-1.5 minutes (19 seconds on a beefy Windows machine). Let’s spot check!

zipcodes_df["building_count"][zipcodes_df.zipcode == "05753"]
10930    4954.0
Name: building_count, dtype: float64
zipcodes_df["building_count"].sum()
np.float64(129603206.0)

DuckDB Geography

I have to admit that this is the one I’m the most excited about, but it’s also not entirely practical at the moment since the DuckDB Geography extension is brand new and not easily installable (e.g., as of this writing you’ll have to grab a binary from the latest CI job, extract/copy it into your working directory and install it from a local file).

con.raw_sql("FORCE INSTALL 'geography.duckdb_extension'")
con.load_extension("geography")

First, we’ll read zipcodes into a table where the “geometry” column is actually a GEOGRAPHY type (as defined by the extension). We’ll use s2_prepare() to exploit a unique type of GEOGRAPHY that is “prepared”, which allows it to re-use quite a bit of effort that would otherwise be duplicated for every point-in-polygon test that’s about to happen.

The key part that will let this join compete with the explicitly indexed solutions is the “covering”. This “covering” is a collection of discrete cells represented by 64-bit integers. It takes a bit of effort in advance to calculate these, but DuckDB can blow through this all in 7 seconds.

con.raw_sql("""
    CREATE OR REPLACE TABLE zipcodes AS
    SELECT
        zipcode,
        geom.st_aswkb().s2_geogfromwkb().s2_prepare() as geog,
        geog.s2_covering() as covering
    FROM "us-zip-codes.parquet"
""")
<duckdb.duckdb.DuckDBPyConnection at 0x2093af5e6f0>

Next, we’ll prepare the two sides of the join as VIEWs. Creating them as VIEWs are free (takes zero seconds) but lets the actual join be a bit more concise. One of the key parts of this is the “unnesting” of the covering, which gives one row for each candidate S2 cell, keeping a copy of the original geography in each row.

For the buildings layer, we’ll approximate each building location as an S2 cell as well. These cells are “leaf” cells, or cells at the highest level (30). Leaf s2 cells have a resolution of a few cm, which is OK for us since the accuracy of the collection was almost certainly not that high to begin with.

con.raw_sql("""
    CREATE OR REPLACE VIEW zipcodes_unnested AS
    SELECT
        zipcode,
        geog,
        unnest(covering) as cell
    FROM zipcodes
""")

con.raw_sql("""
    CREATE OR REPLACE VIEW buildings AS
    SELECT
        geom.st_aswkb().s2_cellfromwkb() as cell
    FROM "microsoft-buildings-point.parquet"
""")
<duckdb.duckdb.DuckDBPyConnection at 0x2093af5e6f0>

Finally, the join. This is a range inequality join that is doing something similar to the index queries above! Except, instead of using a custom index, we exploit the fact that checking one s2 cell’s containment of another is a simple range check. This gets our “preselection” sorted, which lets the expensive WHERE clause only check points that are likely to be inside the polygon.

con.sql("""
    SELECT
        zipcodes_unnested.zipcode,
        count(*) as numpoints
    FROM
        buildings
    LEFT JOIN zipcodes_unnested ON
        zipcodes_unnested.cell.s2_cell_range_min() <= buildings.cell
        AND zipcodes_unnested.cell.s2_cell_range_max() >= buildings.cell
    WHERE s2_contains(zipcodes_unnested.geog, buildings.cell)
    GROUP BY zipcode
""").to_csv("duckdb-geography/result.csv")

This runs for me in 5.5 minutes (2.5 minutes on a beefier Windows machine). Pretty much all of my cores were saturated the whole time, and DuckDB quite nicely respected the memory limit of my system (actively using the .tmp directory to spill to disk a number of times).

Let’s spot check!

join_result = con.read_csv("duckdb-geography/result.csv")
join_result.filter(_.zipcode == "05753")
┏━━━━━━━━━┳━━━━━━━━━━━┓
┃ zipcode  numpoints ┃
┡━━━━━━━━━╇━━━━━━━━━━━┩
│ stringint64     │
├─────────┼───────────┤
│ 05753  4954 │
└─────────┴───────────┘
join_result.aggregate(_.numpoints.sum())
┏━━━━━━━━━━━━━━━━┓
┃ Sum(numpoints) ┃
┡━━━━━━━━━━━━━━━━┩
│ int64          │
├────────────────┤
│      129603207 │
└────────────────┘

This one is amusing to me because there’s apparently exactly one building of 130 million where the approximation of each building centroid as an S2 leaf cell (whose resolution is on average about 1-2 cm) makes a difference. Barring that, it looks like we get the same result.

Apache Sedona

I recently wrote up my adventures getting started with Apache Sedona, and this post calls for revisiting that since the big join is the bread-and-butter of Sedona’s target workload. I’m not quite to the point where I know how to get Seonda up and running without Docker, so let’s spin up the image and get going!

docker run \
    -e DRIVER_MEM=10G -e EXECUTOR_MEM=50G \
    -p 8888:8888 -p 8080:8080 -p 8081:8081 -p 4040:4040 \
    -v $(pwd):/data \
    --rm \
    apache/sedona:latest

The documentation is careful to mention that the number of partitionions can have a huge impact on join performance. Repartitioning from within Spark/Sedona is somewhat expensive (took me 7 minutes to repartition the zipcodes), so I’ll do it from DuckDB before we hop into Sedona. Here I’m using the geography extension to create the partitions because it’s very fast (30 seconds, or 15 seconds on a beefier Windows machine).

con.load_extension("geography")
con.sql(
    """
    SELECT
        geom.st_aswkb().s2_cellfromwkb().s2_cell_parent(4).s2_cell_token() as partition_cell,
        geom
    FROM 'microsoft-buildings-point.parquet'
    """
).to_parquet("buildings", partition_by="partition_cell", overwrite=True)

Let’s check that we have good spatial partitions!

con.sql(
    """
    SELECT partition_cell, st_extent_agg(geom)
    FROM "buildings/*/*.parquet"
    GROUP BY partition_cell
    """
).to_pandas().plot(edgecolor="black", facecolor="none")
<Axes: >

png

Next, let’s hop into Spark/Sedona land and create our session.

from sedona.spark import *

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/01 23:47:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Next, we’ll create our tables. I’m using createOrReplaceTempView() here so that we can use these from SQL in the next step.

sedona.read.format("geoparquet")\
    .load("/data/us-zip-codes.parquet")\
    .createOrReplaceTempView("zipcodes")
sedona.read.format("geoparquet")\
    .load("/data/buildings/*/*.parquet")\
    .createOrReplaceTempView("buildings")

Finally, we type the same join that we did in PostGIS. This builds the index for us automatically, which is very cool.

sedona.sql(
    """
    SELECT zipcodes.zipcode, count(*) as numpoints
    FROM zipcodes
    JOIN buildings ON ST_Contains(zipcodes.geom, buildings.geom)
    GROUP BY zipcodes.zipcode
    """
).write.mode("overwrite").csv("/data/sedona/results.csv")
24/12/02 02:38:01 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.

This ran for me in 38 minutes with no partitioning and 20 minutes with the buildings partitioned nicely. Partitioning the zipcodes didn’t seem to have an effect.

Some nitty gritty details

I mostly put together this visualization for myself…for our target zipcode, the S2 cells that make the covering and the bounding rectangle used by the RTree-based methods (i.e., everything except DuckDB Geography). In this case, the bounding rectangle is quite a good approximation because the zipcode area is squareish. We could configure the covering to contain more cells or only contain cells of a certain size, which might improve our timing.

# The polygon we're after
ax = con.read_parquet("us-zip-codes.parquet")\
    .filter(_.zipcode == "05753")\
    .select(_.geom)\
    .to_pandas().plot(edgecolor="black", facecolor="none")

# The "covering" S2 cells (used by duckdb geography)
con.sql(
    """
    SELECT cell.s2_aswkb().st_geomfromwkb() as geom
    FROM zipcodes_unnested
    WHERE zipcode = '05753'
    """
).to_pandas().plot(edgecolor="red", facecolor="none", ax=ax)

# The bounding rectangle (used by everybody else)
con.read_parquet("us-zip-codes.parquet")\
    .filter(_.zipcode == "05753")\
    .mutate(geom=_.geom.envelope())\
    .select(_.geom)\
    .to_pandas().plot(edgecolor="purple", facecolor="none", ax=ax)
<Axes: >

png

The Setup

This post uses data from the GeoArrow data page, which you can download with curl or similar (it’s a ~2 GB download in FlatGeoBuf format, 8 GB otherwise).

curl -L https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/microsoft-buildings-point.fgb.zip \
    -o microsoft-buildings-point.fgb.zip
unzip microsoft-buildings-point.fgb.zip
rm microsoft-buildings-point.fgb.zip

It also uses a zipcode dataset distribued by the U.S. census:

curl -L https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip \
    -o cb_2020_us_zcta520_500k.zip
unzip cb_2020_us_zcta520_500k.zip -d us-zip-codes
rm cb_2020_us_zcta520_500k.zip

I used a Python virtual environment created with:

pip install 'ibis-framework[duckdb,postgres,geospatial]' geopandas matplotlib

To use DuckDB geography, you’ll unfortunately need to jump through some hoops until it is a community extension (by the time you read this hopefully it is!). You’ll need to download a version of the extension built for your DuckDB version and platform from the latest CI run, extract it, and then run the SQL INSTALL '/path/to/geography.duckdb_extension'.


  1. ! ogr2ogr -f PostgreSQL PG:"host=127.0.0.1 user=postgres password=password dbname=postgres" microsoft-buildings-point.parquet -nln buildings was what I timed for this (using Parquet instead of FlatGeoBuf just in case the format is the issue, and I don’t think that it is). ↩︎

Software Engineer & Geoscientist