
I have 1000s of files with data in the below format:

a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23

And I want to read it and convert it to a dataframe as below:

clm1|clm2|clm3|clm4|clm5|clm6|clm7
a|b|c|1|3|null|null
a|b|c|9|null|60|23

I have tried the below method:

import glob

files = [f for f in glob.glob(pathToFile + "/**/*.txt.gz", recursive=True)]
df = spark.read.load(files, format='csv', sep='|', header=None)

But it is giving me below result:

clm1, clm2, clm3, clm4, clm5
a, b, c, 1, 3
a, b, c, 9, null
  • https://stackoverflow.com/questions/45605839/spark-doesnt-read-columns-with-null-values-in-first-row – Simon Crane Jan 25 '20 at 12:01
  • To use this method I have to write getItem() for each column which is not possible because there are 100s of columns and most of them are unknown – user0204 Jan 25 '20 at 12:06

1 Answer


For Spark 2.4+, you can read the files as a single column then split it by |. You'll get an array column that you could transform using higher-order functions:

df.show(truncate=False)

+----------------------------+
|clm                         |
+----------------------------+
|a|b|c|clm4=1|clm5=3         |
|a|b|c|clm4=9|clm6=60|clm7=23|
+----------------------------+

We use the `transform` function to convert the array of strings we get from splitting the `clm` column into an array of structs. Each struct holds the column name if one is present (we check whether the string contains `=`); otherwise the name defaults to `clm` + (i+1), where `i` is the element's position in the array:

transform_expr = """
transform(split(clm, '[|]'), (x, i) -> 
                   struct(
                         IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)), 
                         substring_index(x, '=', -1)
                         )
        )
"""

Now use `map_from_entries` to convert the array of structs to a map. Finally, explode the map and pivot on the column names to get the desired columns:

from pyspark.sql.functions import expr, explode, map_from_entries, first

df.select("clm", 
          explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
         ) \
  .groupby("clm").pivot('col_name').agg(first('col_value')) \
  .drop("clm") \
  .show(truncate=False)

Gives:

+----+----+----+----+----+----+----+
|clm1|clm2|clm3|clm4|clm5|clm6|clm7|
+----+----+----+----+----+----+----+
|a   |b   |c   |9   |null|60  |23  |
|a   |b   |c   |1   |3   |null|null|
+----+----+----+----+----+----+----+
  • Thanks for the help. Is there any way I can put a condition in above code to select only those columns that are present in an existing list of column names? – user0204 Jan 29 '20 at 03:00