14

I am using AWS to transform some JSON files. I have added the files to Glue from S3. The job I have set up reads the files in fine, runs successfully, and a file is added to the correct S3 bucket. The issue is that I can't name the file - it is given a random name and is not given a .json extension.

How can I name the file and also add the extension to the output?

John Rotenstein
Ewan Peters
  • Possible duplicate of [Write single CSV file using spark-csv](https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv) – Jesse Clark Feb 21 '18 at 18:48
  • Cannot be a duplicate; the link shared above is for Spark, while the solution here works for AWS Glue. In Spark it is difficult to solve the problem. – Abhisekh Jun 18 '20 at 18:42

2 Answers

13

Spark writes its output as part-* files, so it's not possible to choose the file name directly. However, it's possible to rename the file right afterward.

# access the Hadoop FileSystem API through the Spark JVM gateway
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("s3://{bucket_name}"), sc._jsc.hadoopConfiguration())

# coalesce(1) forces a single output file, so there is only one part-* file to rename
file_path = "s3://{bucket_name}/processed/source={source_name}/year={partition_year}/week={partition_week}/"
df.coalesce(1).write.format("json").mode("overwrite") \
    .option("codec", "gzip").save(file_path)

# rename the created part-* file to the desired name
created_file_path = fs.globStatus(Path(file_path + "part*.gz"))[0].getPath()
fs.rename(
    created_file_path,
    Path(file_path + "{desired_name}.jl.gz"))
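
The values in curly braces ({bucket_name}, {source_name}, and so on) are placeholders and are not resolved automatically; they have to be substituted before the snippet will run. A minimal sketch of filling them in, using hypothetical values:

# hypothetical values - substitute your own bucket, source, partitions and target name
bucket_name = "my-bucket"
source_name = "orders"
partition_year, partition_week = "2018", "08"
desired_name = "orders-2018-08"

file_path = "s3://{}/processed/source={}/year={}/week={}/".format(
    bucket_name, source_name, partition_year, partition_week)
# the rename target then becomes Path(file_path + desired_name + ".jl.gz")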
alessiosavi
Juan Riaza
    a very good solution, helped me rename an s3 file to whatever name i want, thank you :) – Abhisekh Jun 18 '20 at 18:40
  • Do the 'variables' enclosed in the `{}` automatically get resolved, or do we need to replace them? I would be able to replace the actual bucket name and I think the source, but to keep it automated, how would you vary the year/month/day? – Int'l Man Of Coding Mystery Jan 07 '21 at 16:01
  • There is something I don't understand. This particular write: df.coalesce(1).write() has produced a single file. Is it possible to retrieve the name of this particular file out of the possibly many part* files? – ivan May 04 '22 at 12:34
2

The following code worked for me:

source_DataFrame = glueContext.create_dynamic_frame.from_catalog(
    database=databasename,
    table_name=source_tablename_in_catalog,
    transformation_ctx="source_DataFrame")

# without coalesce(1), Spark may create many part-000* files, depending on the data volume
source_DataFrame = source_DataFrame.toDF().coalesce(1)

from awsglue.dynamicframe import DynamicFrame
DyF = DynamicFrame.fromDF(source_DataFrame, glueContext, "DyF")

# write the file as usual in Glue; I have given some partition keys here.
# keep "partitionKeys": [] in case of no partitions
output_Parquet = glueContext.write_dynamic_frame.from_options(
    frame=DyF,
    connection_type="s3",
    format="parquet",
    connection_options={
        "path": destination_path + "/",
        "partitionKeys": ["department", "team", "card", "datepartition"]},
    transformation_ctx="output_Parquet")

import boto3
client = boto3.client('s3')

# list the objects in the bucket (list_objects_v2 returns at most 1000 keys per call)
response = client.list_objects_v2(Bucket=bucket_name)
names = response["Contents"]

# find the objects whose key contains part-000
particulars = [name['Key'] for name in names if 'part-000' in name['Key']]

# keep the prefix before part-000 so the partition structure is retained
location = [particular.split('part-000')[0] for particular in particulars]

# constraint: copy_object is limited to objects of 5 GB or less
for key, particular in enumerate(particulars):
    client.copy_object(Bucket=bucket_name, CopySource=bucket_name + "/" + particular, Key=location[key] + "newfile")
    client.delete_object(Bucket=bucket_name, Key=particular)

job.commit()

The constraint is that copying the file (copy_object) will fail when it is larger than 5 GB. In that case, you can use this instead:

s3 = boto3.resource('s3')
for key,particular in enumerate(particulars):
    copy_source = {
        'Bucket': bucket_name,
        'Key': particular
    }
    s3.meta.client.copy(copy_source, bucket_name, location[key]+"newfile")
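
Note that s3.meta.client.copy performs a managed (multipart) copy, which is what lets it handle objects larger than 5 GB, but as written it neither deletes the original part-000* files nor gives the copy an extension. A minimal sketch of the same loop with both added, reusing the client, particulars and location variables from above (the "output.json" name is just an example):

s3 = boto3.resource('s3')
for key, particular in enumerate(particulars):
    copy_source = {
        'Bucket': bucket_name,
        'Key': particular
    }
    # managed copy transparently switches to multipart copy for objects over 5 GB
    s3.meta.client.copy(copy_source, bucket_name, location[key] + "output.json")
    # remove the original part-000* object, as in the copy_object loop above
    client.delete_object(Bucket=bucket_name, Key=particular)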