0

I've got a list of plenty of .csv files in a dataset, but for the simplicity let's assume there are three files in the dataset:

  • ASGK_2022.csv
  • ASGK_2023.csv
  • PRAS_2022.csv

I need to process only files with 'ASGK' in their titles. In other words, I need to filter files by their titles using transforms.api.FileSystem.files method. All files share same column names.

I filter files using regex code. Here's two pieces of code I've been using, however, unsuccessfully.

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re


@transform(
    output_df=Output(
        ""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:

                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()

                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)

The error I get:

Traceback (most recent call last): File "/myproject/datasets/ExcelFile.py", line 27, in compute dfs = spark.read.json(rdd) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json return self._df(self._jreader.json(jrdd)) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in call return_value = get_return_value( File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco return f(*a, **kw) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o163.json. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (localhost executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main process() File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process serializer.dump_stream(out_iter, outfile) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream for obj in iterator: File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func for x in iterator: File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/ExcelFile.py", line 17, in process_file with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path: File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 1695, in init self._reader = self._engines[engine](self._io, storage_options=storage_options) File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 557, in init super().init(filepath_or_buffer, storage_options=storage_options) File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 545, in init self.book = self.load_workbook(self.handles.handle) File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 568, in load_workbook return load_workbook( File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 344, in load_workbook reader = ExcelReader(filename, read_only, keep_vba, File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 123, in init self.archive = _validate_archive(fn) File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 95, in _validate_archive archive = ZipFile(filename, 'r') File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1269, in init self._RealGetContents() File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents raise BadZipFile("File is not a zip file") zipfile.BadZipFile: File is not a zip file

Another way is by using OleFile:

def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')

         

            for row in pdf.to_dict('records'):
                yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)


    output_df.write_dataframe(dfs)

Traceback (most recent call last): File "/myproject/datasets/OleFile.py", line 33, in compute dfs = spark.read.json(rdd) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json return self._df(self._jreader.json(jrdd)) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in call return_value = get_return_value( File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco return f(*a, **kw) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o320.json. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main process() File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process serializer.dump_stream(out_iter, outfile) File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream for obj in iterator: File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func for x in iterator: File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/OleFile.py", line 18, in process_file ole = olefile.OleFileIO(f.read()) File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1075, in init self.open(filename, write_mode=write_mode) File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1169, in open self.fp = open(filename, mode) FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n

Any help please?

Folkas
  • 29
  • 2
  • 1
    Why can't you use just `if "ASGK" in filename:` and do the processing otherwise skip it? – Gameplay Aug 30 '23 at 06:14
  • CSV files aren't an Excel format. They're plain text files with Values Separated by Commas. Excel files are ZIP files packages XML documents. You can't load an Excel file as CSV or vice versa. – Panagiotis Kanavos Aug 30 '23 at 06:53
  • The code does some very weird things anyway and looks like randomly copied snippets. It would be a *lot* easier to just read the docs. 1) `pd.ExcelFile(f.read(), engine='openpyxl')` tries to open an Excel file and return this as an object. 2) `as xlsx_path` though is trying to tread that *object* as if it was a file path then 3) `pd.read_csv(xlsx_path)` is trying to load a *CSV* file from what is an Excel object. – Panagiotis Kanavos Aug 30 '23 at 07:00
  • What is this method trying to do? Convert CSV files to JSON? The easy way to read a CSV file is `df=pd.read_csv(file_status.path)`. You can generate JSON with [to_json](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html). The entire method could be replaced with `return pd.read_csv(file_status.path).to_json(orient='records')` – Panagiotis Kanavos Aug 30 '23 at 07:03
  • `using OleFile` but Excel files aren't OLE documents at all. The rest of the code shows the same bugs as the previous - the file contents are loaded and used as if they were the path to the file. – Panagiotis Kanavos Aug 30 '23 at 07:10
  • Apologies for confusion. The code provided was initially used for .xlsx files, but I tried to adapt it for .csv files. In fact, will be working both with .xlsx and .csv files. Thanks @PanagiotisKanavos for great remarks. – Folkas Aug 31 '23 at 07:59

1 Answers1

3

The first error looks like that library is trying to interpret the file as a zip file.

File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents raise BadZipFile("File is not a zip file") zipfile.BadZipFile: File is not a zip file

The second one, it looks like it is trying to use a line from your csv file as a path?

FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n

Your regex looks correct as far as I can see.

You can read CSV directly with Spark as described in this answer: https://stackoverflow.com/a/72312808/5233494

    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
    df = (
        ctx
        .spark_session
        .read
        .option("encoding", "UTF-8")  # UTF-8 is the default
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
user5233494
  • 71
  • 1
  • 6
  • An `xlsx` file *is* a ZIP package containing XML documents. A CSV file is just a text file with commas. The question's code is trying to read a text (CSV) file using Excel methods, in a weird way. It seems the intent is to convert the CSV data to JSON rows – Panagiotis Kanavos Aug 30 '23 at 07:06
  • Thanks @user5233494. I've also got plenty of .xlsx files that I also need to filter by their titles and write a transformation. Could you provide a code which filters .xlsx files by their title and then parses them? The spark.read method seems to work only with .csv files. – Folkas Aug 31 '23 at 08:01