
Currently I have the file structure defined so that the dataframe schema is like below (sample data taken from a different source):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import create_map, lit, col

schema = StructType([
    StructField('id', StringType(), True),
    StructField('dept', StringType(), True),
    StructField('salary', IntegerType(), True),
    StructField('location', StringType(), True)
])

df = spark.createDataFrame([('36636', 'Finance',   3000, 'USA'),
                            ('40288', 'Finance',   5000, 'IND'),
                            ('42114', 'Sales',     3900, 'USA'),
                            ('39192', 'Marketing', 2500, 'CAN'),
                            ('34534', 'Sales',     6500, 'USA')],
                            schema=schema)

I do the operation below to create a map type column from two of the columns:

df = df.withColumn("propertiesMap",create_map(
        lit("salary"),col("salary"),
        lit("location"),col("location")
        )).drop("salary","location")

And my dataframe looks like this

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]|
|40288|Finance  |[salary -> 5000, location -> IND]|
|42114|Sales    |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales    |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+

Going forward, the input files may have dynamic columns, for example:

file1.csv
id,dept,Salary,location

file2.csv
id,dept,salary

file3.csv
id,dept

file4.csv
id,dept,firstname,lastname,middlename,address

In all cases id and dept do not change, but all other columns are dynamic. I process the files one by one.

For example, take file1.csv from above:

fixed_columns = ['id', 'dept']

all_columns = df.columns

dynamic_col = list(set(all_columns) - set(fixed_columns))

which gives:

dynamic_col = ['salary', 'location']

And I want to do something like the following (and maybe use append? I'm not sure):

for i in dynamic_col:
    df = df.withColumn('propertiesMap', create_map(lit(i), col(i)))
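
But this overwrites propertiesMap on every iteration, so only the last column would survive. What I imagine is building the whole map in one go; a minimal sketch, assuming dynamic_col is the list of column-name strings computed above:

# Sketch: flatten [lit(name), col(name)] pairs so every dynamic column
# lands in a single create_map call instead of overwriting the previous map.
pairs = [x for c in dynamic_col for x in (lit(c), col(c))]
df = df.withColumn('propertiesMap', create_map(*pairs)).drop(*dynamic_col)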

Once all files are processed and appended into the final dataframe, it has to look like this:

+-----+---------+------------------------------------------------------+
|id   |dept     |propertiesMap                                         |
+-----+---------+------------------------------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]                     |
|40288|Finance  |[salary -> 5000, location -> IND]                     |
|42114|Sales    |[salary -> 3900, location -> USA]                     |
|39192|Marketing|[salary -> 2500, location -> CAN]                     |
|34534|Sales    |[salary -> 6500, location -> USA]                     |
|36636|Finance  |[firstname -> kevin, lastname -> Miller]              |
|40288|Finance  |[firstname -> aaron, lastname -> sahn]                |
|42114|Sales    |[firstname -> daron, lastname -> ket]                 |
|39192|Marketing|[]                                                    |
|34534|Sales    |[firstname -> dev, lastname -> dis, middlename -> Sam]|
+-----+---------+------------------------------------------------------+

I don't use pandas.


2 Answers


The purpose of complex datatypes like MapType() is to store structured data in a single column. Instead of appending rows and doubling your df's length, I would ensure one row per id and dept. If that's so, then let's consider:

Original df

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |{salary -> 3000, location -> USA}|
|40288|Finance  |{salary -> 5000, location -> IND}|
|42114|Sales    |{salary -> 3900, location -> USA}|
|39192|Marketing|{salary -> 2500, location -> CAN}|
|34534|Sales    |{salary -> 6500, location -> USA}|
+-----+---------+---------------------------------+

csv1 df

+-----+---------+--------------------------------------------------------+
|id   |dept     |propertiesMap_new                                       |
+-----+---------+--------------------------------------------------------+
|36636|Finance  |{firstname -> Miller, lastname -> kevin, middlename -> }|
|40288|Finance  |{firstname -> aaron, lastname -> sahn, middlename -> }  |
|42114|Sales    |{firstname -> daron, lastname -> ket, middlename -> }   |
|39192|Marketing|{firstname -> , lastname -> , middlename -> }           |
+-----+---------+--------------------------------------------------------+

csv2 df

+-----+-----+------------------------------------------------------+
|id   |dept |propertiesMap_new                                     |
+-----+-----+------------------------------------------------------+
|34534|Sales|{firstname -> dev, lastname -> dis, middlename -> Sam}|
+-----+-----+------------------------------------------------------+

Depending on the size of the dataframes being added dynamically, you have two options: either read and store them all in a list before appending to the existing df, or loop and append to the existing df as you read. Whichever option you choose, all you need is a for loop; in this case I used a list of dfs. In the loop:

1. union the existing df with each new df;
2. groupBy and agg, collecting the target map column into an array;
3. leverage a higher-order function to fold that array of maps back into a single map.

See the code below:

from pyspark.sql import functions as f

# csv1 and csv2 dataframes read earlier; union them onto the original df
lst = [df2, df1]
for l in lst:
    df = df.union(l)

# collect the maps per id/dept into an array, then fold the array back into one map
df.groupBy('id', 'dept').agg(f.collect_list('propertiesMap').alias('propertiesMap')).select(
    'id', 'dept',
    f.expr('aggregate(slice(propertiesMap, 2, size(propertiesMap)), propertiesMap[0], (acc, element) -> map_concat(acc, element))').alias('propertiesMap')
).show(truncate=False)



+-----+---------+-----------------------------------------------------------------------------------------+
|id   |dept     |propertiesMap                                                                            |
+-----+---------+-----------------------------------------------------------------------------------------+
|36636|Finance  |{salary -> 3000, location -> USA, firstname -> Miller, lastname -> kevin, middlename -> }|
|40288|Finance  |{salary -> 5000, location -> IND, firstname -> aaron, lastname -> sahn, middlename -> }  |
|42114|Sales    |{salary -> 3900, location -> USA, firstname -> daron, lastname -> ket, middlename -> }   |
|39192|Marketing|{salary -> 2500, location -> CAN, firstname -> , lastname -> , middlename -> }           |
|34534|Sales    |{salary -> 6500, location -> USA, firstname -> dev, lastname -> dis, middlename -> Sam}  |
+-----+---------+-----------------------------------------------------------------------------------------+
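
One caveat: since Spark 3.0, map_concat throws an error when it meets the same key in both maps, so if your incoming files can repeat a key for an existing id/dept (an assumption about your data), you may need to relax the dedup policy before running the fold; LAST_WIN keeps the value from the later map:

# Only needed if duplicate map keys can occur across files; the default
# policy (EXCEPTION) would make the aggregate/map_concat fold fail.
spark.conf.set('spark.sql.mapKeyDedupPolicy', 'LAST_WIN')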

I suggest you try map_concat in PySpark for your requirement:

from pyspark.sql import functions as F
from pyspark.sql import types as T

str_dynamic_col = ['location', ...]
int_dynamic_col = ['salary', ...]

df = df.withColumn('INTpropertiesMap', F.create_map().cast(T.MapType(T.StringType(), T.IntegerType())))
df = df.withColumn('STRpropertiesMap', F.create_map().cast(T.MapType(T.StringType(), T.StringType())))

for i in str_dynamic_col:
    df = df.withColumn('STRpropertiesMap',
        F.map_concat(
            F.col('STRpropertiesMap'),
            F.create_map(F.lit(i), F.col(i))
        )
    )

for i in int_dynamic_col:
    df = df.withColumn('INTpropertiesMap',
        F.map_concat(
            F.col('INTpropertiesMap'),
            F.create_map(F.lit(i), F.col(i))
        )
    )

If you are looking to mix multiple datatypes in a map, please check out this post: https://stackoverflow.com/a/67088231/7224372
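
If you would rather not hard-code those two lists, they can be derived from the dataframe itself; a sketch, assuming the fixed columns are id and dept as in the question:

# df.dtypes yields (name, type) pairs such as ('salary', 'int').
fixed_columns = ['id', 'dept']
int_dynamic_col = [c for c, t in df.dtypes if c not in fixed_columns and t in ('int', 'bigint')]
str_dynamic_col = [c for c, t in df.dtypes if c not in fixed_columns and t == 'string']

# Afterwards you would presumably drop the consumed source columns, as in the question.
df = df.drop(*str_dynamic_col, *int_dynamic_col)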
