(Geo)Data cleaning with Arrow and SedonaDB

Last week I had the pleasure of being a guest on a Cloud Native Geospatial webinar with my colleagues Jia Yu and Matt Forrest where we talked about SedonaDB. In the webinar, Matt demoed a fantastic blog post comparing a real-world workflow across SedonaDB, DuckDB, and PostGIS. In preparation, I came up with a few examples, and only a small bit of them ended up making it into the live demo. Here’s the full version!

The gist of the post: let’s use SedonaDB’s top-notch (if I do say so myself) Arrow interop to ingest some totally bonkers real-world data and write it to nice clean GeoParquet.

Let’s get started: SedonaDB can be installed with pip:

pip install "apache-sedona[db]"

We’ll create the handle to our session (sd) and turn on interactive mode to auto-print results (handy when not running queries against remote data!).

import sedona.db

sd = sedona.db.connect()
sd.options.interactive = True

Now let’s get into the problem a little. In 2012 I got a job as a Project Coordinator at a small environmental consultancy that did upstream (i.e., before the refinery) oil and gas work in Alberta. The sites we were visiting didn’t have road addresses…they were numbered according to the Dominion Land Survey, which is basically the reason why, when you fly over the prairies, you see a grid of one-square-mile postage stamps from Winnipeg all the way to the Rockies. Those postage stamps all have numbers, and I needed to look up how to get there while driving in places with terrible cell phone internet.

The end of the story is that I wrote my own geocoder that compressed the DLS into about 5 MB and made a few thousand dollars selling it on Google Play in the mid 2010s. The beginning of the story is that I had to ingest many GB of totally wild GIS data provided by various governments…in this post I’ll reimagine that with the tools we now have (it’s much easier now than in 2012!).

tl;dr

  • For Saskatchewan we’ll use DuckDB’s awesome CSV reader and stream the query directly into SedonaDB (where we’ll use SedonaDB’s coordinate reference system support to keep the original UTM coordinates when saving to GeoParquet)
  • For Alberta we’ll roll our own reader for an absolutely bonkers text file
  • For Manitoba we’ll convert a slew of shapefiles into GeoParquet with UTM coordinates

Alberta

The Alberta data is a great example of writing your own reader and leveraging Sedona’s Arrow interop to easily ingest it. Basically, anything that can be turned into an iterable of pyarrow RecordBatches works as input. We need this here because Alberta publishes its DLS data (or used to) in a fixed-width text file that looks like this:

with open("data/ATS_V4_1.SEQ") as f:
    print(f.readline())
40100101E449.00701238110.0050793319840621S07LBD         1421   0.000000000    199104030000000020050331113922

Thankfully, there’s a Word document (eyeroll) distributed alongside the data giving the names and lengths of the columns:

import numpy as np

ats_field_names = [
    "meridian", "range", "township", "section", "quarter_section",
    "latitude", "longitude", "year_computed", "month_computed", "day_computed",
    "station_code", "status_code", "horizontal_classification", "comment_field",
    "horizontal_origin", "horizontal_method", "horizontal_datum",
    "road_allowance_code", "elevation", "elevation_date", "elevation_origin",
    "elevation_method", "elevation_accuracy", "vertical_datum", "parcel_year_computed",
    "parcel_month_computed", "parcel_day_computed", "1_20_000_year_computed",
    "1_20_000_month_computed", "1_20_000_day_computed", "update_date"
]

ats_field_lengths = [
    1, 2, 3, 2, 2, 11, 12, 4, 2, 2, 1, 1, 1, 12, 1, 1, 1, 1, 6,
    8, 1, 1, 1, 1, 4, 2, 2, 4, 2, 2, 14
]

ats_field_end = np.cumsum(ats_field_lengths)
ats_field_start = ats_field_end - ats_field_lengths

Unfortunately Pandas’ fixed-width file reader borks on this:

import pandas as pd

pd.read_fwf("data/ATS_V4_1.SEQ", widths=ats_field_lengths).head(5)
#> UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa0 in position 1584: invalid start byte

…probably because it’s some ISO encoding. In any case, it’s not too hard to roll our own reader here, particularly because we only care about the first 7 fields:

def read_ats_line():
    with open("data/ATS_V4_1.SEQ", "rb") as f:
        for line in f:
            yield [
                line[ats_field_start[i] : ats_field_end[i]].decode() for i in range(7)
            ]


next(read_ats_line())
['4', '01', '001', '01', 'E4', '49.00701238', '110.00507933']
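
(As an aside, if the culprit really is a Latin-1-style encoding, pandas’ fixed-width reader would probably cope given an explicit encoding. An untested sketch:)

pd.read_fwf(
    "data/ATS_V4_1.SEQ",
    widths=ats_field_lengths,
    names=ats_field_names,
    encoding="latin-1",  # assumption: the file is ISO-8859-1
).head(5)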

We can use the handy batched() iterator tool to turn this into an iterable of pyarrow RecordBatches:

import pyarrow as pa
from itertools import batched

ats_schema = pa.schema([pa.field(nm, pa.string()) for nm in ats_field_names[:7]])
def read_ats_batches(n=1024):
    for batch in batched(read_ats_line(), n):
        yield pa.record_batch(
            list(zip(*batch)),
            schema=ats_schema
        )

next(read_ats_batches(5))
pyarrow.RecordBatch
meridian: string
range: string
township: string
section: string
quarter_section: string
latitude: string
longitude: string
----
meridian: ["4","4","4","4","4"]
range: ["01","01","01","01","01"]
township: ["001","001","001","001","001"]
section: ["01","01","01","01","01"]
quarter_section: ["E4","N4","NE","P1","P2"]
latitude: ["49.00701238","49.01415809","49.01424023","48.99961020","48.99960948"]
longitude: ["110.00507933","110.01615511","110.00508075","110.00087106","110.00507790"]

Finally, we need to wire that into SedonaDB. We can do that with RecordBatchReader.from_batches(), which wraps the iterable in something that implements the Arrow C Stream interface. From there we can go straight to sd.create_data_frame() and write our lovely clean GeoParquet file:

ats_reader = pa.RecordBatchReader.from_batches(ats_schema, read_ats_batches())
df = sd.create_data_frame(ats_reader)
df.to_view("ats", overwrite=True)
sd.sql("""
SELECT
    meridian, range, township, section, quarter_section,
    ST_SetSRID(ST_Point(latitude::DOUBLE, longitude::DOUBLE), 4326) as geometry
FROM ats
""").to_parquet("data/ats.parquet")
sd.read_parquet("data/ats.parquet").schema
SedonaSchema with 6 fields:
  meridian: utf8<Utf8View>
  range: utf8<Utf8View>
  township: utf8<Utf8View>
  section: utf8<Utf8View>
  quarter_section: utf8<Utf8View>
  geometry: geometry<WkbView(ogc:crs84)>
sd.read_parquet("data/ats.parquet").show(5)
┌──────────┬───────┬──────────┬─────────┬─────────────────┬─────────────────────────────────┐
│ meridian ┆ range ┆ township ┆ section ┆ quarter_section ┆             geometry            │
│   utf8   ┆  utf8 ┆   utf8   ┆   utf8  ┆       utf8      ┆             geometry            │
╞══════════╪═══════╪══════════╪═════════╪═════════════════╪═════════════════════════════════╡
│ 4        ┆ 01    ┆ 001      ┆ 01      ┆ E4              ┆ POINT(49.00701238 110.00507933) │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4        ┆ 01    ┆ 001      ┆ 01      ┆ N4              ┆ POINT(49.01415809 110.01615511) │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4        ┆ 01    ┆ 001      ┆ 01      ┆ NE              ┆ POINT(49.01424023 110.00508075) │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4        ┆ 01    ┆ 001      ┆ 01      ┆ P1              ┆ POINT(48.9996102 110.00087106)  │
├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 4        ┆ 01    ┆ 001      ┆ 01      ┆ P2              ┆ POINT(48.99960948 110.0050779)  │
└──────────┴───────┴──────────┴─────────┴─────────────────┴─────────────────────────────────┘

Saskatchewan

Saskatchewan distributes its data as a paid offering, and while it does give a single shapefile, that wouldn’t be any fun. It also gives a very strangely formatted .csv file (with a .lst extension, which I can only assume was done on purpose to make us work a little harder).

with open("data/SaskGrid.lst") as f:
    print(f.readline(), end="")
    print(f.readline(), end="")
      PPID,  ,               ,CORNER,SECTION,TRM        ,SMNUMBER,  0.00,   0.00,13
0400001DJN,  ,               , NQ,22 ,T92R20W3M  ,73820611,249213.14,6325133.25,13

This CSV-except-not-CSV-because-it-ends-with-lst file warms the cockles of my heart because of its utter ineffectiveness: there are two columns named 0.00, two blank columns, and an entire column whose only content is 13 (whose name is, aptly, 13). While SedonaDB can import CSVs using DataFusion’s great CSV reader, we haven’t exposed all the options yet, and DuckDB is truly fantastic at guessing CSV read options from terribly formatted files.

import duckdb

duckdb.read_csv("data/SaskGrid.lst", header=True).limit(5)
┌────────────┬─────────┬─────────────────┬─────────┬─────────┬─────────────┬──────────┬───────────┬────────────┬───────┐
│    PPID    │ column1 │     column2     │ CORNER  │ SECTION │     TRM     │ SMNUMBER │   0.00    │   0.00_1   │  13   │
│  varchar   │ varchar │     varchar     │ varchar │ varchar │   varchar   │ varchar  │  double   │   double   │ int64 │
├────────────┼─────────┼─────────────────┼─────────┼─────────┼─────────────┼──────────┼───────────┼────────────┼───────┤
│ 0400001DJN │         │                 │  NQ     │ 22      │ T92R20W3M   │ 73820611 │ 249213.14 │ 6325133.25 │    13 │
│ 0400001DKB │         │                 │  NW     │ 06      │ T52R20W2M   │ 32921301 │ 503087.15 │ 5924243.64 │    13 │
│ 0400001DPC │         │                 │  WQ     │ 14      │ T27R14W3M   │ 65070913 │ 299361.37 │ 5687924.65 │    13 │
│ 0400001DRX │         │                 │  WQ     │ 28      │ T06R29W3M   │ 85110407 │  148006.8 │  5494624.0 │    13 │
│ 0400001DSH │         │                 │  SQ     │ 02      │ T44R21W3M   │ 74691514 │ 233938.26 │ 5852310.45 │    13 │
└────────────┴─────────┴─────────────────┴─────────┴─────────┴─────────────┴──────────┴───────────┴────────────┴───────┘

If our coordinates were only longitude/latitude we could use DuckDB spatial to write GeoParquet here. Unfortunately, they are not, and so we’ll use SedonaDB’s CRS support to assign the correct CRS (UTM Zone 13N, NAD83) and write it out.

The key here is the use of arrow(): this returns an object implementing __arrow_c_stream__(), which is something SedonaDB knows how to scan:

sask_grid_csv = duckdb.read_csv("data/SaskGrid.lst", header=True).to_view("sask_grid")
sask_grid_reader = duckdb.sql("""
SELECT TRM AS trm, SECTION AS section, CORNER as corner, "0.00" as x, "0.00_1" as y
FROM sask_grid
""").arrow()

sd.create_data_frame(sask_grid_reader).to_view("sask_grid", overwrite=True)
sd.sql("""
SELECT
    trm, section, corner,
    ST_SetSRID(ST_Point(x, y), 26913) as geometry
FROM sask_grid
""").to_parquet("data/SaskGrid.parquet")

We can use the fantastic lonboard to check that our data are correctly aligned with the right CRS:

import lonboard

lonboard.viz(sd.read_parquet("data/SaskGrid.parquet"))

Manitoba

Finally, Manitoba! Manitoba has the cleanest data of the three in the sense that it’s all shapefiles, although there are still two issues to work around with this particular data:

  • There are a whack of shapefiles
  • The CRS is not actually set on (most of) the shapefiles. From the two that are set, we know it is UTM Zone 14N, NAD83.

import glob

mb_shp = list(sorted(glob.glob("data/mb/**/*DLSPoly.shp")))
mb_shp[:5]
['data/mb/maptile01_shp/maptile1_DLSPoly.shp',
 'data/mb/maptile02_shp/maptile2_DLSPoly.shp',
 'data/mb/maptile03_shp/maptile3_DLSPoly.shp',
 'data/mb/maptile04_shp/maptile4_DLSPoly.shp',
 'data/mb/maptile05_shp/maptile5_DLSPoly.shp']
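
(As a quick check on the CRS point above, pyogrio.read_info() reports each file’s declared CRS. A short sketch that only relies on the "crs" key of its output:)

from collections import Counter

import pyogrio

# Tally the declared CRS across the DLSPoly shapefiles; most should come back as None
Counter(pyogrio.read_info(f)["crs"] for f in mb_shp)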

SedonaDB will soon support doing this kind of thing directly, but for now we’ll take a more hands-on approach, using the fact that pyogrio exposes GDAL/OGR’s Arrow C Stream support.

import pyogrio

with pyogrio.raw.ogr_open_arrow(mb_shp[0], {}) as info:
    meta, reader = info
    print(meta)
    sd.create_data_frame(reader).show(5)
{'crs': None, 'encoding': 'ISO-8859-1', 'fields': array(['PIN', 'QS', 'SEC', 'TWP', 'RGE', 'MER', 'AREA'], dtype=object), 'geometry_type': 'Polygon', 'geometry_name': '', 'fid_column': 'OGC_FID'}
┌───────────┬──────┬───────┬───────┬──────┬──────┬───────────┬─────────────────────────────────────┐
│    PIN    ┆  QS  ┆  SEC  ┆  TWP  ┆  RGE ┆  MER ┆    AREA   ┆             wkb_geometry            │
│    utf8   ┆ utf8 ┆ int32 ┆ int32 ┆ utf8 ┆ utf8 ┆  float64  ┆               geometry              │
╞═══════════╪══════╪═══════╪═══════╪══════╪══════╪═══════════╪═════════════════════════════════════╡
│ 094180241 ┆ NW   ┆    36 ┆   123 ┆ 10   ┆ E1   ┆ 64.925287 ┆ POLYGON((682797.48 6625603.65,6827… │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 094180252 ┆ NW   ┆    35 ┆   123 ┆ 10   ┆ E1   ┆ 64.923891 ┆ POLYGON((681165.35 6625525.0200000… │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 094180263 ┆ NW   ┆    34 ┆   123 ┆ 10   ┆ E1   ┆ 64.923761 ┆ POLYGON((679533.23 6625446.37,6794… │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 094180274 ┆ NW   ┆    33 ┆   123 ┆ 10   ┆ E1   ┆ 64.922019 ┆ POLYGON((677901.13 6625367.71,6778… │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 094180285 ┆ NW   ┆    32 ┆   123 ┆ 10   ┆ E1   ┆ 64.921044 ┆ POLYGON((676269.03 6625289.05,6762… │
└───────────┴──────┴───────┴───────┴──────┴──────┴───────────┴─────────────────────────────────────┘

def mb_shp_batches(columns=None):
    for f in mb_shp:
        with pyogrio.raw.ogr_open_arrow(f, {}, columns=columns) as info:
            _, reader = info
            yield from pa.RecordBatchReader.from_stream(reader)

cols = ["QS", "SEC", "TWP", "RGE", "MER"]
mb_shp_schema = next(mb_shp_batches(cols)).schema
mb_shp_reader = pa.RecordBatchReader.from_batches(mb_shp_schema, mb_shp_batches(cols))
sd.create_data_frame(mb_shp_reader).to_view("mb_shp", overwrite=True)
sd.sql("""
SELECT
    "QS" as quarter,
    "SEC"::TEXT AS section,
    "TWP"::TEXT AS twp,
    "RGE" as range,
    "MER" as meridian,
    ST_SetSRID(wkb_geometry, 26914) as geometry
FROM mb_shp
""").to_parquet("data/mb_shp.parquet")

Finally, let’s make sure it shows up in the right spot (i.e., make sure the CRS of the Parquet file is correct):

lonboard.viz(sd.read_parquet("data/mb_shp.parquet"))

Unfortunately it looks like there’s one shapefile that was incorrectly assigned a CRS. At least we know! I’ll leave that as a battle for a different day.
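
(If I were going to hunt it down, I’d probably start by printing each file’s total bounds and eyeballing whichever one doesn’t look like UTM Zone 14N metres. A rough sketch, assuming pyogrio.read_bounds(), which returns per-feature bounds:)

for f in mb_shp:
    # bounds has shape (4, n): xmin, ymin, xmax, ymax for each feature
    _, bounds = pyogrio.read_bounds(f)
    print(f, bounds[0].min(), bounds[1].min(), bounds[2].max(), bounds[3].max())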

The end of the story

I’ve made my thousands of dollars on this, and the data products/cell coverage have improved enough since I did this in 2013 that it’s not really necessary anymore, so I’ll share how I compressed all of this into a few MB, because I think it’s cool and I was rather proud of it at the time.

Basically, I exploited the fact that the theoretical Dominion Land Survey plan was incredibly organized: every “township” (identified by a “meridian”, a “range”, and a township number) was very close to a rectangle in longitude/latitude space. Each one of those was split into 36 sections, and every one of those 36 sections was split into 4 quarter sections (or 15 legal land description leaf nodes, whose name I forget).

I stored the township rectangles in a SQLite database that I shipped alongside an Android (3.1!) application and computed everything else on the fly. Of course, there were a few places where, in the pitting of the colonial British desire to split everything into a rectangle against the colonial British desire to avoid getting one’s clothes muddy, the surveyors opted to make irregularly shaped sections to avoid crossing a river/lake/mud pit. I found all the exceptions and stored those explicitly in my magical SQLite database.
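
If you’re curious what “computed everything else on the fly” means, here’s a minimal sketch of the regular case. It assumes a perfectly rectangular township and ignores road allowances and all of those stored exceptions:

# Quarter-section centres as fractions of a section's width/height,
# measured from the section's southwest corner.
QUARTER_OFFSETS = {"SW": (0.25, 0.25), "SE": (0.75, 0.25),
                   "NW": (0.25, 0.75), "NE": (0.75, 0.75)}

def quarter_section_centre(twp_bounds, section, quarter):
    """Interpolate a quarter-section centre from a township's lon/lat rectangle.

    DLS sections are numbered 1-36 in a serpentine pattern starting at the
    southeast corner of the township (section 1 is the SE square, section 36
    the NE square).
    """
    lon_min, lat_min, lon_max, lat_max = twp_bounds
    row = (section - 1) // 6                # 0 = southernmost row of sections
    pos = (section - 1) % 6
    col = 5 - pos if row % 2 == 0 else pos  # even rows count east to west
    qx, qy = QUARTER_OFFSETS[quarter]
    lon = lon_min + (col + qx) * (lon_max - lon_min) / 6
    lat = lat_min + (row + qy) * (lat_max - lat_min) / 6
    return lon, lat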

Reflections

In this post we went over how to use SedonaDB’s Arrow support to combine tools and stream data from one tool to another, in this case to support a data ingest pipeline. The streaming here is magic…these files aren’t that big, but this technique scales to larger-than-memory data. It should also be noted that other tools can do this too…DuckDB in particular has great Arrow interop…you can stream data from SedonaDB to DuckDB just as you can the other way around.
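
For instance, assuming the SedonaDB DataFrame also exposes __arrow_c_stream__() as a producer (an assumption on my part), the reverse direction would look something like this sketch:

import duckdb
import pyarrow as pa

# Hypothetical reverse trip: stream a SedonaDB result into DuckDB
ats_df = sd.read_parquet("data/ats.parquet")
ats_stream = pa.RecordBatchReader.from_stream(ats_df)  # wraps any Arrow C stream producer
duckdb.from_arrow(ats_stream).limit(5)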

More SedonaDB posts to come…stay tuned!

Software Engineer & Geoscientist