I'm trying to call the .toPandas() method on a pyspark.sql.dataframe.DataFrame, but it throws an error:
pm_table_bej_test.toPandas()
Py4JJavaError Traceback (most recent call last)
~\AppData\Local\Temp\ipykernel_25776\2227465720.py in <module>
----> 1 pm_table_bej_test.toPandas()
c:\Users\user\Anaconda3\envs\python_ds_gis\lib\site-packages\pyspark\sql\pandas\conversion.py in toPandas(self)
203
204 # Below is toPandas without Arrow optimization.
--> 205 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
206 column_counter = Counter(self.columns)
207
c:\Users\user\Anaconda3\envs\python_ds_gis\lib\site-packages\pyspark\sql\dataframe.py in collect(self)
815 """
816 with SCCallSiteSync(self._sc):
--> 817 sock_info = self._jdf.collectToPython()
818 return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))
819
c:\Users\user\Anaconda3\envs\python_ds_gis\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
--> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:657)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
... 1 more
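For context, the traceback shows toPandas() taking the non-Arrow fallback path, which (per the conversion.py frame above) collects every row to the driver and pickles it, Sedona geometry objects included. Roughly:

# What the failing non-Arrow path from the traceback boils down to:
# collect() ships all rows to the driver, where each value (including
# the geometry objects) must survive pickling.
pdf = pd.DataFrame.from_records(
    pm_table_bej_test.collect(),  # the Py4JJavaError is raised here
    columns=pm_table_bej_test.columns,
)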
Current Code
# import dependencies
import pandas as pd
import geopandas as gpd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter
# Initialize the SparkSession with Sedona's Kryo serializer
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName("Sedona App")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)
# Register Sedona's SQL functions (e.g. ST_GeomFromText) on the session
SedonaRegistrator.registerAll(spark)
# Read the table from CSV (CSV_PATH is defined elsewhere)
pm_table_bej = (
    spark
    .read
    .option("delimiter", ",")
    .option("header", "true")
    .csv(CSV_PATH)
)
# Register the DataFrame as the temp view referenced by the query below
pm_table_bej.createOrReplaceTempView("pm_table_bej_temp_view")
# Build a point geometry from the lon/lat columns, decode the Unix
# timestamp, and keep only Friday observations
pm_table_bej_test = spark.sql(
    """
    SELECT
        OID_,
        ST_GeomFromText(
            CONCAT(
                'POINT(',
                Lon_of_Observation_Point,
                ' ',
                Lat_of_Observation_Point,
                ')'
            )
        ) AS geometry,
        Local_Time_of_Day_of_Observation_Point,
        Unix_Timestamp_of_Observation_Point,
        from_unixtime(Unix_Timestamp_of_Observation_Point) AS timestamp
    FROM
        pm_table_bej_temp_view
    WHERE
        TRUE
        AND Local_Day_of_Week_of_Observation_Point = "Fri"
    """
)
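As an aside, the same geometry could presumably be built without string concatenation; a sketch, assuming Sedona's ST_Point constructor is registered (pm_table_bej_points is just an illustrative name):

# Sketch of an alternative geometry construction (assumption: ST_Point
# is available after SedonaRegistrator.registerAll):
pm_table_bej_points = spark.sql(
    """
    SELECT
        OID_,
        ST_Point(
            CAST(Lon_of_Observation_Point AS DOUBLE),
            CAST(Lat_of_Observation_Point AS DOUBLE)
        ) AS geometry
    FROM
        pm_table_bej_temp_view
    """
)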
pm_table_bej_test.printSchema()
pm_table_bej_test.show(5)
Output
root
|-- OID_: string (nullable = true)
|-- geometry: geometry (nullable = true)
|-- Local_Time_of_Day_of_Observation_Point: string (nullable = true)
|-- Unix_Timestamp_of_Observation_Point: string (nullable = true)
|-- timestamp: string (nullable = true)
+----+--------------------+--------------------------------------+-----------------------------------+-------------------+
|OID_| geometry|Local_Time_of_Day_of_Observation_Point|Unix_Timestamp_of_Observation_Point| timestamp|
+----+--------------------+--------------------------------------+-----------------------------------+-------------------+
| 14|POINT (106.783285...| 00:39:16| 1675359556|2023-02-03 00:39:16|
| 16|POINT (106.78329 ...| 00:39:16| 1675359556|2023-02-03 00:39:16|
| 21|POINT (106.78349 ...| 06:01:30| 1675378890|2023-02-03 06:01:30|
| 22|POINT (106.78349 ...| 06:01:45| 1675378905|2023-02-03 06:01:45|
| 23|POINT (106.783517...| 06:01:08| 1675378868|2023-02-03 06:01:08|
+----+--------------------+--------------------------------------+-----------------------------------+-------------------+
only showing top 5 rows
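What I'm ultimately after is a GeoDataFrame on the Python side. A fallback I'm considering (a sketch, assuming it is the Sedona geometry objects that fail to serialize during collect()) is to convert the geometry to WKT in Spark and rebuild it with shapely/geopandas:

# Workaround sketch (assumption: the geometry column is what breaks
# collect()): serialize to WKT in Spark, parse it back on the pandas side.
from shapely import wkt

pdf = (
    pm_table_bej_test
    .withColumn("geometry", expr("ST_AsText(geometry)"))  # geometry -> WKT string
    .toPandas()
)
gdf = gpd.GeoDataFrame(
    pdf,
    geometry=pdf["geometry"].apply(wkt.loads),  # WKT string -> shapely geometry
    crs="EPSG:4326",  # assumption: lon/lat are WGS84
)

Even so, I'd like to understand why the direct .toPandas() call fails.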