Profiling point-in-polygon joins in R

Inspired by Martin Fleischmann’s post on spatial joins using Dask and geopandas, I thought I’d give it a go in R! I’ve been playing with biggish point datasets for a while, and this particular bit of profiling seemed like a good opportunity to illustrate a few things to think about when dealing with them in the r-spatial universe.

Selfishly, I also want to see if I can make R beat the Dask-geopandas benchmark of 17 seconds (to do a spatial join between roughly 9 million points and 158 polygons).

I’ll use my usual bag of tools: the tidyverse, sf, and the s2 and geos packages (because we’ll need to do some lower-level stuff than is needed for most spatial operations).
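
Before anything else, a quick setup chunk so that the code below runs as written (I’m assuming these packages are already installed):

library(tidyverse)
library(sf)
library(geos)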

The data for this operation is a bunch of parking violations (as a 1 GB csv) and a bunch of neighbourhoods as defined by a smallish shapefile available from the City of Philadelphia. What we’re up against here is about 9 million rows in the csv and 158 polygons in the shapefile. The query is a common spatial one: join the polygons onto the points so that we know the neighbourhood in which each violation occurred.

curl::curl_download(
  "https://phl.carto.com/api/v2/sql?filename=parking_violations&format=csv&skipfields=cartodb_id,the_geom,the_geom_webmercator&q=SELECT%20*%20FROM%20parking_violations%20WHERE%20issue_datetime%20%3E=%20%272012-01-01%27%20AND%20issue_datetime%20%3C%20%272017-12-31%27",
  "phl_parking.csv"
)

curl::curl_download(
  "https://github.com/azavea/geo-data/raw/master/Neighborhoods_Philadelphia/Neighborhoods_Philadelphia.zip",
  "Neighborhoods_Philadelphia.zip"
)

unzip("Neighborhoods_Philadelphia.zip", exdir = ".")
read_csv("phl_parking.csv", lazy = TRUE)
#> Rows: 9412119 Columns: 13
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (5): state, division, location, violation_desc, issuing_agency
#> dbl  (6): anon_ticket_number, anon_plate_id, fine, lat, lon, zip_code
#> lgl  (1): gps
#> dttm (1): issue_datetime
#> 
#>  Use `spec()` to retrieve the full column specification for this data.
#>  Specify the column types or set `show_col_types = FALSE` to quiet this message.
#> # A tibble: 9,412,119 × 13
#>    anon_ticket_number issue_datetime      state anon_plate_id division location 
#>                 <dbl> <dttm>              <chr>         <dbl> <chr>    <chr>    
#>  1            1777797 2013-08-22 12:36:00 PA           878255 0001     212 N 12…
#>  2            1777798 2013-09-23 21:52:00 PA           878255 0001     900 RACE…
#>  3            1777799 2013-11-23 12:25:00 NJ           328900 0001     5448 GER…
#>  4            1777800 2013-01-18 22:56:00 NJ           244961 NA       400 SPRI…
#>  5            1777801 2013-05-17 12:11:00 PA           665834 0001     1000 FIL…
#>  6            1777802 2013-10-15 12:10:00 PA           847624 NA       1400 CAL…
#>  7            1777803 2013-02-06 12:44:00 NJ           201017 0001     130 ARCH…
#>  8            1777804 2013-09-28 08:48:00 NJ           201017 0001     1130 N 4…
#>  9            1777805 2013-10-03 07:09:00 NJ           201017 0001     1618 S B…
#> 10            1777806 2013-11-24 16:52:00 NJ           201017 0001     100 S 4T…
#> # … with 9,412,109 more rows, and 7 more variables: violation_desc <chr>,
#> #   fine <dbl>, issuing_agency <chr>, lat <dbl>, lon <dbl>, gps <lgl>,
#> #   zip_code <dbl>
read_sf("Neighborhoods_Philadelphia.shp")
#> Simple feature collection with 158 features and 5 fields
#> Geometry type: POLYGON
#> Dimension:     XY
#> Bounding box:  xmin: 2660586 ymin: 204650.6 xmax: 2750109 ymax: 304965.3
#> Projected CRS: NAD83 / Pennsylvania South (ftUS)
#> # A tibble: 158 × 6
#>    NAME   LISTNAME  MAPNAME  Shape_Leng Shape_Area                      geometry
#>    <chr>  <chr>     <chr>         <dbl>      <dbl>    <POLYGON [US_survey_foot]>
#>  1 BRIDE… Bridesbu… Bridesb…     27815.  44586264. ((2719790 256235.5, 2719815 …
#>  2 BUSTL… Bustleton Bustlet…     48868. 114050424. ((2733378 289259.9, 2732819 …
#>  3 CEDAR… Cedarbro… Cedarbr…     20021.  24871745. ((2685268 279747.3, 2685272 …
#>  4 CHEST… Chestnut… Chestnu…     56394.  79664975. ((2678490 284400.4, 2678519 …
#>  5 EAST_… East Fal… East Fa…     27401.  40576888. ((2686770 263625.4, 2686921 …
#>  6 MOUNT… Mount Ai… East Mo…     28846.  43152470. ((2687707 269073.7, 2687685 …
#>  7 GRAYS… Grays Fe… Grays F…     25522.  31311635. ((2684944 224379.1, 2685191 …
#>  8 OLNEY  Olney     Olney        32197.  50308403. ((2703709 270202.1, 2703936 …
#>  9 PENNY… Pennypac… Pennypa…     87084.  60140756. ((2722036 286198.8, 2721979 …
#> 10 SOMER… Somerton  Somerton     55055. 129254597. ((2737777 301540, 2737958 30…
#> # … with 148 more rows

Benchmarking the pieces

The first thing that’s probably going to be time-consuming is reading a 1 GB .csv file. readr::read_csv() is pretty good at this and uses multiple cores to do things like parse numbers. From experience working with other data sets, I’ve observed that materializing massive string columns (i.e., huge character() vectors) is expensive, which is one reason readr::read_csv() offers lazy = TRUE. Lazy reading is pretty useful when exploring a data set (like I did above); when you know you’re going to use all of the rows, it’s better to just materialize everything up front. If you’re careful to only read in non-string columns, you can avoid the string problem completely.

read_violations <- function(num_threads = readr_threads()) {
  read_csv(
    "phl_parking.csv",
    col_types = cols(
      anon_ticket_number = col_double(),
      lon = col_double(),
      lat = col_double(),
      .default = col_skip()
    ),
    num_threads = num_threads
  )
}

read_neighbourhoods <- function() {
  read_sf("Neighborhoods_Philadelphia.shp") %>% 
  select(NAME) %>% 
  st_transform("OGC:CRS84")
}

Let’s see what the read times are like!

bench::mark(read_violations(), iterations = 5)
#> # A tibble: 1 × 6
#>   expression             min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>        <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 read_violations()    1.46s    1.46s     0.684     215MB     3.42
bench::mark(read_neighbourhoods(), iterations = 5)
#> # A tibble: 1 × 6
#>   expression                 min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>            <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 read_neighbourhoods()   13.7ms   13.9ms      69.8    2.09MB     17.4

It looks like the 1 GB csv can be read in about 1.5 seconds as long as we’re careful not to load any string columns; the neighbourhood shapefile is pretty much instantaneous to read. For the purposes of benchmarking the other steps in isolation, I’ll make a cached version of both.

cached_violations <- read_violations()
cached_neighbourhoods <- read_neighbourhoods()

Once we’re here, it’s worth noting that the built-in way to do the big-picture operation in sf would be:

cached_violations_sf <- cached_violations %>% 
  st_as_sf(coords = c("lon", "lat"), crs = "OGC:CRS84")

st_join(cached_violations_sf, cached_neighbourhoods, join = st_within)

I had to cancel this one after a minute or so (since the point was to see if I could do it faster than 17 seconds), but it’s likely that there’s a way to do this in sf that I don’t happen to know about that avoids some of the overhead of working with this many points.
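
If you just want a ballpark for the built-in approach without waiting for it to finish, one option is to time a sample and extrapolate. A rough sketch (assuming the cost scales roughly linearly with the number of points):

# time st_join() on a 100,000-point sample and scale up mentally
violations_sample <- cached_violations_sf[sample(nrow(cached_violations_sf), 1e5), ]
system.time(
  st_join(violations_sample, cached_neighbourhoods, join = st_within)
)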

Under the hood, sf is calling into GEOS or s2 to generate a set of “join keys”, or the indices on y that correspond to x according to a specific predicate. We can access that directly via the ‘geos’ and ‘s2’ packages. For both, there’s a cost of converting the xy list into a GEOS or s2 geometry, a cost of building the index, and a cost of querying that index to generate the join keys.

system.time(
  violations_geos <- 
    geos_read_xy(cached_violations[c("lon", "lat")])
)
#>    user  system elapsed 
#>   4.241   0.327   4.582

system.time(
  neighbourhoods_geos <- as_geos_geometry(cached_neighbourhoods)
)
#>    user  system elapsed 
#>   0.001   0.000   0.002

# avoid CRS mismatch errors below
wk::wk_crs(neighbourhoods_geos) <- NULL

system.time(
  tree <- geos_strtree(violations_geos)
)
#>    user  system elapsed 
#>   0.443   0.346   0.997

system.time(
  keys <- geos_contains_matrix(neighbourhoods_geos, tree)
)
#>    user  system elapsed 
#>   7.386   0.061   7.448

You’ll notice that I used “contains” here, because I guessed off the bat that an index on 9 million points would be more useful than an index on 158 polygons (and ‘contains’ is the inverse operation to ‘within’). I don’t actually know that to be the case, though, so it’s worth checking the inverse of that operation:

system.time(
  tree_neighbourhoods <- geos_strtree(neighbourhoods_geos)
)
#>    user  system elapsed 
#>       0       0       0

system.time(
  keys2 <- geos_within_matrix(violations_geos, tree_neighbourhoods)
)
#>    user  system elapsed 
#> 135.727   0.931 137.040

In this case, it’s a pretty massive difference! I’m not 100% sure where all that difference comes from, but it does make sense that an index on a massive number of points will be re-used more efficiently than one on a few polygons.
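
If you want to convince yourself that the two orderings really are computing the same thing, here’s a quick sketch of a check that they describe the same set of point/polygon pairs (contains(A, B) is exactly within(B, A)):

pairs_contains <- tibble(
  neighbourhood = rep(seq_along(keys), lengths(keys)),
  violation = unlist(keys)
)

pairs_within <- tibble(
  violation = rep(seq_along(keys2), lengths(keys2)),
  neighbourhood = unlist(keys2)
)

# both of these should report zero rows of disagreement
nrow(anti_join(pairs_contains, pairs_within, by = c("neighbourhood", "violation")))
nrow(anti_join(pairs_within, pairs_contains, by = c("neighbourhood", "violation")))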

Finally, we need to materialize the joined version. Here the “join keys” are a list() of integer vectors: for every neighbourhood, the indices of the violations that are a match. We have to roll our own joiner here, but it’s not too bad! We need to repeat the neighbourhoods the right number of times, then use the keys to select the violations. In theory those two vectors should be aligned, so we can cbind() them together.

system.time({
  neighbourhoods_out <- vctrs::vec_rep_each(
    cached_neighbourhoods$NAME,
    lengths(keys)
  )
  
  violations_out <- cached_violations[unlist(keys), ]
  
  joined <- vctrs::vec_cbind(violations_out, NAME = neighbourhoods_out)
})
#>    user  system elapsed 
#>   0.242   0.128   0.412

At this point we need to make sure we didn’t cross our wires. Are they assigned correctly?

cached_neighbourhoods %>% 
  filter(NAME == "BRIDESBURG") %>% 
  .$geometry %>% 
  plot()

joined %>% 
  filter(NAME == "BRIDESBURG") %>% 
  st_as_sf(coords = c("lon", "lat"), crs = "OGC:CRS84") %>% 
  .$geometry %>% 
  plot(add = TRUE)

I mentioned s2 above, and it’s relevant here because we have lon/lat data, which, in theory, s2 is really good at! I’m hoping to fix a few things about s2 and how it deals with points, but as it stands the cost of creating an “s2 geography” for points is pretty high, and the cost of doing a boolean operation on points is really high. The forthcoming release of s2 will expose some of the s2 cell index bits at a low level, notably the operation to approximate a polygon with a “covering” of cells and to check whether or not an individual cell is within it. It still doesn’t expose the GEOS-like R-tree index, which is mission critical here (it probably should!).
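
For reference, the s2 equivalent of the GEOS pipeline above looks roughly like this. I haven’t timed it here; the point is that both the geography conversion and the predicate itself are the slow parts for this many points (this is a sketch, assuming conversion via s2_lnglat()):

violations_s2 <- s2::as_s2_geography(
  s2::s2_lnglat(cached_violations$lon, cached_violations$lat)
)
neighbourhoods_s2 <- s2::as_s2_geography(cached_neighbourhoods$geometry)

# same shape of result as geos_contains_matrix(): one vector of point
# indices per neighbourhood
keys_s2 <- s2::s2_contains_matrix(neighbourhoods_s2, violations_s2)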

The whole game

Our target is 17 seconds, which is what it took dask/geopandas to do this operation on 8 cores (although note that we are also using all 8 cores for the read step here, because readr does that under the hood, but not for the join).

system.time({
  neighbourhoods <- read_neighbourhoods()
  violations <- read_violations()
  
  keys <- geos_contains_matrix(
    wk::wk_set_crs(neighbourhoods, NULL),
    geos_read_xy(violations[c("lon", "lat")])
  )
  
  neighbourhoods_out <- vctrs::vec_rep_each(
    neighbourhoods$NAME,
    lengths(keys)
  )
  
  violations_out <- violations[unlist(keys), ]
  
  joined <- vctrs::vec_cbind(violations_out, NAME = neighbourhoods_out)
})
#>    user  system elapsed 
#>  22.199   7.358  28.526

It’s worth noting that using multiple cores in R probably wouldn’t be faster in this case…the cost of sending data between processes that’s built in to the future package is high, and we don’t get the benefit of reusing the point index.
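
For the curious, here’s a sketch of what a parallel attempt might look like (assuming the future and furrr packages). Each worker has to receive its own chunk of points and rebuild the polygon geometries and point index from scratch, which is why I don’t expect this to beat the single-threaded version above:

library(future)
library(furrr)
plan(multisession, workers = 8)

# split the points into ~1 million-row chunks
chunks <- split(
  cached_violations,
  (seq_len(nrow(cached_violations)) - 1) %/% 1e6
)

keys_by_chunk <- future_map(chunks, function(chunk) {
  # GEOS objects are external pointers and can't cross process boundaries,
  # so each worker converts the neighbourhoods itself
  neighbourhoods_geos <- geos::as_geos_geometry(cached_neighbourhoods)
  wk::wk_crs(neighbourhoods_geos) <- NULL
  geos::geos_contains_matrix(
    neighbourhoods_geos,
    geos::geos_read_xy(chunk[c("lon", "lat")])
  )
})

# the per-chunk point indices would still need to be offset and combined
# before they could be used to materialize the join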

I know that benchmarking against GPUs and PostGIS and Dask and everything isn’t measuring quite the same thing…there’s a very good chance that those benchmarks were specifically trying to measure some of the overhead associated with reading string columns and/or inter-process communication. Still, it’s worth remembering that there are some pain points when dealing with big point datasets…if you know what they are, you can do things very fast (even without threading!). Notably:

  • Loading string columns is slow! This is true in R and in Python (unless you’re using Arrow, which has a much more efficient way of storing big string vectors).
  • Spatial indexes are your friend! It’s worth checking both join(x, y) and join(y, x) on a subset to see which will go faster (see the sketch after this list). edit: as I should have known all along, everybody else knows this already, and ‘within’ joins are automatically swapped to efficiently use the index on x in pretty much every spatial library out there except the one that I wrote…
  • Anything over a million points is really slow when converted to an sf::st_sfc(), so you want to avoid that conversion if possible. If you must, do it using st_as_sf(data.frame(x, y), coords = c("x", "y")), which has been optimized for this case.
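
As promised in the second bullet, here’s a sketch of what checking both index orientations on a subset might look like, using the GEOS objects from above (check = FALSE because the two calls return differently shaped results):

sample_points <- geos_read_xy(cached_violations[1:100000, c("lon", "lat")])

bench::mark(
  contains = geos_contains_matrix(neighbourhoods_geos, sample_points),
  within = geos_within_matrix(sample_points, neighbourhoods_geos),
  check = FALSE,
  iterations = 3
)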

Dask/geopandas, revisited

Let’s revisit the dask/geopandas version to see if we can get a handle on what the overhead is like on that side of things! I’ve spent a lot of time thinking about this kind of thing in R, but my Python game is rusty and falls apart quickly when pandas gets involved.

First, we need to install dask-geopandas and its dependencies. My Python package management game is very bad and I haven’t installed conda; however, these two pip commands seemed to do it:

pip3 install geopandas dask distributed pyarrow
pip3 install git+https://github.com/geopandas/dask-geopandas.git

(Note pyarrow, which did not appear in the original post but whose purpose will become clear shortly!)

Now, let’s run the bit of code from Martin’s post!

import geopandas
import dask_geopandas
import dask.dataframe
from dask.distributed import Client, LocalCluster

client = Client(
    LocalCluster(
        n_workers=8, 
        threads_per_worker=1
    )
)

ddf = dask.dataframe.read_csv(
    "phl_parking.csv", 
    blocksize=25e6, 
    assume_missing=True
)

ddf = ddf.set_geometry(
    dask_geopandas.points_from_xy(
        ddf, 
        x="lon", 
        y="lat", 
        crs=4326
    )
)

neigh = geopandas.read_file(
    "Neighborhoods_Philadelphia.shp"
).to_crs(4326)

joined = dask_geopandas.sjoin(ddf, neigh, predicate="within")

r = joined.compute()

This is all new to me, and the idea that you can do a split/apply/combine in parallel with pretty much zero extra code is magic! As advertised, it ran in about 17 seconds. Let’s also give it a plot so that we can compare a few other ways to do this and check our results:

r[r.NAME == "BRIDESBURG"].plot()

Let’s give the single-threaded version a shot, since it seemed to run quickly in R and both geopandas and geos are using the GEOS C API under the hood in pretty much the exact same way. Martin indicated on Twitter that this took about 35 seconds, not including the CSV load. I used pyarrow’s csv reader, which took about 2.5 seconds to load all the columns.

import pyarrow.csv as csv
violations_arrow = csv.read_csv("phl_parking.csv")
violations = violations_arrow.to_pandas()

Creating the geopandas data frame looks much the same as for the dask version:

violations_geo = violations.set_geometry(
    geopandas.points_from_xy(
        x=violations.lon,
        y=violations.lat,
        crs=4326))

This ran for me in about 2 seconds, which is on par with geos_read_xy() in R.

Reading a shapefile also looks the same as in the Dask version:

neighbourhoods = geopandas.read_file(
    "Neighborhoods_Philadelphia.shp"
).to_crs(4326)

Finally, we can do the join. The “within” predicate is automatically switched with “contains” to use the index on violations_geo effectively, as I should have checked when I wrote the first version of this post!

r2 = geopandas.sjoin(violations_geo, neighbourhoods, predicate="within")

This ran for me in about 22 seconds. Again, let’s plot to check:

r2[r2.NAME == "BRIDESBURG"].plot()

Let’s also check the Python version of what I did in R above…basically, use the output from the index query directly to subset the left and right sides appropriately. First, we do the equivalent of geos_contains_matrix(), which gives us the indexes we need to subset both sides:

ids_neighbourhoods, ids_violations = violations_geo.sindex.query_bulk(
    neighbourhoods.geometry,
    predicate="contains",
    sort=False)

This ran for me in 4 seconds, which is about twice as fast as geos_contains_matrix() could do the same thing.

I’m positive there’s a better way to do a column-bind operation here using pd.concat(), but I couldn’t seem to get that code to look any cleaner than the version I knew how to do (which was to subset violations_geo, then add all the non-geometry columns from the expanded neighbourhoods).

violations_geo_joined = violations_geo.take(ids_violations).reset_index()
for name, value in neighbourhoods.take(ids_neighbourhoods).reset_index().items():
    if name == "geometry":
        continue
    violations_geo_joined[name] = value

This ran for me in about 4 seconds. Very important here to check for crossed wires since I’ve rolled my own version of the join:

violations_geo_joined[violations_geo_joined.NAME == "BRIDESBURG"].plot()

I’m too new to geopandas and dask to suggest any “gotchas” for Python and big point data sets, and given how easy it was to find out how to do all of these operations (even the low-level ones!), I suspect there are few!
