Apache...Sedona, you say?

One of the most fun parts of being involved in GeoParquet is that I get to hear about all the amazing tools that people are writing to scale geospatial analysis above and beyond what the stack I’m familiar with can handle. GeoPandas (Python) and sf (R) are great tools but the struggle to scale as you approach the available memory on your laptop. On the other hand, Arrow, DuckDB, Polars, DataFusion, and the other new fangled databases struggle to efficiently handle spatial joins (although perhaps by the time I’m done writing this post or by the time you are reading it, DuckDB spatial will have solved this!).

Apache Sedona sits at the intersection of these technologies, joining PostGIS as a go-to tool to reach for as data gets big and you need the streaming capabilities of a database alongside index-assisted spatial-aware joins. Instead of extending PostgreSQL, Sedona extends Spark and has embraced newer/faster file formats like GeoParquet.

If you are familiar with Spark, using Sedona is probably second nature to you. If you are a GeoPandas/sf user and you’ve never ever used Spark (read: me!), you’ll probably need a whole blog post to get you started. This post is my adventure getting started, although it is worth first noting that Sedona has lots of great content here, and this post drew heavily on its GeoPandas tutorial and its documentation on the Docker setup.

Getting started

I did try breifly to pip install apache-sedona, but quickly went for the docker approach when I realized there was a thing about Java on my machine that wasn’t configured corretly. Luckily, Seonda makes this easy!

docker run \
    # This is a rather small configuration, but is more likely to succeed
    # than the default on a normal Docker configuration on a laptop
    -e DRIVER_MEM=2G -e EXECUTOR_MEM=2G \
    # Forward the ports needed for the notebook interface to do its thing
    -p 8888:8888 -p 8080:8080 -p 8081:8081 -p 4040:4040 \
    # Keeps the container from accidentally sticking around after you're done
    --rm \
    # The image we're after!
    apache/sedona:latest

You can run a stock browser notebook by visiting https://127.0.0.1:8888, or you can connect to the server directly from VSCode by choosing “Connect to existing Jupyter server”. I quite like this method because the autocomplete is nicer, and since I don’t have much experience with sedona or PySpark, I will definitely be needing it.

The next step is to make sure you can execute something Sedona-y in the notebook!

from sedona.spark import *

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/01 19:28:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/01 19:28:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

The next thing I’ll do is download some example data into the container. You could persist this data by mounting a volume when you run the container as well (i.e. docker run -v path/to/local/sample-data:/sample-data ...). Because we’re accessing the container via the notebook interface, we’ll use the ! trick in a Python cell. While we’re at it, we’ll install pyogrio since the stock version of geopandas in the container is old and uses the much slower fiona package for IO.

! pip install pyogrio
! mkdir /sample-data
! curl -L https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-water_line.fgb.zip > /sample-data/ns-water-water_line.fgb.zip
! curl -L https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-basin_poly.fgb.zip > /sample-data/ns-water-basin_poly.fgb.zip

These sample files are the Nova Scotia river network and Nova Scotia river basin divisions from the Nova Scotia Geospatial Data Directory (as cleaned for the GeoArrow Sample Data page).

The next step is to load the files and get them into Spark/Sedona! The water_line layer is actually quite large (a few hundred MB), so this takes a while to get into Sedona (presumably getting uploaded into the Spark cluster). For me locally this was about 40 seconds.

import pyogrio

rivers_pandas = pyogrio.read_dataframe("/sample-data/ns-water-water_line.fgb.zip")
basins_pandas = pyogrio.read_dataframe("/sample-data/ns-water-basin_poly.fgb.zip")

rivers = sedona.createDataFrame(rivers_pandas)
basins = sedona.createDataFrame(basins_pandas)

PySpark?

Here, rivers and basins are now PySpark DataFrames. There’s a pandas-ish API and a SQL API, although for me this doesn’t help much since most of my data frame manipulation experience is with R and the tidyverse. I’ll give quick tour of how to do some basic things that you can do once you have a PySpark data frame (mostly for my own benefit!).

The first thing to know about these data frames is that they’re lazy. If you’d like you can call collect() to bring the whole thing in to your Python session, but for the part where you’re still experimenting you probably just want to show() a few rows:

