I have imported a dataset of about 100,000 raw JSON files (roughly 100 GB) into Foundry through Data Connection. I want to use the Python Transforms raw file access
API to read the files and flatten the structs and arrays of structs into a DataFrame, applied as an incremental update.
I want to use something like the example below from the documentation, adapted for *.json files, and convert it into an incremental transform using the @incremental()
decorator.
>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
...     def process_file(file_status):
...         with hair_eye_color.filesystem().open(file_status.path) as f:
...             r = csv.reader(f)
...
...             # Construct a pyspark.Row from our header row
...             header = next(r)
...             MyRow = Row(*header)
...
...             for row in csv.reader(f):
...                 yield MyRow(*row)
...
...     files_df = hair_eye_color.filesystem().files('**/*.csv')
...     processed_df = files_df.rdd.flatMap(process_file).toDF()
...     processed.write_dataframe(processed_df)
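Adapted to JSON, the per-file function from the example above might look roughly like the sketch below. This is an untested outline rather than documented Foundry code: `make_json_file_processor`, `open_file` and `FileStatus` are hypothetical names introduced here so the parsing logic can run without Foundry, and the in-memory `fake_files` dict stands in for the dataset's filesystem.

```python
import io
import json
from collections import namedtuple


def make_json_file_processor(open_file):
    """Build a per-file generator function in the style of process_file.

    open_file is a hypothetical callable (path -> text file object). In a
    transform it would wrap something like
    inpt.filesystem().open(path, 'r', encoding='utf-8-sig').
    """
    def process_file(file_status):
        with open_file(file_status.path) as f:
            data = json.load(f)
        # A file may hold a single object or a top-level array of objects.
        records = data if isinstance(data, list) else [data]
        for record in records:
            # Emit one JSON string per record so a JSON reader can infer the schema later.
            yield json.dumps(record)

    return process_file


# Local stand-ins for the Foundry file listing (assumptions for illustration only).
FileStatus = namedtuple("FileStatus", ["path"])
fake_files = {
    "a.json": '{"id": 1}',
    "b.json": '[{"id": 2}, {"id": 3}]',
}

process_file = make_json_file_processor(lambda path: io.StringIO(fake_files[path]))
rows = [row for path in fake_files for row in process_file(FileStatus(path))]
```

In the transform itself, a function like this would presumably be passed to `inpt.filesystem().files('**/*.json').rdd.flatMap(...)`, so each file is parsed on an executor, and the resulting RDD of JSON strings could then be handed to `spark.read.json`. As far as I understand, wrapping the transform in `@incremental()` makes `inpt.filesystem()` list only the files added since the last build, but I have not verified that here.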
With the help of @Jeremy David Gamet I was able to develop the code to get the dataset I want.
from transforms.api import transform, Input, Output
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext
    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    # Every file is opened and parsed here, on the driver
    for files in filesystem:
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
            file_dates.append(data)
    # All parsed files are serialized into one JSON string in driver memory
    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
    # drop_duplicates() returns a new DataFrame, so keep the result
    df_2 = df_2.drop_duplicates()
    # flatten() is the code from [Flatten array column][1]
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)
Here `flatten` is the linked code to flatten the DataFrame (flatten__df).
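To make clear what I mean by flattening, here is a plain-Python stand-in that works on parsed JSON records instead of a DataFrame. It is not the linked `flatten` code: `flatten_rows` and the `_` separator are names I made up for this sketch. Structs (dicts) become `parent_child` keys, and each array is expanded into one row per element, with an empty array giving a single row holding `None` (similar in spirit to `explode_outer`).

```python
def flatten_rows(record, sep="_"):
    """Yield flat dicts from one nested record.

    Dicts are expanded into parent<sep>child keys; lists are expanded into
    one output row per element (an empty list yields one row with None).
    """
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Expand one level of struct; deeper levels are handled by recursion.
            for sub_key, sub_value in value.items():
                flat[f"{key}{sep}{sub_key}"] = sub_value
        else:
            flat[key] = value

    for key, value in flat.items():
        if isinstance(value, list):
            # Expand the first list found, then recurse for any remaining nesting.
            for element in (value or [None]):
                yield from flatten_rows({**flat, key: element}, sep)
            return

    if any(isinstance(v, dict) for v in flat.values()):
        yield from flatten_rows(flat, sep)
    else:
        yield flat


example = {"id": 1, "owner": {"name": "a", "tags": [{"k": "x"}, {"k": "y"}]}}
flat_rows = list(flatten_rows(example))
```

For the example record this produces two rows, one per element of `tags`, each carrying `id`, `owner_name` and `owner_tags_k`. Note that a record with several arrays produces the cross product of their elements, which is also what exploding the columns one after another would do.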
The above code works for a few files, but since there are more than 100,000 files, I am hitting the following error:
Connection To Driver Lost
This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.
Is there any way around this?