
I have imported a dataset of 100,000 raw JSON files (about 100 GB) into Foundry through Data Connection. I want to use the Python Transforms raw file access pattern to read the files and flatten the arrays of structs and structs into a dataframe, written as an incremental update to the output dataset. I want to adapt something like the example below from the documentation to `*.json` files, and also convert it into an incremental transform using the `@incremental()` decorator.

>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
...    def process_file(file_status):
...        with hair_eye_color.filesystem().open(file_status.path) as f:
...            r = csv.reader(f)
...
...            # Construct a pyspark.Row from our header row
...            header = next(r)
...            MyRow = Row(*header)
...
...            for row in csv.reader(f):
...                yield MyRow(*row)
...
...    files_df = hair_eye_color.filesystem().files('**/*.csv')
...    processed_df = files_df.rdd.flatMap(process_file).toDF()
...    processed.write_dataframe(processed_df)
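
A minimal sketch of how that example might be adapted to `*.json` files and made incremental is shown below. The dataset paths are placeholders, and it assumes the default incremental behaviour where the input's filesystem lists only files added since the last build. Each file is parsed on the executors and re-emitted as a JSON string, so nothing is materialised on the driver:

import json
from transforms.api import transform, incremental, Input, Output


@incremental(semantic_version=1)  # bump semantic_version to force a full recompute
@transform(
    processed=Output('/examples/json_processed'),  # placeholder output path
    raw_json=Input('/examples/raw_json_files'),    # placeholder input path
)
def example_computation(ctx, raw_json, processed):

    def process_file(file_status):
        # Runs on the executors; each task opens only its own file, so the
        # parsed documents never sit in driver memory.
        with raw_json.filesystem().open(file_status.path) as f:
            yield json.dumps(json.load(f))  # one JSON string per file

    # With @incremental this should list only files added since the previous
    # successful build (assuming the default incremental read mode).
    files_df = raw_json.filesystem().files('**/*.json')
    json_strings = files_df.rdd.flatMap(process_file)

    # Let Spark infer the schema across the distributed JSON strings.
    processed_df = ctx.spark_session.read.json(json_strings)
    processed.write_dataframe(processed_df)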

With the help of @Jeremy David Gamet I was able to develop the code to get the dataset I want.

from transforms.api import transform, Input, Output
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext

    # Read and parse every file on the driver, collecting the documents in a list.
    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    for files in filesystem:
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
        file_dates.append(data)

    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))

    df_2 = df_2.drop_duplicates()
    # flatten array-of-struct and struct columns
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)

The `flatten(df)` helper used above comes from linked code on flattening array columns; it is omitted here, but a generic sketch of that kind of helper follows.
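
This is not the code the question refers to, only a generic sketch of such a helper: it repeatedly explodes array columns and expands struct columns until only flat types remain.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType


def flatten(df):
    # Collect the columns that are still complex (arrays or structs).
    complex_fields = {f.name: f.dataType for f in df.schema.fields
                      if isinstance(f.dataType, (ArrayType, StructType))}
    while complex_fields:
        name, dtype = next(iter(complex_fields.items()))
        if isinstance(dtype, StructType):
            # Promote each struct field to a top-level column, prefixed with the parent name.
            expanded = [F.col(name + '.' + f.name).alias(name + '_' + f.name)
                        for f in dtype.fields]
            df = df.select('*', *expanded).drop(name)
        else:
            # ArrayType: one output row per array element; empty arrays yield null rows.
            df = df.withColumn(name, F.explode_outer(name))
        # Re-scan, since exploding an array of structs leaves struct columns behind.
        complex_fields = {f.name: f.dataType for f in df.schema.fields
                          if isinstance(f.dataType, (ArrayType, StructType))}
    return df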

The above code works for a few files, but since there are more than 100,000 files I am hitting the following error:

Connection To Driver Lost 

This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.

Any way around this?

Asher
  • You should be able to `import json` instead of csv with the example above: https://docs.python.org/3/library/json.html – fmsf Mar 26 '21 at 16:13
  • @fmsf, how can I construct the row using json? – Asher Mar 26 '21 at 16:29
  • I don't have time to give you a full example atm, but you seem to be on the right path with your edit above. I'll try to post something later today or tomorrow morning. – fmsf Mar 26 '21 at 16:32
  • @fmsf, I was able to parse the json files as strings, but this is not inferring the schema for the columns. – Asher Mar 28 '21 at 09:30
  • Regarding your last edit, it sounds like you are hitting the memory limits of your driver. Maybe the jsons, or the number of jsons, are too big to process. You'll need to increase the driver memory or be creative about the way it processes them, like not parallelising, or having smaller jsons. If the parsing works for individual jsons, then going incremental can be the solution here, though this will happen again if you do a recompute. – fmsf Mar 31 '21 at 09:48
  • @fmsf, thank you for that valuable feedback. Any way around this? How can I go with an incremental solution? Increasing driver memory is a no-go for me at the moment. – Asher Mar 31 '21 at 12:05
  • @fmsf, another solution I tried: I copied the schema from the output of my code with a few files, applied it to the input dataset with the list of json files, and called the transformation on that. It sometimes builds the dataset and sometimes fails with OOM. Is there a way to optimize in that respect? – Asher Mar 31 '21 at 12:07
  • For the incremental part you just need to add `@incremental(semantic_version=1)` on top of the `@transform`. If you increase the semantic version it will trigger a full recompute; if you keep it the same, subsequent builds will only process files that have not been processed yet. – fmsf Mar 31 '21 at 12:31
  • @fmsf, how can I know which is the best approach: the code above, or applying the schema manually to the input dataset? – Asher Mar 31 '21 at 15:13

1 Answer


I have given an example of how this can be done dynamically as an answer to another question.

Here is the link to that answer: How to union multiple dynamic inputs in Palantir Foundry?, and a copy of the same code:

from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {## enter your dynamic mappings here ##}

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            df_2 = df_2.drop_duplicates()
            out.write_dataframe(df_2)
        transforms.append(update_set)
    return transforms


TRANSFORMS = transform_generator()
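
For illustration only (the keys and dataset paths here are hypothetical, not from the question or the answer), the mapping placeholder could be filled in along these lines; each key then drives one generated `@transform`, and the transforms collected in the module-level `TRANSFORMS` list are what the build picks up:

# Hypothetical mapping: one entry per source dataset to generate a transform for.
transf_dict = {
    'source_a': None,
    'source_b': None,
}

# With templates such as '/Foundry/processed/{val}' and '/Foundry/raw/{val}'
# in place of the placeholder strings, the loop above would produce, for the
# 'source_a' key:
#   out=Output('/Foundry/processed/source_a')
#   inpt=Input('/Foundry/raw/source_a')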

Please let me know if there is anything I can clarify.

fmsf
  • Also, the line ```transforms.append(update_logs)``` throws an error as undefined. – Asher Mar 30 '21 at 05:05
  • Can you help me understand what's happening in your code in this line: ```transforms = [] transf_dict = {## enter your dynamic mappings here ##}```? What is the expected mapping, is it the json schema? – Asher Mar 30 '21 at 05:18
  • Thank you for that, it helped me build the dataset. How do I convert this to an incremental update, or can we not work like that with json files? – Asher Mar 30 '21 at 05:52
  • With your code I am able to generate the data, but for all the files I am running into an OOM issue. Any way around it? – Asher Mar 30 '21 at 06:59