basins.show(5)
+--------+---------+----------+--------------------+--------------------+------------------+----------------+--------------------+
|OBJECTID|FEAT_CODE|BASIN_NAME|               RIVER|                 HID|        SHAPE_AREA|       SHAPE_LEN|            geometry|
+--------+---------+----------+--------------------+--------------------+------------------+----------------+--------------------+
|       2|   WABA40|   01EC000|ROSEWAY/SABLE/JORDAN|BD2BF4F2DD3441DD9...|1.85634481728983E9| 281285.37639417|MULTIPOLYGON (((3...|
|       3|   WABA40|   01EQ000|      NEW HBR/SALMON|BF024228AD004530A...|4.45876681698229E9|271610.959870275|MULTIPOLYGON (((7...|
|       1|   WABA40|   01EB000|    BARRINGTON/CLYDE|070600BADC3E46F59...|4.52992670427213E9|450208.188931839|MULTIPOLYGON (((2...|
|       5|   WABA40|   01DA000|            METEGHAN|F7A451591ECF4014A...|1.36580883089273E9| 206783.65380403|MULTIPOLYGON (((2...|
|       6|   WABA40|   01EL000|             TANGIER|D5D6AB000BFE4A6A9...|2.36304937003734E9|263862.127520653|MULTIPOLYGON (((5...|
+--------+---------+----------+--------------------+--------------------+------------------+----------------+--------------------+
only showing top 5 rows

You can select() to get fewer columns:

basins.select("RIVER", "geometry").show(5)
+--------------------+--------------------+
|               RIVER|            geometry|
+--------------------+--------------------+
|ROSEWAY/SABLE/JORDAN|MULTIPOLYGON (((3...|
|      NEW HBR/SALMON|MULTIPOLYGON (((7...|
|    BARRINGTON/CLYDE|MULTIPOLYGON (((2...|
|            METEGHAN|MULTIPOLYGON (((2...|
|             TANGIER|MULTIPOLYGON (((5...|
+--------------------+--------------------+
only showing top 5 rows

…and you can rename them:

basins.select(basins.RIVER.alias("river"), "geometry").show(5)
+--------------------+--------------------+
|               river|            geometry|
+--------------------+--------------------+
|ROSEWAY/SABLE/JORDAN|MULTIPOLYGON (((3...|
|      NEW HBR/SALMON|MULTIPOLYGON (((7...|
|    BARRINGTON/CLYDE|MULTIPOLYGON (((2...|
|            METEGHAN|MULTIPOLYGON (((2...|
|             TANGIER|MULTIPOLYGON (((5...|
+--------------------+--------------------+
only showing top 5 rows

To show more information about the columns that you’re dealing with, you can specifically print the schema:

basins.printSchema()
root
 |-- OBJECTID: long (nullable = true)
 |-- FEAT_CODE: string (nullable = true)
 |-- BASIN_NAME: string (nullable = true)
 |-- RIVER: string (nullable = true)
 |-- HID: string (nullable = true)
 |-- SHAPE_AREA: double (nullable = true)
 |-- SHAPE_LEN: double (nullable = true)
 |-- geometry: geometry (nullable = true)

..and you can select fewer rows with filter():

basins.filter(basins.RIVER == "ANNAPOLIS").show(5)
+--------+---------+----------+---------+--------------------+-----------------+----------------+--------------------+
|OBJECTID|FEAT_CODE|BASIN_NAME|    RIVER|                 HID|       SHAPE_AREA|       SHAPE_LEN|            geometry|
+--------+---------+----------+---------+--------------------+-----------------+----------------+--------------------+
|      43|   WABA40|   01DC000|ANNAPOLIS|734C69BF41A3456D8...|5.0609438683547E9|431466.630572249|MULTIPOLYGON (((3...|
+--------+---------+----------+---------+--------------------+-----------------+----------------+--------------------+

To do something more complex, you have to use some compute functions from pyspark.sql.functions:

from pyspark.sql.functions import lower

basins.select(lower(basins.RIVER).alias("river")).show(5)
+--------------------+
|               river|
+--------------------+
|roseway/sable/jordan|
|      new hbr/salmon|
|    barrington/clyde|
|            meteghan|
|             tangier|
+--------------------+
only showing top 5 rows

…and, crucially for this post, it can do a join:

from pyspark.sql import Row

name_lookup = sedona.createDataFrame([
    Row(RIVER="ANNAPOLIS", not_shouting_name = "Annapolis")
])
basins.\
    join(name_lookup, on=["RIVER"], how="inner").\
    select("RIVER", "not_shouting_name").\
    show()
+---------+-----------------+
|    RIVER|not_shouting_name|
+---------+-----------------+
|ANNAPOLIS|        Annapolis|
+---------+-----------------+

If you already know SQL you can do these things directly from SQL as well:

basins.createOrReplaceTempView("basins")
name_lookup.createOrReplaceTempView("name_lookup")
sedona.sql("""
    SELECT lower(basins.RIVER) as river, not_shouting_name
    FROM basins INNER JOIN name_lookup ON basins.RIVER = name_lookup.RIVER
""").show()
+---------+-----------------+
|    river|not_shouting_name|
+---------+-----------------+
|annapolis|        Annapolis|
+---------+-----------------+

It’s important to remember that all of these things are lazy and only get chained together when the result is requested. This means you can nicely separate your steps if it makes your code cleaner. For many complex pipelines this is a must, particularly in Python (which hasn’t found a nice way to do R’s Pipe operator yet).

from pyspark.sql.functions import col

basins_with_less_shouting = basins.select(basins.RIVER).alias("river")
basins_with_even_less_shouting = basins.select(lower(col("river")).alias("river"))
basins_with_even_less_shouting.show(5)
+--------------------+
|               river|
+--------------------+
|roseway/sable/jordan|
|      new hbr/salmon|
|    barrington/clyde|
|            meteghan|
|             tangier|
+--------------------+
only showing top 5 rows

I thought your post was about Sedona

Yes, yes, but if you’re coming from R or GeoPandas land (like me!) and you want to scale up your workflow, those are all things that you needed to know first!

Sedona fits in to all of this by adding the geometry type and all the functions you need to run your spatial workflows, which don’t ship with Spark (which is running the show from Java on the backend) or pyspark (which is running the show from your notebook).

For example, maybe you only want to take a look at river segments longer than 100 meters:

from sedona.sql.st_functions import ST_Length

rivers\
    .select(ST_Length(col("geometry")).alias("length"), "geometry")\
    .filter(col("length") > 100)\
    .show(5)
24/11/01 21:12:04 WARN TaskSetManager: Stage 64 contains a task of very large size (69889 KiB). The maximum recommended task size is 1000 KiB.
[Stage 64:>                                                         (0 + 1) / 1]

+------------------+--------------------+
|            length|            geometry|
+------------------+--------------------+
|280.01913933401755|MULTILINESTRING (...|
|185.33062465065956|MULTILINESTRING (...|
|178.54414343193926|MULTILINESTRING (...|
| 1779.161974904653|MULTILINESTRING (...|
| 469.5825210527006|MULTILINESTRING (...|
+------------------+--------------------+
only showing top 5 rows



24/11/01 21:12:08 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 64 (TID 146): Attempting to kill Python Worker

Or, maybe you want to do that in SQL:

rivers.createOrReplaceTempView("rivers")
sedona.sql("""
    SELECT ST_Length(geometry) as length, geometry
    FROM rivers WHERE ST_Length(geometry) > 100.0
""").show(5)
24/11/02 02:23:17 WARN TaskSetManager: Stage 66 contains a task of very large size (69889 KiB). The maximum recommended task size is 1000 KiB.
[Stage 66:>                                                         (0 + 1) / 1]

+------------------+--------------------+
|            length|            geometry|
+------------------+--------------------+
|280.01913933401755|MULTILINESTRING (...|
|185.33062465065956|MULTILINESTRING (...|
|178.54414343193926|MULTILINESTRING (...|
| 1779.161974904653|MULTILINESTRING (...|
| 469.5825210527006|MULTILINESTRING (...|
+------------------+--------------------+
only showing top 5 rows



24/11/02 02:23:21 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 66 (TID 148): Attempting to kill Python Worker

…and, of course, the spatial join, where Sedona really shines. The example I have here fits in memory easily, but as soon as you approach that limit you have to reach for tools designed to split up, stream, and spill to disk. Most databases can do that even for spatial data; however, Sedona has the ability to optimize joins using a spatial index (PostGIS can also do both these things, but Sedona has the ability to leverage Spark’s resources to scale more readily!).

sedona.sql("""
    SELECT basins.RIVER, rivers.geometry
    FROM basins, rivers
    WHERE ST_Contains(basins.geometry, rivers.geometry)
""").show(5)

In full transparency I couldn’t get this join to actually complete; however, I think this is a consequence of me being very bad at Spark and not necessarily a failure of Sedona to do its thing as the data scales beyond one’s laptop. Because this all fits in memory nicely, we can use geopandas, which takes 30 seconds or so to assign each river segment a basin name.

import geopandas

geopandas.GeoDataFrame(rivers_pandas.filter(["geometry"]))\
    .sjoin(geopandas.GeoDataFrame(basins_pandas.filter(["RIVER", "geometry"])))\
    .head(5)
[Stage 69:>   (0 + 6) / 8][Stage 74:>   (1 + 5) / 8][Stage 79:>   (0 + 4) / 8]

geometry index_right RIVER
0 MULTILINESTRING ((328482.171 4837061.782, 3284... 31 MERSEY
1 MULTILINESTRING ((346529.973 4850302.284, 3465... 31 MERSEY
2 MULTILINESTRING ((346689.673 4850233.384, 3466... 31 MERSEY
3 MULTILINESTRING ((328730.671 4838192.482, 3287... 31 MERSEY
4 MULTILINESTRING ((328454.671 4835593.981, 3284... 31 MERSEY
[Stage 69:>   (0 + 6) / 8][Stage 74:>   (1 + 5) / 8][Stage 79:>   (0 + 4) / 8]

Final thoughts

If you’re using Spark and know some things about how to configure and share data across nodes in a cluster, using and optimizing Sedona will be very familiar to you. If you’re a GeoPandas user looking to scale up, there will be a learning curve!

Software Engineer & Geoscientist