I'm trying to build a Spark DataFrame in two steps because the dataset is too big. It's e-commerce data with purchases and events like list_view, add_to_card, etc. My goal is to get the events (e.g. list_view) of users who have already purchased.
First, I select a sample of the users who purchased. Then, I want to get the list_view etc. events of those users.
The method below didn't work:
    def get_sparkdf(self, data_paths, filters, agg_by, agg_func, fields, sample_rate=None, seed=None):
        spark_df = spark.read.parquet(*data_paths)
        # filtering
        if filters is not None:
            for filter_ in filters:
                spark_df = spark_df.filter(filter_)
        # perform groupby if arguments are given
        if (agg_by is not None) and (agg_func is not None):
            spark_df = spark_df.groupby(agg_by).agg(*agg_func)
        # select fields if given
        if fields is not None:
            spark_df = spark_df.select(*fields)
        # sample to scale down the data
        if sample_rate is not None:
            spark_df = spark_df.sample(False, sample_rate, seed)
        return spark_df
    # filters_attr is a filter on purchase events.
    df_events = spark.get_sparkdf(attr_path, filters=filters_attr, agg_by=None,
                                  agg_func=None, fields=['user_id', 'event_data', 'ts'],
                                  sample_rate=None)
    df_uid_sampled = df_events.select('user_id').distinct().sample(False, 0.04, None)
It was too slow and couldn't finish once I added a join step afterwards to do step 2 (getting the other events, like list_view, for that sample of users).
So I wanted to run a direct SQL query instead, but I don't know how to write it:
    df_uid_sampled.createOrReplaceTempView("purchasers")
    df_view_events = spark.sql("SELECT user_id, event_data, ts, country FROM parquet <parquet_file> WHERE user_id IN purchasers.user_id")
The error is the following:
mismatched input ':' expecting {<EOF>, '(', ',', 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'JOIN', 'CROSS', 'INNER', 'LEFT', 'RIGHT', 'FULL', 'NATURAL', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'ANTI'}(line 1, pos 57)
== SQL ==
SELECT user_id, event_data, ts, country FROM parquet hdfs:///history_parquet/bids/2018/03/*/*.parquet WHERE user_id IN purchasers.user_id
---------------------------------------------------------^^^
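For reference, the parser stops at position 57, the `:` in `hdfs:`, because `parquet hdfs` is read as a table named `parquet` with the alias `hdfs`. Spark SQL's documented form for querying files directly is ``parquet.`<path>` `` with the path in backticks, and `IN` expects a subquery rather than a column reference. Below is a minimal sketch of that query shape. The helper names `build_events_query` and `load_view_events` are hypothetical, the `purchasers` view is assumed to be registered already, and whether the glob path resolves as expected on this cluster is an assumption.

```python
def build_events_query(parquet_path, view_name="purchasers"):
    """Return a Spark SQL string that reads Parquet files by path and keeps
    only rows whose user_id appears in the temp view `view_name`.

    Hypothetical helper for illustration; column names come from the question.
    """
    # Backticks quote the path so the parser does not trip on ':' or '/'.
    return (
        "SELECT user_id, event_data, ts, country "
        f"FROM parquet.`{parquet_path}` "
        # IN needs a subquery over the temp view, not `purchasers.user_id`.
        f"WHERE user_id IN (SELECT user_id FROM {view_name})"
    )


def load_view_events(spark, parquet_path, view_name="purchasers"):
    """Run the query on an existing SparkSession passed in by the caller."""
    return spark.sql(build_events_query(parquet_path, view_name))
```

With the session from the question, this would be called as `load_view_events(spark, "hdfs:///history_parquet/bids/2018/03/*/*.parquet")` after `createOrReplaceTempView("purchasers")`. This only addresses the syntax error; it is not expected to be faster than the join by itself.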