I am applying a transform to my DataFrame. While the process takes just 3 seconds with pandas, with PySpark (Pandas API on Spark) it takes approximately 30 minutes, yes, 30 minutes! My data is only 10k rows. The following is my pandas approach:
import pandas as pd

def find_difference_between_two_datetime(time1, time2):
    # difference between two datetimes in whole seconds
    return int((time2 - time1).total_seconds())

processed_data = pd.DataFrame()
for unique_ip in data.ip.unique():
    session_ids = []
    id = 1
    id_prefix = str(unique_ip) + "_"
    session_ids.append(id_prefix + str(id))
    ip_data = data[data.ip == unique_ip]
    timestamps = [time for time in ip_data.time]
    # walk consecutive timestamp pairs; a gap of more than 30 seconds starts a new session
    for item in zip(timestamps, timestamps[1:]):
        if find_difference_between_two_datetime(item[0], item[1]) > 30:
            id += 1
        session_ids.append(id_prefix + str(id))
    ip_data["session_id"] = session_ids
    processed_data = pd.concat([processed_data, ip_data])

processed_data = processed_data.reset_index(drop=True)
processed_data
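For context, data is an event log with an ip column and a datetime time column, already ordered by time within each IP. A simplified, made-up example of its shape (not my real data) would be:

import pandas as pd

# made-up sample just to show the shape of the data (ip + event timestamp)
data = pd.DataFrame({
    "ip": ["1.1.1.1", "1.1.1.1", "1.1.1.1", "2.2.2.2"],
    "time": pd.to_datetime([
        "2022-01-01 00:00:00",
        "2022-01-01 00:00:10",  # 10 s gap -> same session
        "2022-01-01 00:01:00",  # 50 s gap -> new session
        "2022-01-01 00:00:05",
    ]),
})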
And the following is my PySpark / Pandas API on Spark approach:
import pyspark.pandas as ps

def find_difference_between_two_datetime_spark(time1, time2):
    # timestamps come back from to_numpy() as datetime64[ns], so the
    # difference is in nanoseconds; divide to get seconds
    return int((time2 - time1) / 1000000000)

spark_processed_data = ps.DataFrame()
for unique_ip in data.ip.unique().to_numpy():
    session_ids = []
    id = 1
    id_prefix = str(unique_ip) + "_"
    session_ids.append(id_prefix + str(id))
    ip_data = data[data.ip == unique_ip]
    timestamps = ip_data.time.to_numpy()
    # same pairwise walk as the pandas version
    for item in zip(timestamps, timestamps[1:]):
        if find_difference_between_two_datetime_spark(item[0], item[1]) > 30:
            id += 1
        session_ids.append(id_prefix + str(id))
    ip_data["session_id"] = session_ids
    spark_processed_data = ps.concat([spark_processed_data, ip_data])

spark_processed_data = spark_processed_data.reset_index(drop=True)
spark_processed_data
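For what it's worth, I suspect the per-row Python loop and the repeated filter/concat per IP are what hurt on Spark, since each of those operations becomes a distributed job. In plain pandas I could express the same sessionization without the loop roughly like this (just a sketch, not validated against my full data):

import pandas as pd

# sketch: same 30-second sessionization, vectorised
# sort so consecutive rows per ip are in time order (my loop assumes this already)
data = data.sort_values(["ip", "time"])
gap = data.groupby("ip")["time"].diff().dt.total_seconds()
new_session = (gap.isna() | (gap > 30)).astype(int)  # 1 marks a session start
session_no = new_session.groupby(data["ip"]).cumsum()
data["session_id"] = data["ip"].astype(str) + "_" + session_no.astype(str)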
What am I missing about the Spark environment? I don't think it is normal for this code to run this slowly.