Apache...Sedona, you say?

One of the most fun parts of being involved in GeoParquet is that I get to hear about all the amazing tools that people are writing to scale geospatial analysis above and beyond what the stack I’m familiar with can handle. GeoPandas (Python) and sf (R) are great tools, but they struggle to scale as you approach the available memory on your laptop. On the other hand, Arrow, DuckDB, Polars, DataFusion, and the other newfangled databases struggle to efficiently handle spatial joins (although perhaps by the time I’m done writing this post, or by the time you are reading it, this will have changed!).

Apache Sedona sits at the intersection of these technologies, joining PostGIS as a go-to tool to reach for when data gets big and you need the streaming capabilities of a database alongside index-assisted, spatially aware joins. Instead of extending PostgreSQL, Sedona extends Spark and has embraced newer/faster file formats like GeoParquet.

If you are familiar with Spark, using Sedona is probably second nature to you. If you are a GeoPandas/sf user and you’ve never ever used Spark (read: me!), you’ll probably need a whole blog post to get you started. This post is my adventure getting started, although it is worth first noting that Sedona has lots of great content here, and this post drew heavily on its GeoPandas tutorial and its documentation on the Docker setup.

Getting started

Managing Java and/or PySpark versions locally can be tricky, and I tried and failed miserably at this the first time; however, Matthew Powers introduced me to SDKMAN!, which makes the process considerably easier. I’ll give the SDKMAN! instructions, but if you already know things about Java, it’s basically a way to ensure that JAVA_HOME is set everywhere and to make it easier to install the right version. If at any point this doesn’t work (or you don’t feel like messing with your existing Java installation), just skip to the docker run command below.

From a terminal (after following the SDKMAN! install instructions):

# If you have strong opinions about Java versions, feel free to use your own;
# anything from Java 8 up to Java 17 should work.
sdk install java 17.0.13-zulu

After running this you should see:

java -version
#> openjdk version "17.0.13" 2024-10-15 LTS
#> OpenJDK Runtime Environment Zulu17.54+21-CA (build 17.0.13+11-LTS)
#> OpenJDK 64-Bit Server VM Zulu17.54+21-CA (build 17.0.13+11-LTS, mixed mode, sharing)
echo $JAVA_HOME
#> /Users/dewey/.sdkman/candidates/java/current

Then you can install the Python packages:

pip install "apache-sedona[spark]" geopandas setuptools

Then, you can set up the session:

from sedona.spark import SedonaContext

config = (
    SedonaContext.builder()
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-3.5_2.12:1.6.1,"
        "org.datasyslab:geotools-wrapper:1.7.0-28.5",
    )
    .config(
        "spark.jars.repositories",
        "https://artifacts.unidata.ucar.edu/repository/unidata-all",
    )
    .config("spark.executor.memory", "6G")
    .config("spark.driver.memory", "6G")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)

sedona = SedonaContext.create(config)

A brief note that on my computer I needed to remove the SPARK_HOME environment variable with del os.environ["SPARK_HOME"] before running the above, since SDKMAN! “helpfully” sets SPARK_HOME if you’ve tried to install spark using it before, which I had. Something else that worked for me before I settled on letting SDKMAN! manage my Java was setting os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17" (or a path to another Java 8, 11, or 17 installation).
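
For reference, here’s that environment-variable shuffle as code (a small sketch; the Homebrew path is just what it happens to be on my machine):

import os

# Only needed if SDKMAN! (or anything else) has already set SPARK_HOME;
# the PySpark bundled with apache-sedona[spark] doesn't want it.
os.environ.pop("SPARK_HOME", None)

# Alternative to letting SDKMAN! manage Java: point JAVA_HOME at an existing
# Java 8, 11, or 17 installation before building the session.
# os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"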

If any of the above didn’t work, you can use the Docker image (this is what I did the first time, when I realized there was something about Java on my machine that wasn’t configured correctly and I had no idea how to fix it). Sedona provides an image to get this set up quickly:

# DRIVER_MEM/EXECUTOR_MEM: set these higher or lower to give workers access to
# more memory, but make sure they're within the limits you've set globally for
# Docker. The -p flags forward the ports the notebook interface needs to do its
# thing, and --rm keeps the container from accidentally sticking around after
# you're done.
docker run \
    -e DRIVER_MEM=4G -e EXECUTOR_MEM=4G \
    -p 8888:8888 -p 8080:8080 -p 8081:8081 -p 4040:4040 \
    --rm \
    apache/sedona:latest

You can use the stock notebook interface in your browser by visiting http://127.0.0.1:8888, or you can connect to the server directly from VSCode by choosing “Connect to existing Jupyter server”. I quite like this method because the autocomplete is nicer, and since I don’t have much experience with Sedona or PySpark, I will definitely be needing it.

Then you can set up the session. This is slightly different than for the localhost version because the Docker image sets some of these configuration options for you.

from sedona.spark import SedonaContext

config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

The next thing I’ll do is load some example data. These sample files are the Nova Scotia river network and Nova Scotia river basin divisions from the Nova Scotia Geospatial Data Directory (as cleaned for the GeoArrow Sample Data page). I’m loading them from a URL directly with geopandas; however, you could also download them using curl and unzip them to make the load faster.

The next step is to load the files and get them into Spark/Sedona! The water_line layer is actually quite large (a few hundred MB), so this takes a hot second to get into Sedona (presumably getting uploaded into the Spark cluster); for me locally it was about 20 seconds. Sedona can read some types of files (notably GeoParquet) directly such that this step can usually be avoided (there’s a sketch of that after the next code block). Still, for playing around with data, nothing beats geopandas.read_file() for format support.

import geopandas

rivers_pandas = geopandas.read_file(
    "https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-water_line.fgb.zip"
)
basins_pandas = geopandas.read_file(
    "https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-basin_poly.fgb.zip"
)
rivers = sedona.createDataFrame(rivers_pandas)
basins = sedona.createDataFrame(basins_pandas)
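
As an aside, here’s roughly what the direct GeoParquet route looks like, skipping the GeoPandas round trip entirely. This is a sketch: the path is a hypothetical local GeoParquet copy of the same layer, and the "geoparquet" data source comes from the Sedona extensions registered by SedonaContext.create() above:

# Hypothetical local GeoParquet copy of the water_line layer
rivers_direct = sedona.read.format("geoparquet").load("ns-water-water_line.parquet")
rivers_direct.printSchema()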

PySpark?

Here, rivers and basins are now PySpark DataFrames. There’s a pandas-ish API and a SQL API, although for me this doesn’t help much since most of my data frame manipulation experience is with R and the tidyverse. I’ll give a quick tour of some basic things you can do once you have a PySpark data frame (mostly for my own benefit!).

The first thing to know about these data frames is that they’re lazy. If you’d like, you can call collect() to bring the whole thing into your Python session, but while you’re still experimenting you probably just want to show() a few rows:

basins.show(5)
+--------+---------+----------+--------------------+--------------------+------------------+----------------+--------------------+
|OBJECTID|FEAT_CODE|BASIN_NAME|               RIVER|                 HID|        SHAPE_AREA|       SHAPE_LEN|            geometry|
+--------+---------+----------+--------------------+--------------------+------------------+----------------+--------------------+
|       2|   WABA40|   01EC000|ROSEWAY/SABLE/JORDAN|BD2BF4F2DD3441DD9...|1.85634481728983E9| 281285.37639417|MULTIPOLYGON (((3...|
|       3|   WABA40|   01EQ000|      NEW HBR/SALMON|BF024228AD004530A...|4.45876681698229E9|271610.959870275|MULTIPOLYGON (((7...|
|       1|   WABA40|   01EB000|    BARRINGTON/CLYDE|070600BADC3E46F59...|4.52992670427213E9|450208.188931839|MULTIPOLYGON (((2...|
|       5|   WABA40|   01DA000|            METEGHAN|F7A451591ECF4014A...|1.36580883089273E9| 206783.65380403|MULTIPOLYGON (((2...|
|       6|   WABA40|   01EL000|             TANGIER|D5D6AB000BFE4A6A9...|2.36304937003734E9|263862.127520653|MULTIPOLYGON (((5...|
+--------+---------+----------+--------------------+--------------------+------------------+----------------+--------------------+
only showing top 5 rows
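
For completeness, collect() is the eager counterpart: it hands you back a plain Python list of Row objects (a small sketch; don’t do this on the full rivers table unless you mean it):

# Pull a handful of rows back into the driver as Row objects
some_basins = basins.limit(3).collect()
print(some_basins[0].RIVER)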

You can select() to get fewer columns:

basins.select("RIVER", "geometry").show(5)
+--------------------+--------------------+
|               RIVER|            geometry|
+--------------------+--------------------+
|ROSEWAY/SABLE/JORDAN|MULTIPOLYGON (((3...|
|      NEW HBR/SALMON|MULTIPOLYGON (((7...|
|    BARRINGTON/CLYDE|MULTIPOLYGON (((2...|
|            METEGHAN|MULTIPOLYGON (((2...|
|             TANGIER|MULTIPOLYGON (((5...|
+--------------------+--------------------+
only showing top 5 rows

…and you can rename them:

basins.select(basins.RIVER.alias("river"), "geometry").show(5)
+--------------------+--------------------+
|               river|            geometry|
+--------------------+--------------------+
|ROSEWAY/SABLE/JORDAN|MULTIPOLYGON (((3...|
|      NEW HBR/SALMON|MULTIPOLYGON (((7...|
|    BARRINGTON/CLYDE|MULTIPOLYGON (((2...|
|            METEGHAN|MULTIPOLYGON (((2...|
|             TANGIER|MULTIPOLYGON (((5...|
+--------------------+--------------------+
only showing top 5 rows

To show more information about the columns that you’re dealing with, you can specifically print the schema:

basins.printSchema()
root
 |-- OBJECTID: long (nullable = true)
 |-- FEAT_CODE: string (nullable = true)
 |-- BASIN_NAME: string (nullable = true)
 |-- RIVER: string (nullable = true)
 |-- HID: string (nullable = true)
 |-- SHAPE_AREA: double (nullable = true)
 |-- SHAPE_LEN: double (nullable = true)
 |-- geometry: geometry (nullable = true)

…and you can select fewer rows with filter():

basins.filter(basins.RIVER == "ANNAPOLIS").show(5)
+--------+---------+----------+---------+--------------------+-----------------+----------------+--------------------+
|OBJECTID|FEAT_CODE|BASIN_NAME|    RIVER|                 HID|       SHAPE_AREA|       SHAPE_LEN|            geometry|
+--------+---------+----------+---------+--------------------+-----------------+----------------+--------------------+
|      43|   WABA40|   01DC000|ANNAPOLIS|734C69BF41A3456D8...|5.0609438683547E9|431466.630572249|MULTIPOLYGON (((3...|
+--------+---------+----------+---------+--------------------+-----------------+----------------+--------------------+

To do something more complex, you have to use some compute functions from pyspark.sql.functions:

from pyspark.sql.functions import lower

basins.select(lower(basins.RIVER).alias("river")).show(5)
+--------------------+
|               river|
+--------------------+
|roseway/sable/jordan|
|      new hbr/salmon|
|    barrington/clyde|
|            meteghan|
|             tangier|
+--------------------+
only showing top 5 rows

…and, crucially for this post, it can do a join:

from pyspark.sql import Row

name_lookup = sedona.createDataFrame([
    Row(RIVER="ANNAPOLIS", not_shouting_name="Annapolis")
])
basins.\
    join(name_lookup, on=["RIVER"], how="inner").\
    select("RIVER", "not_shouting_name").\
    show()
+---------+-----------------+
|    RIVER|not_shouting_name|
+---------+-----------------+
|ANNAPOLIS|        Annapolis|
+---------+-----------------+

If you already know SQL you can do these things directly from SQL as well:

basins.createOrReplaceTempView("basins")
name_lookup.createOrReplaceTempView("name_lookup")
sedona.sql("""
    SELECT lower(basins.RIVER) as river, not_shouting_name
    FROM basins INNER JOIN name_lookup ON basins.RIVER = name_lookup.RIVER
""").show()
+---------+-----------------+
|    river|not_shouting_name|
+---------+-----------------+
|annapolis|        Annapolis|
+---------+-----------------+

It’s important to remember that all of these things are lazy and only get chained together when the result is requested. This means you can nicely separate your steps if it makes your code cleaner. For many complex pipelines this is a must, particularly in Python (which hasn’t found a nice way to replicate R’s pipe operator yet).

from pyspark.sql.functions import col

basins_with_less_shouting = basins.select(basins.RIVER.alias("river"))
basins_with_even_less_shouting = basins_with_less_shouting.select(
    lower(col("river")).alias("river")
)
basins_with_even_less_shouting.show(5)
+--------------------+
|               river|
+--------------------+
|roseway/sable/jordan|
|      new hbr/salmon|
|    barrington/clyde|
|            meteghan|
|             tangier|
+--------------------+
only showing top 5 rows

I thought your post was about Sedona

Yes, yes, but if you’re coming from R or GeoPandas land (like me!) and you want to scale up your workflow, those are all things that you needed to know first!

Sedona fits into all of this by adding the geometry type and all the functions you need to run your spatial workflows, none of which ship with Spark (which is running the show from Java on the backend) or pyspark (which is running the show from your notebook).

For example, maybe you only want to take a look at river segments longer than 100 meters:

from sedona.sql.st_functions import ST_Length

rivers\
    .select(ST_Length(col("geometry")).alias("length"), "geometry")\
    .filter(col("length") > 100)\
    .show(5)
25/01/08 16:51:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/01/08 16:51:36 WARN TaskSetManager: Stage 25 contains a task of very large size (46920 KiB). The maximum recommended task size is 1000 KiB.

+------------------+--------------------+
|            length|            geometry|
+------------------+--------------------+
|280.01913933401755|MULTILINESTRING (...|
|185.33062465065956|MULTILINESTRING (...|
|178.54414343193926|MULTILINESTRING (...|
| 1779.161974904653|MULTILINESTRING (...|
| 469.5825210527006|MULTILINESTRING (...|
+------------------+--------------------+
only showing top 5 rows



25/01/08 16:51:40 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 25 (TID 69): Attempting to kill Python Worker

Or, maybe you want to do that in SQL:

rivers.createOrReplaceTempView("rivers")
sedona.sql("""
    SELECT ST_Length(geometry) as length, geometry
    FROM rivers WHERE ST_Length(geometry) > 100.0
""").show(5)
25/01/08 16:51:45 WARN TaskSetManager: Stage 26 contains a task of very large size (46920 KiB). The maximum recommended task size is 1000 KiB.

+------------------+--------------------+
|            length|            geometry|
+------------------+--------------------+
|280.01913933401755|MULTILINESTRING (...|
|185.33062465065956|MULTILINESTRING (...|
|178.54414343193926|MULTILINESTRING (...|
| 1779.161974904653|MULTILINESTRING (...|
| 469.5825210527006|MULTILINESTRING (...|
+------------------+--------------------+
only showing top 5 rows

…and, of course, the spatial join, where Sedona really shines. The example I have here fits in memory easily, but as soon as you approach that limit you have to reach for tools designed to split up, stream, and spill to disk. Most databases can do that even for spatial data; however, Sedona can also optimize joins using a spatial index (PostGIS can do both of these things too, but Sedona can leverage Spark’s resources to scale more readily!).

sedona.sql("""
    SELECT basins.RIVER, rivers.geometry
    FROM basins, rivers
    WHERE ST_Contains(basins.geometry, rivers.geometry)
""").collect()

Because this all fits in memory nicely, we can use geopandas, which takes about the same amount of time, to assign each river segment a basin name:

import geopandas

geopandas.GeoDataFrame(rivers_pandas.filter(["geometry"]))\
    .sjoin(geopandas.GeoDataFrame(basins_pandas.filter(["RIVER", "geometry"])))\
    .head(5)

                                            geometry  index_right   RIVER
0  MULTILINESTRING ((328482.171 4837061.782, 3284...           31  MERSEY
1  MULTILINESTRING ((346529.973 4850302.284, 3465...           31  MERSEY
2  MULTILINESTRING ((346689.673 4850233.384, 3466...           31  MERSEY
3  MULTILINESTRING ((328730.671 4838192.482, 3287...           31  MERSEY
4  MULTILINESTRING ((328454.671 4835593.981, 3284...           31  MERSEY

Final thoughts

If you’re using Spark and know some things about how to configure and share data across nodes in a cluster, using and optimizing Sedona will be very familiar to you. If you’re a GeoPandas user looking to scale up, there will be a learning curve! My personal take is that if you want to scale a workflow, converting GeoPandas code to PySpark is a bit easier than converting it to SQL, but Sedona lets you do either, and it lets you take that workflow to a managed solution like Databricks, Wherobots, or Amazon EMR when data volume grows beyond the capabilities of your local system.
