
I have CSV files under multiple paths in an S3 bucket; the paths are separate table directories, none nested inside another. All the tables have the same partition keys.

The directory layout of the S3 bucket:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...

I need to convert these CSV files into Parquet files and store them in another S3 bucket that has the same directory structure.

The directory layout of the other S3 bucket:

table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...

My current solution iterates through the S3 bucket, finds each CSV file, converts it to Parquet, and saves it to the other S3 path. This is not efficient, because the loop converts the files one at a time.

I want to use Spark to improve the efficiency, so I tried:

spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')

This works well for each table, but to optimize it further, I want to take the table name as a parameter, something like:

TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')

Thanks

miaj
  • So... is the last point, {*TABLE_NAMES}, working for you? I could not work out what exactly you wanted. – Ram Ghadiyaram May 29 '20 at 22:31
  • Looks like you just need to start the Spark job for each table in a separate thread. Take a look at this question: https://stackoverflow.com/questions/30214474/how-to-run-multiple-jobs-in-one-sparkcontext-from-separate-threads-in-pyspark – facha May 29 '20 at 22:40
  • @Ram Ghadiyaram. No, the last point is not working for me. I want to do something like this, using *paths to represent all of the tables: https://stackoverflow.com/questions/37257111/reading-parquet-files-from-multiple-directories-in-pyspark – miaj May 29 '20 at 23:03

1 Answer


The linked question provides solutions for reading multiple files at once. The method spark.read.csv(...) accepts either a single path or a list of paths, so you can apply the same logic for reading. Writing is different: Spark merges all the given paths into one DataFrame, and a single DataFrame cannot be split back into several without applying custom logic first. In short, there is no method that writes the initial DataFrame directly into multiple directories, i.e. nothing like df.write.csv(*TABLE_NAMES).
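For comparison, the baseline without any custom logic is one read and one write per table, reusing the call chain from the question. The following is only a minimal sketch: the helper names table_paths and convert_tables are hypothetical, and spark is assumed to be an existing SparkSession passed in by the caller.

```python
def table_paths(table_names, source_root, dest_root):
    """Return (table, source_path, dest_path) triples, one per table."""
    src = source_root.rstrip("/")  # avoid a double slash when joining
    dst = dest_root.rstrip("/")
    return [(t, f"{src}/{t}", f"{dst}/{t}") for t in table_names]


def convert_tables(spark, table_names, source_root, dest_root, partition_keys):
    """Read each table's CSV files and write them back as partitioned Parquet.

    `spark` is assumed to be an existing SparkSession; the read/write chain
    is the same one used in the question, applied once per table.
    """
    for _table, src, dst in table_paths(table_names, source_root, dest_root):
        (spark.read.csv(src)
              .write
              .partitionBy(*partition_keys)
              .parquet(dst))
```

A call would look like convert_tables(spark, TABLE_NAMES, "s3n://bucket_name", "s3n://another_bucket", ["partition_key_1", "partition_key_2"]). Each table is still processed as its own Spark job, so this only removes the hand-written per-file loop, not the per-table one.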

The good news is that Spark provides a dedicated function, input_file_name(), which returns the file path of the current record. You can use it in combination with TABLE_NAMES to filter on the table name.

Here is one possible, untested PySpark solution:

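from pyspark.sql.functions import input_file_name

# Assumes an existing SparkSession named `spark`.
TABLE_NAMES = ["table_name_1", "table_name_2"]  # ... plus the remaining table names

source_path = "s3n://bucket_name"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]

# csv() takes a list of paths; unpacking with * would pass the
# second path as the schema argument.
all_df = spark.read.csv(input_paths) \
              .withColumn("file_name", input_file_name()) \
              .cache()

dest_path = "s3n://another_bucket"

def write_table(table_name: str) -> None:
    # Keep only the rows whose source file path contains this table name,
    # then write them as Parquet with the same partition layout.
    (all_df.where(all_df["file_name"].contains(table_name))
           .write
           .partitionBy('partition_key_1', 'partition_key_2')
           .parquet(f"{dest_path}/{table_name}"))

for t in TABLE_NAMES:
    write_table(t)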

Explanation:

  • We generate the input paths and store them in input_paths. This creates paths such as: s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN.

  • Then we load all the paths into one DataFrame and add a new column called file_name, which holds the source file path of each row. Notice that we also call cache here. This is important because the following code triggers len(TABLE_NAMES) separate actions, and caching prevents Spark from loading the data source again for each one.

  • Next we create write_table, which is responsible for saving the data for the given table. It first filters on the table name using all_df["file_name"].contains(table_name), which returns only the records whose file_name column contains the value of table_name. Finally it saves the filtered data the same way you already did.

  • In the last step we call write_table for every item of TABLE_NAMES.
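One caveat with the filter step is that contains(table_name) is a plain substring test, so a name such as table_name_1 would also match paths under table_name_10. The sketch below shows the difference in plain Python, outside Spark. The helper table_from_path and the sample path are hypothetical, and it assumes the file path starts with the source root exactly as it was passed in.

```python
def table_from_path(file_path, source_root):
    """Return the first directory under source_root, i.e. the table name."""
    prefix = source_root.rstrip("/") + "/"
    if not file_path.startswith(prefix):
        raise ValueError(f"{file_path!r} is not under {source_root!r}")
    # Everything up to the next slash is the table directory.
    return file_path[len(prefix):].split("/", 1)[0]


# Hypothetical path of a file that belongs to table_name_10.
path = "s3n://bucket_name/table_name_10/partition_key_1=a/partition_key_2=b/file.csv"

# A substring test also accepts this path for table_name_1.
substring_hit = "table_name_1" in path

# Comparing the whole path component does not.
exact_table = table_from_path(path, "s3n://bucket_name/")
```

In the Spark filter, one way to get the same effect would be to match on the name surrounded by slashes, for example contains(f"/{table_name}/"), provided no partition value happens to contain the same text.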

Related links

How to import multiple csv files in a single load?

Get HDFS file path in PySpark for files in sequence file format

abiratsis