
I'm trying to execute the .toPandas() method on a pyspark.sql.dataframe.DataFrame, but it throws an error:

pm_table_bej_test.toPandas()
Py4JJavaError                             Traceback (most recent call last)
~\AppData\Local\Temp\ipykernel_25776\2227465720.py in 
----> 1 pm_table_bej_test.toPandas()

c:\Users\user\Anaconda3\envs\python_ds_gis\lib\site-packages\pyspark\sql\pandas\conversion.py in toPandas(self)
    203 
    204         # Below is toPandas without Arrow optimization.
--> 205         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    206         column_counter = Counter(self.columns)
    207 

c:\Users\user\Anaconda3\envs\python_ds_gis\lib\site-packages\pyspark\sql\dataframe.py in collect(self)
    815         """
    816         with SCCallSiteSync(self._sc):
--> 817             sock_info = self._jdf.collectToPython()
    818         return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))
    819 

c:\Users\user\Anaconda3\envs\python_ds_gis\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 
...
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:657)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    ... 1 more

Current Code

# import dependencies
import pandas as pd
import geopandas as gpd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when

from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

# Initiate SparkContext
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName("Sedona App")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

# register Sedona SQL types and functions (ST_GeomFromText, etc.) on the session
SedonaRegistrator.registerAll(spark)

# import table from csv
pm_table_bej = (
    spark
    .read
    .option("delimiter", ",")
    .option("header", "true")
    .csv(CSV_PATH)
)

# expose the DataFrame to Spark SQL under the name used in the query below
pm_table_bej.createOrReplaceTempView("pm_table_bej_temp_view")

# query
pm_table_bej_test = spark.sql(
    """
    SELECT 
        OID_,
        ST_GeomFromText(
            CONCAT(
                'POINT(',
                Lon_of_Observation_Point,
                ' ',
                Lat_of_Observation_Point,
                ')'
            )
        ) AS geometry,
        Local_Time_of_Day_of_Observation_Point,
        Unix_Timestamp_of_Observation_Point,
        from_unixtime(Unix_Timestamp_of_Observation_Point) AS timestamp
    FROM
        pm_table_bej_temp_view
    WHERE
        TRUE
        AND Local_Day_of_Week_of_Observation_Point = "Fri"
    """
)

pm_table_bej_test.printSchema()

pm_table_bej_test.show(5)

Output

root
 |-- OID_: string (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- Local_Time_of_Day_of_Observation_Point: string (nullable = true)
 |-- Unix_Timestamp_of_Observation_Point: string (nullable = true)
 |-- timestamp: string (nullable = true)

+----+--------------------+--------------------------------------+-----------------------------------+-------------------+
|OID_|            geometry|Local_Time_of_Day_of_Observation_Point|Unix_Timestamp_of_Observation_Point|          timestamp|
+----+--------------------+--------------------------------------+-----------------------------------+-------------------+
|  14|POINT (106.783285...|                              00:39:16|                         1675359556|2023-02-03 00:39:16|
|  16|POINT (106.78329 ...|                              00:39:16|                         1675359556|2023-02-03 00:39:16|
|  21|POINT (106.78349 ...|                              06:01:30|                         1675378890|2023-02-03 06:01:30|
|  22|POINT (106.78349 ...|                              06:01:45|                         1675378905|2023-02-03 06:01:45|
|  23|POINT (106.783517...|                              06:01:08|                         1675378868|2023-02-03 06:01:08|
+----+--------------------+--------------------------------------+-----------------------------------+-------------------+
only showing top 5 rows
Amri Rasyidi
  • My guess is there's an issue converting that geometry field to some pandas type. Maybe create that column differently / convert it to something pandas knows, like a string, if that makes sense for your application (see the sketch below)? – Brandon Sollins Mar 23 '23 at 10:24
  • I tried to select and then convert only the string column; it gave me another error. ```at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685) at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeVirtual(DirectMethodHandle$Holder) at java.base/java.lang.invoke.LambdaForm$MH/0x0000000801160400.invoke(LambdaForm$MH)``` – Amri Rasyidi Mar 27 '23 at 03:20
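
For reference, a minimal sketch of the string-conversion workaround suggested in the comment above, assuming Sedona's SQL functions are registered on the session; ST_AsText returns WKT strings, so toPandas() never has to convert a geometry object:

# Sketch of the workaround from the comment above: serialize the geometry to WKT
# text in Spark, so the collected rows contain only plain strings.
pm_table_bej_wkt = pm_table_bej_test.selectExpr(
    "OID_",
    "ST_AsText(geometry) AS geometry_wkt",  # Sedona SQL function, returns a string
    "Local_Time_of_Day_of_Observation_Point",
    "Unix_Timestamp_of_Observation_Point",
    "timestamp",
)
pdf = pm_table_bej_wkt.toPandas()  # plain strings, no Shapely conversion involved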

1 Answer


There are several catches here:

From GeoPandas to Sedona DF

Spark 3.X has a known type-inference issue when converting a GeoPandas DF to a Sedona DF when the data contains Pandas NA values. It can easily be fixed by replacing the NA values. For example:

import pandas as pd
import geopandas as gpd

gdf = gpd.read_file("data/gis_osm_pois_free_1.shp")
gdf = gdf.replace(pd.NA, '')
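
Once the NA values are replaced, the cleaned frame can be handed to Spark directly (a sketch, assuming the Sedona-registered session named spark from the question):

# With the NA values gone, type inference succeeds and the geometry column
# is picked up as a Sedona geometry type.
sedona_df = spark.createDataFrame(gdf)
sedona_df.printSchema()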

From Sedona DF to GeoPandas DF

What is your GeoPandas version and what is your shapely version? (use pip freeze to check)

Sedona does not support Shapely 2.X, and GeoPandas 0.12 starts to support Shapely 2.X. But GeoPandas < 0.11 has a bug which automatically installs Shapely 2.X and then crashes both itself and Sedona when converting a Sedona DF to a GeoPandas DF.

To fix this, either install GeoPandas 0.11.1, or pip uninstall Shapely 2.X and then pip install shapely==1.8.4.
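
A quick way to confirm which versions are actually active in the notebook environment (a minimal sketch):

# Print the versions that the notebook kernel actually imports.
import geopandas
import shapely

print(geopandas.__version__)  # e.g. 0.11.1
print(shapely.__version__)    # should be 1.X, e.g. 1.8.4, for Sedona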

  • gpd version: 0.12.2. Previously I had Shapely 2.x; then I followed your suggestion and it worked! However, I still don't understand: I'm converting to pandas, not geopandas, so why do gpd and its dependencies affect the process? – Amri Rasyidi Mar 27 '23 at 03:37
  • toPandas will call Shapely to convert Sedona geometries. If the Shapely version is not correct, it will throw an error. – Jia Yu - Apache Sedona Mar 28 '23 at 07:09
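
For reference, a sketch of what the call looks like once a compatible Shapely 1.x is installed; the geometry column name comes from the query in the question:

# With Shapely 1.x installed, each Sedona geometry is returned as a Shapely object.
pdf = pm_table_bej_test.toPandas()
print(type(pdf["geometry"].iloc[0]))  # e.g. <class 'shapely.geometry.point.Point'>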