I am trying to create a Python script in NiFi that:
- Reads some attributes from an incoming flowfile
- Reads the JSON content of the flowfile and extracts specific fields
- Writes attributes to the outgoing flowfile
- Overwrites the incoming flowfile with new content created in the script (e.g. an API call that returns new JSON) and sends it to the SUCCESS relationship, OR removes the old flowfile and creates a new one with the desired content
What I've done so far:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, InputStreamCallback, OutputStreamCallback

# Callback that should write the given object to the flowfile as JSON.
# NOTE: passing obj in like this is the part I can't get to work.
class OutputWrite(OutputStreamCallback, obj):
    def __init__(self):
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))
### end class ###

flowfile = session.get()
if flowfile is not None:
    # 1) Get flowfile attributes
    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }
    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content
    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    stream_content.close()  # close the stream before modifying the flowfile
    json_content = json.loads(text_content)
    records = json_content['result']['count']
    pages = records/10000

    # 3) Write flowfile attributes
    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data
    output_json = {some data}

    # 4) Write final JSON data to output flowfile
    flowfile = session.write(flowfile, OutputWrite(output_json))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()
My problem is that I can't find a way to pass a reference to the desired output_json object as an argument to the OutputStreamCallback class. Any ideas on how to resolve this, or is there a better approach?
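One direction that might work, as a minimal sketch: take the object as a constructor argument rather than listing it among the base classes. To keep the snippet runnable outside NiFi, `OutputStreamCallback` below is a plain-Python stand-in for the real `org.apache.nifi.processor.io.OutputStreamCallback` interface, an `io.BytesIO` stands in for the stream NiFi would pass to `process`, and the payload is a hypothetical example.

```python
import io
import json


class OutputStreamCallback(object):
    """Stand-in for org.apache.nifi.processor.io.OutputStreamCallback.
    Assumption: inside NiFi the real interface is imported instead."""

    def process(self, outputStream):
        raise NotImplementedError


class OutputWrite(OutputStreamCallback):
    def __init__(self, obj):
        # The payload arrives through the constructor, not the base-class list.
        self.obj = obj

    def process(self, outputStream):
        # Serialize the stored object and write it as UTF-8 bytes.
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))


# Hypothetical payload, plus an in-memory stream standing in for the one
# NiFi hands to process() during session.write(flowfile, callback).
output_json = {'collection': 'demo', 'pages': 3}
stream = io.BytesIO()
OutputWrite(output_json).process(stream)
written = stream.getvalue()
```

Under that assumption, the existing `session.write(flowfile, OutputWrite(output_json))` call would not need to change, since the object is already being passed to the constructor there.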
Would it be easier to perform all the API operations inside the class's process function? If so, how do I get access to the incoming flowfile's attributes there, given that this requires a session or a flowfile object?
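A possible shape for that second variant, sketched under assumptions: read the attributes from the flowfile before creating the callback and hand them to its constructor, so that `process` never needs the session. `StreamCallback` here is a plain-Python stand-in for the NiFi interface of the same name, `fake_api` is a hypothetical placeholder for the real API call, and `io.BytesIO` objects stand in for the Java streams (inside NiFi the input would be read with something like `IOUtils.toString` instead of `.read()`).

```python
import io
import json


class StreamCallback(object):
    """Stand-in for org.apache.nifi.processor.io.StreamCallback.
    Assumption: inside NiFi the real interface is imported instead."""

    def process(self, inputStream, outputStream):
        raise NotImplementedError


class Rewrite(StreamCallback):
    def __init__(self, attributes, build_output):
        # Attributes are captured up front, outside the callback.
        self.attributes = attributes
        self.build_output = build_output

    def process(self, inputStream, outputStream):
        # Read the incoming JSON, build the new content, write it out.
        incoming = json.loads(inputStream.read().decode('utf-8'))
        result = self.build_output(self.attributes, incoming)
        outputStream.write(bytearray(json.dumps(result).encode('utf-8')))


def fake_api(attributes, incoming):
    # Hypothetical placeholder for the real API operations.
    pages = incoming['result']['count'] // 10000
    return {'collection': attributes['collection'], 'pages': pages}


# In NiFi these values would come from flowfile.getAttribute(...).
attrs = {'collection': 'demo', 'dataset': 'd1'}
source = io.BytesIO(json.dumps({'result': {'count': 25000}}).encode('utf-8'))
target = io.BytesIO()
Rewrite(attrs, fake_api).process(source, target)
rewritten = json.loads(target.getvalue().decode('utf-8'))
```

The trade-off is that any network call then runs while the flowfile's streams are open, so building the output before `session.write` may be the simpler option.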
Any help is much appreciated!