
I need to create a CSV output file that lists the minimum price, maximum price, and total row count from a dataset. The expected output (a CSV file) should have the column names min_price, max_price and total_rows. How can I merge the output I have into a single DataFrame and then save it as a CSV with these three columns?

So far I have done:

df=spark.read.parquet('mydataframe', inferSchema=True)

price=df.select('price')

max=df.agg({'price': 'max'}).show()
min=df.agg({'price': 'min'}).show()
df.count()
Natália Resende

2 Answers


This looks like a simple aggregation; you shouldn't split it into separate actions, just compute everything with a single DataFrame select.

from pyspark.sql import functions as F

result = (
    price
    .select(
        F.min('price').alias('minimum_price'),
        F.max('price').alias('maximum_price'),
        F.count('*').alias('total_of_rows')
    )
)

Save the result to wherever you want. If you need it as a single CSV file, use coalesce(1): result.coalesce(1).write.csv("your_path", header=True)
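If you also want the exact column names asked for in the question (min_price, max_price, total_rows), a minimal sketch might look like this, assuming df is the DataFrame read from the parquet file and "your_path" is just a placeholder:

from pyspark.sql import functions as F

# alias the aggregates to the column names requested in the question
summary = df.select(
    F.min('price').alias('min_price'),
    F.max('price').alias('max_price'),
    F.count('*').alias('total_rows')
)

# coalesce(1) forces a single part file; "your_path" is a placeholder
summary.coalesce(1).write.csv("your_path", header=True, mode="overwrite")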

Benny Elgazar

This answer was modified after @Benny Elgazar pointed out that my original solution had a lot of issues. Basically, it is his answer, just worked through with a small example:

df = spark.createDataFrame(
    [(1.40,),
     (1.43,),
     (1.35,),
     (1.38,),
     (1.39,)],
    ['price']
)

Then, create a result DataFrame with a single aggregation:

from pyspark.sql import functions as F

result = (
    df
    .select(
        F.min('price').alias('minimum_price'),
        F.max('price').alias('maximum_price'),
        F.count('*').alias('total_of_rows')
    )
)

result.show()

+-------------+-------------+-------------+
|minimum_price|maximum_price|total_of_rows|
+-------------+-------------+-------------+
|         1.35|         1.43|            5|
+-------------+-------------+-------------+

Now, write the result DataFrame to a CSV file:

result.coalesce(1).write.option("header", "true").csv("my_folder/", mode="overwrite")

You cannot choose the name of the CSV file, but you can rename it afterwards. Check this post for some ideas.
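For example, a minimal sketch of the rename step, assuming the output went to a local folder my_folder/ and the target name prices.csv is just a placeholder:

import glob
import os

# Spark writes the data as a part-*.csv file inside the folder;
# find it and rename it to the name you actually want.
part_file = glob.glob("my_folder/part-*.csv")[0]
os.rename(part_file, "my_folder/prices.csv")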

Luiz Viola
  • This is a badly performing solution, for two reasons: 1. You are doing 3 aggregations separately. 2. You collect all the results to the driver. This can cause several issues: first, the multiple aggregations cause several scans over the same data; second, moving all the data to the driver can kill your Spark application with an out-of-memory error on a large dataset. – Benny Elgazar Mar 08 '22 at 10:32
  • I changed my answer to use your code and credited you. – Luiz Viola Mar 08 '22 at 11:11
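As an illustration of the performance point raised in the comments (not part of the original answers): the approach in the question runs three separate actions, each triggering its own scan of the data, while a single select computes all three values in one pass.

from pyspark.sql import functions as F

# three separate actions -> three jobs, each scanning the data
df.agg({'price': 'max'}).show()
df.agg({'price': 'min'}).show()
df.count()

# one action -> a single job computes all three aggregates in one scan
df.select(F.min('price'), F.max('price'), F.count('*')).show()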