I have a fixed-width text file containing several "tables":
table1 dataneedsseparation154
table1 heresplitbadlyneedd432
table2 it'salwaysdifferent
...
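For context, I read the raw file into a ('table', 'raw_string') shape roughly like this (an untested sketch; the path is a placeholder, and I assume a single space separates the table id from the fixed-width payload):
from pyspark.sql import functions as F

# sketch: split each line into the table id and the rest of the line
raw = spark.read.text('PATH/fixed_width_file.txt')   # one column named 'value'
df = raw.select(
    F.split(raw['value'], ' ')[0].alias('table'),                            # table id before the first space
    F.expr("substring(value, instr(value, ' ') + 1)").alias('raw_string'),   # everything after the first space
)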
I want to process it with PySpark. Every table has its own schema, so I would like to arrive at something like the output below, using schemas like these:
table1: col1 char(4)
        col2 char(5)
        col3 char(10)
        col4 int(3)
table2: col1 char(19)
table1:
id|col1|col2 |col3      |col4
--|----|-----|----------|----
1 |data|needs|separation|154
2 |here|split|badlyneedd|432
table2:
id|col1
--|-------------------
1 |it'salwaysdifferent
That means I want to somehow split/group by the first column, apply the schema, and then write each table to a separate file. What I did is use this list comprehension, but this is of course not parallelized.
# tables is the list of table names
tables_list = [(table_name, apply_schema(df.filter(df['table'] == table_name), table_name)) for table_name in tables]
[table.write.format('parquet').save(f'PATH/{table_name}.parquet') for table_name, table in tables_list]
How can I process this in one go with the files being written out in parallel?
I also thought about using the partitionBy function of the DataFrame writer, but I couldn't figure out whether it is possible to apply the apply_schema function before writing (see the sketch below). Is a UDAF or a window function able to deal with the distributed writes?
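For clarity, this is roughly what I had in mind with partitionBy (an untested sketch; the output path is a placeholder). It splits the output by table, but it still writes the raw string, because I don't see where apply_schema would fit in:
# untested sketch: one subdirectory per value of 'table', but 'raw_string' stays unparsed
df.write.partitionBy('table').format('parquet').save('PATH/partitioned_raw')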
---edit:
minimal example:
df = spark.createDataFrame(
    [
        ("table1", "dataneedsseparation154"),
        ("table1", "heresplitbadlyneedd432"),
        ("table2", "it'salwaysdifferent"),
    ],
    ('table', 'raw_string')
)
schema = {
    'table1': {'col1': (0, 4), 'col2': (5, 5), 'col3': (10, 10), 'col4': (20, 3)},
    'table2': {'col1': (0, 19)},
}
def apply_schema(df, table_name):
    # slice each fixed-width field out of the raw string, then drop the raw column
    for column, (start, length) in schema[table_name].items():
        df = df.withColumn(column, df['raw_string'].substr(start, length))
    df = df.drop('raw_string')
    return df

result = [apply_schema(df.filter(df['table'] == table), table) for table in schema.keys()]
Desired output:
spark.createDataFrame(
    [
        ("table1", "data", "needs", "separation", "154"),
        ("table1", "here", "split", "badlyneedd", "432"),
    ],
    ('table', 'col1', 'col2', 'col3', 'col4')
).write.format('parquet').save('table1.parquet')
spark.createDataFrame(
    [
        ("table2", "it'salwaysdifferent"),
    ],
    ('table', 'col1')
).write.format('parquet').save('table2.parquet')
The question is how to get from the result list to Parquet files in a parallel manner, or whether the approach described above is even the right way to produce the transformed Parquet files in parallel.
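One idea I had, but have not verified, is to submit the writes from a thread pool on the driver, so that the Spark jobs at least run concurrently. I am not sure whether this is idiomatic, or whether it parallelizes anything beyond the job submission:
from concurrent.futures import ThreadPoolExecutor

def write_table(table_name):
    # apply_schema and schema as defined in the minimal example above; the path is a placeholder
    apply_schema(df.filter(df['table'] == table_name), table_name) \
        .write.format('parquet').save(f'PATH/{table_name}.parquet')

# one thread per table; each thread only submits a Spark write job
with ThreadPoolExecutor(max_workers=len(schema)) as pool:
    list(pool.map(write_table, schema.keys()))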