3

I am using readCsvFile(path) function in Apache Flink api to read a CSV file and store it in a list variable. How does it work using multiple threads? For example, is it splitting the file based on some statistics? if yes, what statistics? Or does it read the file line by line and then send the lines to threads to process them?

Here is the sample code:

//default parallelism is 4
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
csvPath="data/weather.csv";
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath)
                        .types(String.class,Double.class)
                        .collect();

Suppose that we have a 800mb CSV file on local disk, how does it distribute the work between those 4 threads?

Oliv
  • 10,221
  • 3
  • 55
  • 76
Ehsan
  • 311
  • 4
  • 7

2 Answers2

2

The readCsvFile() API method internally creates a data source with a CsvInputFormat which is based on Flink's FileInputFormat. This InputFormat generates a list of so-called InputSplits. An InputSplit defines which range of a file should be scanned. The splits are then distributed to data source tasks.

So, each parallel task scans a certain region of a file and parses its content. This is very similar to how it is done by MapReduce / Hadoop.

Fabian Hueske
  • 18,707
  • 2
  • 44
  • 49
  • Thanks Fabian. But I want to know how it defines Splits? By file size? Number of lines or something else? And does it read the whole file at first then decide, or before reading does the split? – Ehsan Jan 11 '17 at 09:10
  • For `CsvInputFormat` the file is split by size. Reading the file in a single thread to split it would be pointless. Since a row might span over two splits, a reading threads starts with the first new line it finds and completes a line that was started in its split even if it crosses the split boundary. – Fabian Hueske Jan 11 '17 at 09:54
  • Ok. Let's say we have 200mb file and parallelism is set to 2. Thread1 should start from the beginning. Thread2 should start reading approximately from middle of the file. How does thread2 find out this location? How does Thread1 knows it reached the end of its portion and should stop? – Ehsan Jan 11 '17 at 10:40
  • Thread1 knows it reads from position 0 to position 100MB, Thread2 from position 100MB to 200MB. The threads simply skip to their starting position and keep track of how much their read. – Fabian Hueske Jan 11 '17 at 10:56
  • thanks again Fabian. By 'simply skip', do you mean skip lines until arriving to the start point(100mb)? if yes, how does it understand that it skipped first 100mb? – Ehsan Jan 12 '17 at 16:56
  • 2
    Skipping to a certain byte position of a file is basic filesystem functionality. – Fabian Hueske Jan 12 '17 at 19:34
1

This is the same as How does Hadoop process records split across block boundaries?

I extract some code from flink-release-1.1.3 DelimitedInputFormat file.

    // else ..
    int toRead;
    if (this.splitLength > 0) {
        // if we have more data, read that
        toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
    }
    else {
        // if we have exhausted our split, we need to complete the current record, or read one
        // more across the next split.
        // the reason is that the next split will skip over the beginning until it finds the first
        // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
        // previous split.
        toRead = this.readBuffer.length;
        this.overLimit = true;
    }

It's clear that if it don't read line delimiter in one split, it will get another split to find.( I haven't find The corresponding code, I will try.)

Plus: the image below is how I find the code, from readCsvFile() to DelimitedInputFormat.

enter image description here

Community
  • 1
  • 1