
I'm building a Spark DataFrame in two steps because the dataset is too big to process at once. It's e-commerce data with purchases and events like list_view, add_to_cart, etc. My goal is to get the events (list_view) of users who have already purchased.

First, I select a sample of the users who purchased. Then, I want to get the list_view etc of those users who purchased.

The method below didn't work:

def get_sparkdf(self, data_paths, filters, agg_by, agg_func, fields, sample_rate=None, seed=None):
    spark_df = spark.read.parquet(*data_paths)

    # apply filters
    if filters is not None:
        for filter_ in filters:
            spark_df = spark_df.filter(filter_)

    # perform groupby if arguments are given
    if (agg_by is not None) and (agg_func is not None):
        spark_df = spark_df.groupby(agg_by).agg(*agg_func)

    # select fields if given
    if fields is not None:
        spark_df = spark_df.select(*fields)

    # sample to scale down the data
    if sample_rate is not None:
        spark_df = spark_df.sample(False, sample_rate, seed)

    return spark_df

# filters_attr is a filter on purchase events.
df_events = spark.get_sparkdf(attr_path, filters=filters_attr, agg_by=None,
                              agg_func=None, fields=['user_id', 'event_data', 'ts'],
                              sample_rate=None)
df_uid_sampled = df_events.select('user_id').distinct().sample(False, 0.04, None)

When I then added a join to do step 2 (get the other events, like list_view, for that sample of users), it was too slow and never finished.

So I wanted to run a direct SQL query instead, but I didn't know how to write it.

df_uid_sampled.createOrReplaceTempView("purchasers")
df_view_events = spark.sql("SELECT user_id, event_data, ts, country FROM parquet <parquet_file> WHERE user_id IN purchasers.user_id")

The error is the following:

mismatched input ':' expecting {<EOF>, '(', ',', 'WHERE', 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'JOIN', 'CROSS', 'INNER', 'LEFT', 'RIGHT', 'FULL', 'NATURAL', 'LATERAL', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'ANTI'}(line 1, pos 57)

== SQL ==
SELECT user_id, event_data, ts, country FROM parquet hdfs:///history_parquet/bids/2018/03/*/*.parquet WHERE user_id IN purchasers.user_id
---------------------------------------------------------^^^
  • Can you please try to enclose the path... `df_view_events = spark.sql("SELECT user_id, event_data, ts, country FROM parquet.\`hdfs:///history_parquet/bids/2018/03/*/*.parquet\`")` Check if the select is working fine on a sample parquet file and then try to pass in the _WHERE_ predicate. – DataWrangler Jul 23 '18 at 10:41

1 Answer


I've never seen that syntax (SELECT ... FROM parquet <parquet_file>).

What you can do is load that file, register it as a table and use it in the query:

df = spark.read.format('parquet').load('<parquet_file>')
df.createOrReplaceTempView('people')
spark.sql('SELECT user_id, event_data, ts, country FROM people WHERE user_id IN (SELECT user_id FROM purchasers)')