I am taking my first steps in PySpark, and I am currently studying UDFs and pandas UDFs. I have read several forums, and they more or less agree that "pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs". Hence, pandas UDFs became an interesting subject for me.
For my test, I have the following dummy data:
import pandas as pd
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import ArrayType, StringType
spark = SparkSession.builder \
    .appName('SpacyOverPySpark') \
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2') \
    .getOrCreate()
# - Running in a GCP Dataproc Workbench Jupyter Notebook
# - Data being imported from GCP Cloud Storage
# - They're basically text paragraphs or sentences, no nulls
df = spark.read.csv("gs://my_bucket/data_sample.csv", header=True)
print("DataFrame shape: ", (df.count(), len(df.columns)))
# Gets `DataFrame shape: (1029895, 1)`
df.printSchema()
# root
# |-- posting: string (nullable = true)
Trying to keep it simple, I first built a plain UDF that splits the input text (I know there could be better implementations based on native PySpark functions; let's set them aside for these tests):
def textsplitter(texts: str) -> list:
    return texts.split()

textsplitterUDF = udf(lambda z: textsplitter(z), ArrayType(StringType()))
t1 = time.time()
df.withColumn("posting_split", textsplitterUDF(col("posting"))).show(5, truncate=10)
print(time.time()-t1) # 0.5169014930725098 (for ~1M samples, not bad)
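(For reference only, the native-function alternative mentioned above would look roughly like the following; the whitespace regex is my assumption, and this approach is set aside for the rest of these tests.)
# Reference only: native PySpark alternative to the UDF above,
# splitting on whitespace via a regex pattern (not used in the tests below)
from pyspark.sql.functions import split
df.withColumn("posting_split", split(col("posting"), r"\s+")).show(5, truncate=10)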
So far, so good. Now it is the turn of the pandas UDF, using a "from pd.Series to pd.Series" approach. Naively here (maybe), I ran some quick speed tests over a demo dff (a pandas.DataFrame with only 10 texts) to "pick the fastest implementation":
# dff["out"] = dff["posting"].map(lambda x:x.split())
# 286 µs ± 3.79 µs per loop (mean ± std. dev. of 10 runs, 1000 loops each)
# dff["out"] = dff["posting"].apply(lambda x:x.split())
# 306 µs ± 7.71 µs per loop (mean ± std. dev. of 10 runs, 1000 loops each)
# dff["out"] = dff["posting"].str.split()
# 375 µs ± 7.6 µs per loop (mean ± std. dev. of 10 runs, 1000 loops each)
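(For reference, these micro-benchmarks can be reproduced with IPython's %timeit in separate notebook cells, roughly as sketched below; dff is just a 10-row pandas.DataFrame with a posting column of strings.)
# Micro-benchmarks over the small demo DataFrame `dff` (10 text rows),
# each line run as its own notebook cell with IPython's %timeit magic
%timeit -r 10 -n 1000 dff["out"] = dff["posting"].map(lambda x: x.split())
%timeit -r 10 -n 1000 dff["out"] = dff["posting"].apply(lambda x: x.split())
%timeit -r 10 -n 1000 dff["out"] = dff["posting"].str.split()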
Knowing this, I implemented the pandas UDF:
@pandas_udf(ArrayType(StringType()))
def textsplitter(texts: pd.Series) -> pd.Series:
    return texts.map(lambda x: x.split())
t1 = time.time()
df.withColumn("posting_split", textsplitter(col("posting"))).show(5, truncate=10)
print(time.time()-t1) # Who knows; I waited ~1 hour before giving up (!) ...
As you can see, the pandas UDF did not even complete within ~1 hour, without raising any error or warning. I was checking this website, and the most similar case I found is this one, whose answer (I think) explains my case too, when it says "Unless your data is large enough such that it cannot be processed by just one node spark should not be considered". Still, I have some questions:
- Does this answer explain my current case, or is there something else missing?
- How can I know if my data is "large enough", and how can I tell whether several nodes are actually being used in my cluster?
- The batch of data I have at the moment is ~0.5 TB in size (in general, batches of that size are expected to be processed). Also, my final goal is to use a custom NER model trained in spaCy to extract entities from the texts; this model would be wrapped in a UDF / pandas UDF that, similar to this case, would return lists of strings (a rough sketch is shown right after this list). Roughly speaking, would using a UDF / pandas UDF in PySpark be the best approach for my use case?
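For context, this is the kind of spaCy wrapping I have in mind; it is only a sketch, not working code yet, and "my_custom_ner_model" is a placeholder for the trained model's name/path:
# Rough sketch (placeholders, not working code yet): wrapping a custom spaCy
# NER model in a pandas UDF that returns a list of entity strings per text.
@pandas_udf(ArrayType(StringType()))
def extract_entities(texts: pd.Series) -> pd.Series:
    import spacy
    # NOTE: loading the model inside the function reloads it for every batch;
    # kept simple here on purpose
    nlp = spacy.load("my_custom_ner_model")  # placeholder name/path
    return pd.Series(
        [[ent.text for ent in doc.ents] for doc in nlp.pipe(texts.tolist())]
    )

# df.withColumn("entities", extract_entities(col("posting")))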
Thank you.