
I have a table in Hive, and I want to query it on a condition in a loop and store the results in multiple PySpark dataframes dynamically.

Base Query

g1 = """
    select * from db.hive_table where group =  1
"""

group_1 = spk.sql(g1)
group_1.show(3)
group_1.printSchema()
print((group_1.count(), len(group_1.columns)))
group_1 = group_1.toPandas()

There are 80 groups in total. Currently I am running the above code individually for group = 2, group = 3, and so on.

My useless iteration code

    # changes the geometry type to obj

df_list=[group_1,group_2,group_3,group_4,group_5,group_6,group_7,group_8,group_9,group_10,
         group_11,group_12,group_13,group_14,group_15,group_16,group_17,group_18,group_19,group_20,
         group_21,group_22,group_23,group_24,group_25,group_26,group_27,group_28,group_29,group_30,
         group_31,group_32,group_33,group_34,group_35,group_36,group_37,group_38,group_39,group_40,
         group_41,group_42,group_43,group_44,group_45,group_46,group_47,group_48,group_49,group_50,
         group_51,group_52,group_53,group_54,group_55,group_56,group_57,group_58,group_59,group_60,
         group_61,group_62,group_63,group_64,group_65,group_66,group_67,group_68,group_69,group_70,
         group_71,group_72,group_73,group_74,group_75,group_76,group_77,group_78,group_79,group_80]
# num_list=[1,2,3,4,5,5,6,6]

for d in df_list:
    for i in range(1, 80):
        gi = """
        select * from db.hive_table where group = $i
        """

        group_i = spk.sql(gi)
        print(group_i.show(3))
        print(group_i.printSchema())
        print((group_i.count(), len(group_i.columns)))
        return group_i = group_i.toPandas()

Requesting help to guide me through solving this problem and to help me improve my coding knowledge.

Thanks in advance.

Ridhi
  • does this [link](https://stackoverflow.com/a/43878152/12011541) answer your question? it creates a list of dataframes that you can then select using the list index – lesk_s Jul 14 '22 at 18:27
  • Hi @lesk_s - not quite as i am looking to execute sql query in the loop as well. – Ridhi Jul 14 '22 at 21:44
  • python/pyspark won't allow you to dynamically create variable names. *there are certain ways (e.g., `exec()`) that aren't usually recommended.* you can however create a list of dataframes that can be called like `sdf_list[0].toPandas()`, `sdf_list[1].show()`, etc. – samkart Jul 15 '22 at 05:43
  • What is the reason you want to create 80 dataframe objects? I believe you could simply read all those groups using one query `select * from db.hive_table where group between 1 and 80` – blackbishop Jul 15 '22 at 08:11
  • @samkart - oh that makes sense. yeah - cannot use exec() – Ridhi Jul 15 '22 at 15:55
  • @blackbishop - the reason being the data is huge (30M data points) and i have a function which maps it to another group from a predefined list of groups. So Group =1 will get mapped to an alpha group B (this alpha group has 10K data points) based on couple of conditions. The mapping approach uses UDF binary tree with K nearest neighbors, to process mapping of 30 million data points at one shot results in heap space error and is a challenge and thats why it needs to be split by groups – Ridhi Jul 15 '22 at 16:02

1 Answer


Using lists

Python/PySpark won't allow you to create variable names dynamically. However, you can create a list of dataframes that can be used like sdf_list[0].show(), sdf_list[1].toPandas().

sdf_list = []

for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_list.append((i, filtered_sdf))  # (<filter/group identifier>, <spark dataframe>)
    del filtered_sdf

Now sdf_list holds the spark dataframes, each paired with its group identifier, and they can be accessed using list indices. E.g., the first entry can be accessed using [0], and printing it verifies that it is a tuple containing a dataframe.

print(sdf_list[0])
# (1, DataFrame[col1: bigint, dt: date, col3: bigint])
# (<filter/group identifier>, <spark dataframe>)

The list can be iterated over and all dataframes within it can be used individually. e.g.,

for (i, sdf) in sdf_list[:2]:
    print("dataframe {0}'s count:".format(i), sdf.count())

# dataframe 1's count: 20
# dataframe 2's count: 30

Feel free to use it as you like.

sdf_list[0][1].count()  # [0] returns the tuple - (<sdf identifier>, <sdf>)
# 20

sdf_list[0][1].show(2)
# etc...

Let's say you also want all the spark dataframes as pandas dataframes. You'll again need to create a list of dataframes if you want it dynamic. Or just access the spark dataframes using indices.

# using indices
group1_pdf = sdf_list[0][1].toPandas()

# creating list of pandas dataframes
pdf_list = []

for (i, sdf) in sdf_list:
    pdf_list.append((i, sdf.toPandas()))  # (<filter/group identifier>, <pandas dataframe>)

type(pdf_list)
# list

type(pdf_list[0])
# tuple

type(pdf_list[0][1])
# pandas.core.frame.DataFrame
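
If you'd prefer not to build 80 separate SQL strings, a similar list can be built by reading the table once and filtering with the DataFrame API. A minimal sketch, assuming the same hive_db.hive_tbl table and group column used above:

from pyspark.sql import functions as F

# read the table once; Spark is lazy, so no data is pulled until an action is called
base_sdf = spark.table('hive_db.hive_tbl')

sdf_list = []
for i in range(1, 81):
    # each element is (<filter/group identifier>, <spark dataframe>), as before
    sdf_list.append((i, base_sdf.filter(F.col('group') == i)))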

Using dictionaries

We could also use dictionaries to store the dataframes and keep track of them using the keys. The keys can then act as dataframe names.

sdf_dict = {}

for i in range(1, 81):
    filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
    sdf_dict['group'+str(i)] = filtered_sdf
    del filtered_sdf

The dictionary will have the dataframes that can be accessed using the keys. Let's simply print the first 2 keys and check what values we have.

list(sdf_dict.keys())[:2]
# ['group1', 'group2']

sdf_dict['group1']
# DataFrame[col1: bigint, dt: date, col3: bigint]

sdf_dict['group1'].count()
# 20

You can choose to iterate over the dict keys and use the spark dataframes.

for sdf_key in list(sdf_dict.keys())[:2]:
    print(sdf_key+"'s record count:", sdf_dict[sdf_key].count())

# group1's record count: 20
# group2's record count: 30

You can check the type() for a better understanding.

type(sdf_dict)
# dict

type(sdf_dict['group1'])
# pyspark.sql.dataframe.DataFrame

Conversion to a pandas dataframe is simple:

# single df manually
group1_pdf = sdf_dict['group1'].toPandas()

# with iteration
pdf_dict = {}

for sdf_key in sdf_dict.keys():
    pdf_dict[sdf_key] = sdf_dict[sdf_key].toPandas()

type(pdf_dict)
# dict

type(pdf_dict['group1'])
# pandas.core.frame.DataFrame
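
Since the end goal (per the comments) is to run a per-group mapping function, either container lets you do that in the same pass. A minimal sketch, where map_to_alpha_group() is a hypothetical placeholder for your own mapping logic on a pandas dataframe:

result_dict = {}

for sdf_key, sdf in sdf_dict.items():
    # map_to_alpha_group() is a hypothetical stand-in for your KNN-based mapping step
    result_dict[sdf_key] = map_to_alpha_group(sdf.toPandas())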
samkart