4

Is there a way to simultaneously write to different streams in a custom processor in NiFi? For instance, I have third-party libraries that do significant processing using APIs that work something like this:

public void process(InputStream in, OutputStream foo, OutputStream baa, List<String> args)
{
    ...
    foo.write(things);
    baa.write(stuff);
    ...
}

But all the examples I can find use only one output stream:

FlowFile transform = session.write(original, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            out.write("stuff".getBytes());
        }
    });

Processing is done in batches (due to its large scale), so it's not practical to perform all the processing and then write out the separate flows.

The only way I can come up with is to process the input multiple times :(

To clarify, I want to write to multiple FlowFiles using the session.write(flowfile, callback) method, so the different streams can be sent/managed separately.

Vogel612
foobarking
  • Is the use of [TeeOutputStream](https://commons.apache.org/proper/commons-io/javadocs/api-2.4/org/apache/commons/io/output/TeeOutputStream.html) out of the question? See: http://stackoverflow.com/questions/7987395/how-to-write-data-to-two-java-io-outputstream-objects-at-once – GPI Jul 11 '16 at 09:12
  • I don't think so; the only way I know of writing to a flowfile is using the OutputStreamCallback, which has only one function (process), which takes only one argument (an OutputStream). – foobarking Jul 11 '16 at 10:32
  • Yes, but a TeeOutputStream allows you to have 1 stream that writes to 2 separate files, isn't that enough? – GPI Jul 11 '16 at 10:50
  • I don't believe so; TeeOutputStream writes the same thing to both streams, whereas my functions do not (such as "things" and "stuff" in my example). Thanks – foobarking Jul 11 '16 at 10:58
  • Also, I want to write to multiple _flowfiles_ (this isn't clear, I'll update the question) using the session.write(flowfile, callback) method. – foobarking Jul 11 '16 at 11:04

2 Answers

4

The NiFi API is based on acting upon one flow file at a time, but you should be able to do something like this:

        FlowFile flowFile1 = session.create();
        final AtomicReference<FlowFile> holder = new AtomicReference<>(session.create());

        flowFile1 = session.write(flowFile1, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                // write the content of the first flow file to 'out' here

                FlowFile flowFile2 = session.write(holder.get(), new OutputStreamCallback() {
                    @Override
                    public void process(OutputStream out) throws IOException {
                        // write the content of the second flow file here
                        // (note: this 'out' shadows the outer callback's 'out')
                    }
                });
                holder.set(flowFile2);

            }
        });
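
For reference, here is roughly how that nesting could be combined with the third-party process(in, foo, baa, args) API from the question. This is only a sketch: 'original' is the incoming flow file, 'thirdParty', 'args', REL_THINGS and REL_STUFF are placeholders, and it assumes the session allows a session.read of the incoming flow file nested inside the write callbacks.

    // 'original' is the incoming flow file; 'thirdParty' and 'args' are assumed to exist elsewhere
    FlowFile thingsFlowFile = session.create(original);
    final AtomicReference<FlowFile> stuffHolder = new AtomicReference<>(session.create(original));

    thingsFlowFile = session.write(thingsFlowFile, new OutputStreamCallback() {
        @Override
        public void process(final OutputStream thingsOut) throws IOException {
            FlowFile stuffFlowFile = session.write(stuffHolder.get(), new OutputStreamCallback() {
                @Override
                public void process(final OutputStream stuffOut) throws IOException {
                    session.read(original, new InputStreamCallback() {
                        @Override
                        public void process(final InputStream in) throws IOException {
                            // single pass over the input, writing both outputs at once
                            thirdParty.process(in, thingsOut, stuffOut, args);
                        }
                    });
                }
            });
            stuffHolder.set(stuffFlowFile);
        }
    });

    // route each result independently (relationship names are placeholders)
    session.transfer(thingsFlowFile, REL_THINGS);
    session.transfer(stuffHolder.get(), REL_STUFF);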
Bryan Bende
  • Thanks, this works. My only change would be to give the first OutputStream a different name (and make it final) so you can write to both in the inner-inner function. – foobarking Jul 13 '16 at 00:13
3

Since you're making different outputs from the same input, you might also consider breaking these steps out as discrete processors that each focus on their specific function. Above you show "things" and "stuff", so for example I'm suggesting you have a 'DoThings' and a 'DoStuff' processor. In your flow you can send the same flow file to both simply by using the source connection twice. This enables nice parallel operation and allows them to have different runtimes, etc. NiFi will still maintain the provenance trail for you, and it won't actually copy the bytes at all but rather pass a pointer to the original content.
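
As a rough illustration of what one of those focused processors could look like (a sketch only; the 'DoThings' name, the REL_SUCCESS relationship and the callback body are placeholders, not an actual implementation):

    // Sketch of a 'DoThings' processor: reads the shared input, writes only the "things" output
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(final InputStream in, final OutputStream out) throws IOException {
                // produce "things" from 'in' and write them to 'out'
            }
        });
        session.transfer(flowFile, REL_SUCCESS);
    }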

Joe Witt
  • I agree this has its advantages, but if there is a large amount of processing done per input stream, it feels wasteful to throw it all away and do it again (or do it twice in parallel). It's a trade-off. Thanks though. – foobarking Jul 13 '16 at 00:16
  • Could you do the intermediate (or "shared") processing in a single processor and then split to two subsequent processors when the actions differ? Nothing says that the flowfile content has to be in a certain state when processor A finishes, as long as B and B' can both accept the output of A as input. – Andy Jul 16 '16 at 03:59