
I'm using Spark Structured Streaming to process data from a streaming source with a file sink; the processed data is written to HDFS.

The problem is that the output files are named something like part-00012-8d701427-8289-41d7-9b4d-04c5d882664d-c000.txt, which makes it impossible for me to pick out the files written during the last hour.

Is it possible to customize the output file name to something like timestamp_xxx? Or can I write to a different path for each batch?

skyer CC

2 Answers

7

You cannot change the names of the saved files. However, you can change the folder structure they are saved under. Use partitionBy() to partition the data by specific columns in the dataset; in this case year, month, day and hour could be of interest:

df.writeStream
  .format("parquet") // can be "orc", "json", "csv", etc.
  .option("path", "/path/to/save/")
  .option("checkpointLocation", "/path/to/checkpoint/") // checkpointing is required by the file sink
  .partitionBy("year", "month", "day", "hour")
  .start()

This will create a folder structure starting from the path which could look as follows:

year=2018
|
|--> month=06
|    |
|    |--> day=26
|    |    |
|    |    |--> hour=10
|    |    |--> hour=11
|    |    |--> ...
|    |
|    |--> day=27
|    |    |
|    |    |--> ...

Of course, other columns could be used to partition the files depending on what is available.
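
If the dataset does not already carry these columns, they can be derived from an event-time field before writing. A minimal sketch, assuming the stream has a timestamp column called eventTime (that column name is an assumption, not from the question):

import org.apache.spark.sql.functions.{col, dayofmonth, hour, month, year}

// "eventTime" stands in for whatever timestamp column the stream actually carries.
val withTimeColumns = df
  .withColumn("year",  year(col("eventTime")))
  .withColumn("month", month(col("eventTime")))
  .withColumn("day",   dayofmonth(col("eventTime")))
  .withColumn("hour",  hour(col("eventTime")))

The resulting withTimeColumns DataFrame can then be written with the partitionBy() call shown above.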

Shaido
  • 27,497
  • 23
  • 70
  • 73
  • Is it possible to do this using the text file format, e.g. ` .format("text").option("path", "some hdfs path").partitionBy()`? – skyer CC Jun 27 '18 at 08:18
  • @skyerCC: Yes, you can. Try changing `parquet` to `csv` and you will get comma-separated columns in a text file. – Shaido Jun 27 '18 at 08:21
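
A minimal sketch of what the comment above suggests (paths are placeholders, not from the question). Note that format("text") expects a DataFrame with a single string column, whereas format("csv") writes all columns comma-separated:

df.writeStream
  .format("csv") // format("text") would require a single string column
  .option("path", "/path/to/save/")
  .option("checkpointLocation", "/path/to/checkpoint/")
  .partitionBy("year", "month", "day", "hour")
  .start()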
0

I believe this file naming is an internal thing that Spark uses when storing the values for each partition. If you are using some sort of blob store (sorry, I am a Windows user) you should still be able to load the files back from the output location and work on them again using a DataFrame.

What I am trying to say is that although you don't have much say in the file names, since that is something Spark handles itself, it should not stop you from creating your own workflow where you batch things up by looking inside the files for some timestamp (I am assuming the output file contents have some sort of DateTime column; if they don't, it may be a good idea to add one).

That is how I would proceed: make the timestamp part of the file contents, read the output back into a DataFrame, and then use normal DataFrame / map operations on the loaded output data.

I kind of roughly talk about this here.
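
A rough sketch of that idea, with illustrative paths and column names (the processedAt column and the one-hour window are assumptions, not from the question):

import org.apache.spark.sql.functions.{current_timestamp, expr}

// Stamp each record with a processing time before writing it out.
val stamped = df.withColumn("processedAt", current_timestamp())

stamped.writeStream
  .format("parquet")
  .option("path", "/path/to/save/")
  .option("checkpointLocation", "/path/to/checkpoint/")
  .start()

// Later, in a separate batch job ("spark" is the active SparkSession),
// pick up only the last hour of output:
val lastHour = spark.read
  .parquet("/path/to/save/")
  .filter(expr("processedAt >= current_timestamp() - INTERVAL 1 HOUR"))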

sacha barber
  • My problem is not processing all the data under the output path, because files keep being written continuously (about 100 files per batch). I want to merge these outputs into one dir hourly using map/reduce. Neither the batch time nor the repartition count is changeable as resources are limited. That makes more than 1000 files output per hour, and makes the hadoop input argument too long as I have to join the files one by one into a string. – skyer CC Jun 27 '18 at 07:39
  • I think what I show here and what is shown by @shaido kind of points to how you can create files with some sort of custom name, and then you should be able to use the glob syntax as described here: https://stackoverflow.com/questions/31782763/how-to-use-regex-to-include-exclude-some-input-files-in-sc-textfile. This would allow you to ONLY load the batch you want. This manual kind of shows you: https://books.google.com.vn/books?id=Wu_xeGdU4G8C&lpg=PA238&ots=i8wUUFRcZw&pg=PA65#v=onepage&q&f=false – sacha barber Jun 27 '18 at 07:58
  • Using the glob syntax would allow you to do something like this `sc.textFile("/user/Orders/2015072[7-9]*,/user/Orders/2015073[0-1]*")` – sacha barber Jun 27 '18 at 08:10
  • Thank you for your advice, I'll try @shaido's method and see what happens. – skyer CC Jun 27 '18 at 08:33