I need to execute something like: sed '1d' simple.tsv > noHeader.tsv

which removes the first line from my big flow file (> 1 GB).

The thing is - I need to execute it on my flow file, so it'd be:

sed '1d' myFlowFile > myFlowFile

The question is: how should I configure the ExecuteStreamCommand processor so that it runs the command on my flow file and returns the result back to my flow file? If sed is not the best option, I can consider doing this another way (e.g. tail).

ExecuteStreamCommand processor

Thanks, Michal

Edit 2 (Solution):

Below is the final ExecuteStreamCommand config that does what I need (it removes the first line from the flow file). @Andy - thanks a lot for all the precious hints.

ExecuteStreamCommand - remove 1st line from the flow

michalrudko

3 Answers

Michal,

I want to make sure I'm understanding your problem correctly, because I think there are better solutions.

Problem:

You have a 1GB TSV loaded into NiFi and you want to remove the first line.

Solution:

If your file were smaller, the best solution would be to use a ReplaceText processor with the following processor properties:

  • Search Value: ^.*\n
  • Replacement Value: <- empty string

That would strip the first line out without having to send the 1GB content out of NiFi to the command-line and then re-ingest the results. Unfortunately, to use a regular expression, you need to set a Maximum Buffer Size, which means the entire contents need to be read into heap memory to perform this operation.

With a 1GB file, if you know the exact value of the first line (and therefore its length in bytes), you should try ModifyBytes, which lets you trim a byte count from the beginning and/or end of the flowfile contents. You could then simply instruct the processor to drop the first n bytes of the content. Because NiFi's content repository is copy-on-write, you will still end up with ~2GB of data, but the processor works in a streaming manner using an 8192B buffer.
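To make the "first n bytes" idea concrete, here is a minimal shell sketch of how n could be measured for a given file and what dropping those bytes does. It runs outside NiFi; the sample file contents are made up for illustration, and the variable names are hypothetical. It only mirrors the byte trimming that ModifyBytes would perform, it is not the processor itself.

```shell
# Build a small sample TSV in a temporary directory (stand-in for the real file).
workdir=$(mktemp -d)
printf 'id\tname\n1\talpha\n2\tbeta\n' > "$workdir/simple.tsv"

# Number of bytes in the first line, including its trailing newline.
# tr strips the padding that some wc implementations add.
header_bytes=$(head -n 1 "$workdir/simple.tsv" | wc -c | tr -d ' ')

# tail -c +K starts output at byte K, so K = n + 1 skips the first n bytes.
trimmed=$(tail -c +"$((header_bytes + 1))" "$workdir/simple.tsv")

echo "header_bytes=$header_bytes"
printf '%s\n' "$trimmed"

rm -rf "$workdir"
```

Note that n depends on the header text, so this only helps when the header is identical (or at least the same length) across files.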

My best suggestion is to use an ExecuteScript processor. This processor allows you to write custom code in a variety of languages (Groovy, Python, Ruby, Lua, JS) and have it execute on the flowfile. Using a Groovy script like the one below, you could remove the first line and copy the remainder in a streaming fashion so the heap does not get unnecessarily taxed.

I tested this with 1MB files and it took about 1.06 seconds for each flowfile (MacBook Pro 2015, 16 GB RAM, OS X 10.11.6). On a better machine you'll obviously get better throughput, and you can scale that up to your larger files.

import org.apache.nifi.processor.io.StreamCallback

def flowfile = session.get()
if (!flowfile) return

try {
    // Here we are reading from the current flowfile content and writing to the new content
    flowfile = session.write(flowfile, { inputStream, outputStream ->
        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))

        // Ignoring the first line
        def ignoredFirstLine = bufferedReader.readLine()

        def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
        def line

        int i = 0

        // Copy each remaining line until readLine() returns null (end of stream)
        while ((line = bufferedReader.readLine()) != null) {
            bufferedWriter.write(line)
            bufferedWriter.newLine()
            i++
        }

        // By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
        log.warn("Wrote ${i} lines to output")

        bufferedReader.close()
        bufferedWriter.close()
    } as StreamCallback)

    session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
    log.error("Failed to remove the first line from the flowfile", e)
    session.transfer(flowfile, REL_FAILURE)
}

One side note, in general a good practice for NiFi is to split giant text files into smaller component flowfiles (using something like SplitText) when possible to get the benefits of parallel processing. If the 1GB input was video, this wouldn't be applicable, but as you mentioned TSV, I think it's likely that splitting the initial flowfile into smaller pieces and operating on them in parallel (or even sending out to other nodes in the cluster for load balancing) may help your performance here.

Edit:

I realized I did not answer your original question -- how to get the content of a flowfile into the ExecuteStreamCommand processor command-line invocation. If you wanted to operate on the value of an attribute, you could reference the attribute value with the Expression Language syntax ${attribute_name} in the Arguments field. However, as the content is not referenceable from the EL, and you don't want to destroy the heap by moving the 1GB content into an attribute, the best solution there would be to write the contents out to a file using PutFile, run the sed command against the provided filename and write it to another file, and then use GetFile to read those contents back into a flowfile in NiFi.
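The sed step in that PutFile / GetFile round trip could look like the sketch below. The file names are taken from the question; the temporary directory stands in for whatever directory PutFile writes to, which is an assumption. The important detail is that the output goes to a different file: redirecting to the input file itself (`sed '1d' f > f`) would truncate it before sed reads anything.

```shell
# Temporary directory standing in for the PutFile output directory (assumption).
workdir=$(mktemp -d)
printf 'id\tname\n1\talpha\n2\tbeta\n' > "$workdir/simple.tsv"

# Delete line 1 and write the rest to a separate file for GetFile to pick up.
sed '1d' "$workdir/simple.tsv" > "$workdir/noHeader.tsv"

# Quick check of the result.
line_count=$(wc -l < "$workdir/noHeader.tsv" | tr -d ' ')
first_line=$(head -n 1 "$workdir/noHeader.tsv")
echo "lines=$line_count first=$first_line"

rm -rf "$workdir"
```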

Edit 2:

Here is a template which demonstrates using ExecuteStreamCommand with both rev and sed against flowfile content and putting the output into the content of the new flowfile. You can run the flow and monitor logs/nifi-app.log to see the output or use the data provenance query to examine the modification that each processor performs.

ExecuteStreamCommand Example

ExecuteStreamCommand Configuration
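Since the screenshots are not reproduced here, the following shell sketch illustrates the mechanism as I understand it: ExecuteStreamCommand streams the flowfile content to the command's standard input and takes the command's standard output as the new content, so sed needs no file argument at all. The `flowfile_content` function is a hypothetical stand-in for the incoming content. In the processor this would roughly correspond to running sed with the single argument 1d; treat the exact property values as an assumption rather than a copy of the configuration above.

```shell
# Hypothetical stand-in for the flowfile content arriving on standard input.
flowfile_content() { printf 'id\tname\n1\talpha\n2\tbeta\n'; }

# With no file argument, sed reads stdin and writes the result to stdout.
via_sed=$(flowfile_content | sed '1d')

# tail -n +2 starts output at line 2 and gives the same result.
via_tail=$(flowfile_content | tail -n +2)

printf '%s\n' "$via_sed"
```

Either command processes the input as a stream, so the 1GB content does not have to fit in memory on the command side.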

Andy
  • thank you for such an exhaustive and detailed answer - appreciate it! Let me refer to it: 1) ReplaceText - I tried this with smaller files and it works, however when running this processor in Entire Text mode I get memory out of bounds exception. So, as you noticed, it's not the best option here. 2) ModifyBytes - no, the content of the first line will be different for different files. I need to have the line removed regardless of its content - in other words: replace whatever matches the (^.*)\n regex with an empty string. – michalrudko Feb 27 '17 at 18:12
  • 3) Execute Script - thanks for this piece of code, but custom code blocks are a last resort (difficult to maintain etc.). I'd rather use one of the shell commands (like e.g. sed) and thought that ExecuteStreamCommand processor would do this job as it allows running OS commands on the flow file (without saving it to the attributes) but I don't know how to pass the flow file as a parameter in this processor. Any suggestions? – michalrudko Feb 27 '17 at 18:13
  • 4) PutFile + GetFile - it's an option but I'd prefer not to disconnect the flow; I am still looking for a more "elegant" way of sorting out this challenge. 5) SplitText - I am actually doing pre-processing for SplitText; this whole parallelization will come next in my flow. If you have any more suggestions, I'd be grateful to hear them, thanks again! – michalrudko Feb 27 '17 at 18:13
  • If you want to avoid custom code, which would still be my suggestion in this case, I recommend you open a [Jira feature request](https://issues.apache.org/jira/browse/NIFI) for NiFi with the details you documented here. I agree there should be an out-of-the-box solution for this. As for passing the flowfile content as a parameter to the command line invocation, there isn't a good solution in this case because flowfile content is not referenceable from the Expression Language and you don't want to load the 1GB content into an attribute, which will destroy the heap. – Andy Feb 27 '17 at 18:27
  • Andy, I would still like to understand better why ExecuteStreamCommand would not be a good solution here; from what I read it DOES NOT save the flow file as an attribute. What it does is "Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command". I would just like to know HOW to achieve it using my sed command (could be in a shell script). If this is still not an option, I'd go for a custom code solution. Thanks! – michalrudko Feb 27 '17 at 19:29
  • I updated my answer with an example that puts the flowfile content onto the command line with `sed` and gets the output into the contents of the next flowfile. – Andy Feb 27 '17 at 20:31
  • Your latest post gave me a good hint about how I should use the ExecuteStreamCommand processor. It was simpler than I thought, but I could not find any good example on the web before. Thanks a lot for your support and time - marking this as the final solution and updating my post with the final configuration – michalrudko Feb 28 '17 at 00:50
  • @Andy Do you know how to pass flow file content to a Groovy command that runs a Java jar within the ExecuteScript processor? https://community.hortonworks.com/questions/75977/run-java-code-in-apache-nifi.html?childToView=173976#comment-173976 – donald Feb 20 '18 at 12:13

Since you want to remove the header from your file, I think using the StripHeader processor would be a better option.

Ankit

Ankit Tripathi
  • Ankit, where do you see that processor? It is not listed on the [NiFi Documentation](https://nifi.apache.org/docs.html). – Andy Feb 27 '17 at 16:05
  • @Ankit, I was about to ask the same question as Andy; I cannot see such a processor either. Could you explain your tip a bit more? Thanks – michalrudko Feb 27 '17 at 17:43
  • I just noticed it is not there, but you can build your own StripHeader processor. For the logic, you can refer to the link below: https://github.com/KyloIO/kylo/blob/f66903fe61e5f968856f8e159b50e190de4aa5ca/integrations/nifi/nifi-nar-bundles/nifi-core-bundle/nifi-core-processors/src/main/java/com/thinkbiganalytics/nifi/v2/ingest/StripHeader.java – Ankit Tripathi Mar 01 '17 at 07:05

If you need to process the data in the CSV any further, I would suggest taking a look at the CSVReader for record processing. It is very powerful.

CSVReader Properties - Treat First Line as Header with Info

The "Treat First Line as Header" property, in combination with "Ignore CSV Header Column Names", allows you to deal with the first line.