
I want to create a PySpark dataframe from a Python dictionary, but the following code

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df_stable = spark.createDataFrame(dict_stable_feature)
df_stable.show()

shows this error:

TypeError: Can not infer schema for type: <class 'str'>

Reading this post on Stack Overflow:

Pyspark: Unable to turn RDD into DataFrame due to data type str instead of StringType

I can deduce that the problem may be that I mistakenly used Python's standard str instead of StringType, and Spark doesn't like it. What can I do to make it work?
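
Here is a minimal, self-contained example that reproduces the error (the dictionary contents are made up just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical contents; my real dictionary maps key strings to binary lists
dict_stable_feature = {"1_0": [1, 0, 1], "1_1": [0, 1, 0]}

# iterating over a dict yields only its keys (plain str), so Spark tries
# to infer a schema from bare strings and raises:
# TypeError: Can not infer schema for type: <class 'str'>
df_stable = spark.createDataFrame(dict_stable_feature)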

EDIT:

I created my dictionary using this code

Create multiple lists and store them into a dictionary Python

As you can see, each key is created like this:

cc = str(col)
vv = "_" + str(value)
cv = cc + vv

dict_stable_feature[cv] = t

where t is just a binary list of 1s and 0s.
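
So the resulting dictionary looks something like this (made-up values, just to show the shape):

dict_stable_feature = {
    "1_0": [1, 0, 1, 1],
    "1_1": [0, 1, 0, 0],
    "2_0": [1, 1, 0, 1],
}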

  • What are the datatypes of the values of each key in your dictionary? Are they scalar/single values or are they complex types such as lists/objects? Should each key be a new column in your dataframe or a value in a row ? Can you share a sample code with your dictionary and what your expected dataframe table should look like? – ggordon Sep 08 '21 at 00:44
  • I posted some other info! – ianux22 Sep 08 '21 at 13:37

1 Answer


Let's start by converting your Python dictionary into a list of lists, with values in the correct positions (i.e. one of the expected data structures for initializing a Spark dataframe).

You may try the following, assuming all list values in the dictionary are of the same length:

column_names = []
dataset = None
for column_name in dict_stable_feature:
    column_names.append(column_name)
    column_values = dict_stable_feature[column_name]

    if dataset is None:
        # first column: start one row per value
        dataset = []
        for value in column_values:
            dataset.append([value])
    else:
        # subsequent columns: append each value to its matching row
        for ind, val in enumerate(column_values):
            dataset[ind].append(val)

my_df = spark.createDataFrame(dataset, schema=column_names)
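
For example, with a made-up dictionary in the shape you described, the loop transposes the column lists into rows:

# hypothetical input, matching the shape described in the question
dict_stable_feature = {"1_0": [1, 0, 1], "1_1": [0, 1, 0]}

# after the loop above, dataset == [[1, 0], [0, 1], [1, 0]]
# and my_df.show() prints:
# +---+---+
# |1_0|1_1|
# +---+---+
# |  1|  0|
# |  0|  1|
# |  1|  0|
# +---+---+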

If the list values are not all of the same length, then you may try the following:

max_list_length = max(len(v) for v in dict_stable_feature.values())
column_names = []
dataset = [[] for _ in range(max_list_length)]
default_data_value = None # feel free to change
for column_name in dict_stable_feature:
    column_names.append(column_name)
    column_values = dict_stable_feature[column_name]

    for ind, val in enumerate(column_values):
        dataset[ind].append(val)

    # ensure all columns have the same number of rows by padding
    # shorter columns with the default value
    no_of_values = len(column_values)
    if no_of_values < max_list_length:
        for i in range(no_of_values, max_list_length):
            dataset[i].append(default_data_value)

my_df = spark.createDataFrame(dataset, schema=column_names)
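
With lists of uneven length (again, made-up values), the shorter columns get padded with the default value:

# hypothetical input with uneven list lengths
dict_stable_feature = {"1_0": [1, 0, 1], "1_1": [0, 1]}

# after the loop above, dataset == [[1, 0], [0, 1], [1, None]]
# and my_df.show() prints:
# +---+----+
# |1_0| 1_1|
# +---+----+
# |  1|   0|
# |  0|   1|
# |  1|null|
# +---+----+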

Let me know if this works for you.
