0

I want execute a very large amount of hive queries and store the result in a dataframe.

I have a very large dataset structured like this:

+-------------------+-------------------+---------+--------+--------+
|         visid_high|          visid_low|visit_num|genderid|count(1)|
+-------------------+-------------------+---------+--------+--------+
|3666627339384069624| 693073552020244687|       24|       2|      14|
|1104606287317036885|3578924774645377283|        2|       2|       8|
|3102893676414472155|4502736478394082631|        1|       2|      11|
| 811298620687176957|4311066360872821354|       17|       2|       6|
|5221837665223655432| 474971729978862555|       38|       2|       4|
+-------------------+-------------------+---------+--------+--------+

I want to create a derived dataframe which uses each row as input for a secondary query:

result_set = []
for session in sessions.collect()[:100]:
    query = "SELECT prop8,count(1) FROM hit_data WHERE dt = {0} AND visid_high = {1} AND visid_low = {2} AND visit_num = {3} group by prop8".format(date,session['visid_high'],session['visid_low'],session['visit_num'])
    result = hc.sql(query).collect()
    result_set.append(result)

This works as expected for a hundred rows, but causes livy to time out with higher loads.

I tried using map or foreach:

def f(session):
    query = "SELECT prop8,count(1) FROM hit_data WHERE dt = {0} AND visid_high = {1} AND visid_low = {2} AND visit_num = {3} group by prop8".format(date,session.visid_high,session.visid_low,session.visit_num)
    return hc.sql(query)

test = sampleRdd.map(f)

causing PicklingError: Could not serialize object: TypeError: 'JavaPackage' object is not callable. I understand from this answer and this answer that the spark context object is not serializable.

I didn't try generating all queries first, then running the batch, because I understand from this question batch querying is not supported.

How do I proceed?

Tom Rijntjes
  • 614
  • 4
  • 16
  • Considering the arguments in the linked questions/answers, there is not much to do here. This said, what do you mean by "very large amount of queries" ? Is it thousands ? is it millions ? – eliasah Jul 23 '18 at 13:13
  • I expect Spark to at least give the tools to deal with very large datasets. There are billions of rows available. Of course, I only need more as long as it would improve the model. – Tom Rijntjes Jul 23 '18 at 13:21
  • 1
    Spark deals with huge dataset. It does not do "nested operations on distributed data structure.It is simply not supported in Spark. You have to use joins, local (optionally broadcasted) data structures or access external data directly instead." – eliasah Jul 23 '18 at 13:24
  • Yes, that's what the other answer says. – Tom Rijntjes Jul 23 '18 at 13:30
  • And it is correct. – eliasah Jul 23 '18 at 13:32
  • Try to work on MVCE so maybe we can help. – eliasah Jul 23 '18 at 13:36
  • Okay, give me a moment. – Tom Rijntjes Jul 23 '18 at 13:38
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/176578/discussion-between-eliasah-and-tom-rijntjes). – eliasah Jul 23 '18 at 13:40

1 Answers1

0

What I was looking for is:

  1. Querying all required data in one go by writing the appropriate joins
  2. Adding custom columns, based on the values of the large dataframe using pyspark.sql.functions.when() and df.withColumn(), then
  3. Flattening the resulting dataframe with df.groupBy() and pyspark.sql.functions.sum()

I think I didn't fully realize that Spark handles dataframes lazily. The supported way of working is to define large dataframes and then the appropriate transforms. Spark will try to execute the data retrieval and the transforms in one go, at the last second and distributed. I was trying to limit the scope up front, which led to unsupported functionality.

Tom Rijntjes
  • 614
  • 4
  • 16