
In my scenario I have CSV files continuously uploaded to HDFS.

As soon as a new file gets uploaded, I'd like to process it with Spark SQL (e.g., compute the maximum of a field in the file, or transform the file into Parquet). In other words, I want a one-to-one mapping between each input file and its transformed/processed output file.
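
For concreteness, here is a minimal sketch of the per-file job I have in mind (the paths and the field name are just placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("per-file-transform").getOrCreate()

// one input file maps to exactly one output file
val input  = "hdfs:///incoming/file-0001.csv"
val output = "hdfs:///processed/file-0001.parquet"

val df = spark.read.option("header", "true").csv(input)
df.agg(max("someField")).show()   // e.g. compute the maximum of a field
df.write.parquet(output)          // transform the whole file into Parquet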

I was evaluating Spark Streaming to listen to the HDFS directory, then to process the "streamed file" with Spark.

However, in order to process the whole file I would need to know when the "file stream" completes. I'd like to apply the transformation to the whole file in order to preserve the end-to-end one-to-one mapping between files.

How can I transform the whole file and not its micro-batches?

As far as I know, Spark Streaming can only apply transformations to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).

Is that correct? If so, what alternative should I consider for my scenario?

– Andrea T. Bonanno

2 Answers


I may have misunderstood your question on the first try...

As far as I know, Spark Streaming can only apply transformations to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).

Is that correct?

No. That's not correct.

Spark Streaming will apply the transformation to the whole file at once, as it was written to HDFS at the time Spark Streaming's batch interval elapsed.

Spark Streaming will take the current content of a file and start processing it.


As soon as a new file gets uploaded I need to process the new file with Spark/SparkSQL

That's almost impossible with Spark due to its architecture, which introduces some delay between the moment a file "gets uploaded" and the moment Spark processes it.

You should consider using the brand new and shiny Structured Streaming or the (soon to be obsolete) Spark Streaming.

Both solutions support watching a directory for new files and triggering a Spark job once a new file gets uploaded (which is exactly your use case).

Quoting Structured Streaming's Input Sources:

In Spark 2.0, there are a few built-in sources.

  • File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
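
For example, a minimal Structured Streaming sketch that watches an HDFS directory for new CSV files and writes them out as Parquet could look roughly like this (the paths and schema are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("csv-to-parquet-stream").getOrCreate()

// file sources require a schema up front
val schema = new StructType()
  .add("id", StringType)
  .add("value", DoubleType)

val csvStream = spark.readStream
  .schema(schema)
  .option("header", "true")
  .csv("hdfs:///incoming")               // directory being watched for new files

val query = csvStream.writeStream
  .format("parquet")
  .option("path", "hdfs:///processed")
  .option("checkpointLocation", "hdfs:///checkpoints/csv-to-parquet")
  .start()

query.awaitTermination()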

See also Spark Streaming's Basic Sources:

Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.

File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported).
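
A minimal sketch of the DStream variant, using the textFileStream convenience method (the directory and batch interval are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("hdfs-dir-watcher")
val ssc = new StreamingContext(conf, Seconds(30))   // batch interval

// every file atomically moved into /incoming during a batch interval
// shows up here in full
val lines = ssc.textFileStream("hdfs:///incoming")

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.take(5).foreach(println)   // replace with your Spark SQL / Parquet logic
  }
}

ssc.start()
ssc.awaitTermination()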

One caveat though given your requirement:

I would need to know when the "file stream" completes.

Don't do this with Spark.

Quoting Spark Streaming's Basic Sources again:

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

  • Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

Wrapping up... you should move the files into the directory that Spark watches only when they are complete and ready for processing. That step is outside the scope of Spark.
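
One way to do that outside Spark is to upload (or write) the file to a staging directory first and then atomically rename it into the watched directory once it is complete. A minimal sketch using the Hadoop FileSystem API (the paths are assumptions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

val staging = new Path("hdfs:///staging/file-0001.csv")    // upload here first
val watched = new Path("hdfs:///incoming/file-0001.csv")   // directory Spark watches

// a rename within the same HDFS filesystem is atomic,
// so Spark only ever sees complete files
fs.rename(staging, watched)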

– Jacek Laskowski
  • thank you for your answer, btw I need to remark the key point of my question. How can I transform the whole file and not its microbatches? That's why I wrote [quote] I would need to know when the "file stream" completes. I need to apply the transformation to the whole file in order to preserve the end-to-end one-to-one mapping between files. – Andrea T. Bonanno Jun 06 '17 at 11:28
  • @Andrea You need to clarify what determines a whole file. HDFS does not recognize "file streams". Each "part" of any file written to it will be recognized as a *whole file*. – OneCricketeer Jun 06 '17 at 13:47

You can use DFSInotifyEventInputStream to watch a Hadoop directory and then execute a Spark job programmatically when a file is created.

See this post: HDFS file watcher
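
A rough sketch of that approach (the NameNode URI and the watched path are assumptions):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.client.HdfsAdmin
import org.apache.hadoop.hdfs.inotify.Event

val admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"), new Configuration())
val eventStream = admin.getInotifyEventStream()

while (true) {
  val batch = eventStream.take()   // blocks until new NameNode events arrive
  batch.getEvents.foreach {
    case create: Event.CreateEvent if create.getPath.startsWith("/incoming/") =>
      // launch a Spark job for this single file, e.g. via SparkLauncher
      println(s"New file created: ${create.getPath}")
    case _ => // ignore other event types
  }
}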

– Sourav Gulati
  • Spark Streaming can watch a folder. That class isn't necessary – OneCricketeer Jun 06 '17 at 08:51
  • How will you process file by file in Spark Streaming? What if two files are written at once? – Sourav Gulati Jun 07 '17 at 01:54
  • What do you mean file by file? Spark Streaming picks up all files that are *atomically moved to a target directory*, as stated in the documentation (copied in other answer), so two files are treated as two separate records – OneCricketeer Jun 07 '17 at 02:36
  • Read the question. The user wants to create one target file for one source file. – Sourav Gulati Jun 07 '17 at 02:53
  • Okay, and again, your answer isn't taking advantage of the builtin functionality of Spark Streaming and instead suggests a plain HDFS library – OneCricketeer Jun 07 '17 at 02:59
  • Spark Streaming does not work in this way. For a custom requirement, you should use something which can work instead of complicating things using built-in functionalities. So, I would request, please understand the context before rating an answer. – Sourav Gulati Jun 07 '17 at 04:45
  • First, I didn't downvote. Second, you've had the opportunity to explain to the OP in an edit to your answer why Spark doesn't work the way they are asking instead of arguing with me. I never said you're wrong. I simply stated not necessary – OneCricketeer Jun 07 '17 at 11:33
  • There is nothing to argue. I am just responding to you. – Sourav Gulati Jun 07 '17 at 14:32