
I am a rookie at both NiFi and Python, and I need your help passing a FlowFile attribute value to the script. The script converts a nested JSON into CSV. When I run the script locally, it works.

How can I pass the FlowFile name to src_json and tgt_csv?

Thanks,

Rosa

```python
import pandas as pd
import json
from pandas.io.json import json_normalize

src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

jfile = open(src_json)
jdata = json.load(jfile)

# ... rest of the code ...
```
  • You could use [ExecuteStreamCommand](https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ExecuteStreamCommand/index.html) to run your Python script, and rework your script to read JSON from stdin and write CSV to stdout. – daggett May 10 '19 at 14:32
  • May I know why you need a custom Python script to convert JSON to CSV when processors like 'ConvertRecord' are readily available to do this job? Just curious – arunkvelu May 10 '19 at 14:39
  • @Arun211 how do you propose to handle multiple JSON schemas? I tried JOLT but I couldn't get the correct output. For now, I have only one schema structure coming in, but in the future I will have many. Ref: https://stackoverflow.com/questions/56061222/how-to-convert-complicated-json-nested-array-with-jolt – RosaNegra May 14 '19 at 12:22

1 Answer

You have a few options to accomplish this task.

  1. As Arun211 pointed out, there is an existing ConvertRecord processor which largely accomplishes this task. If your nested JSON is a problem or you have other reasons for wanting to do this in a Python script, continue below.
  2. If you have an existing Python script which performs this task as shown above, you'll need to invoke it from NiFi while providing the data to the script. You can use:
    1. ExecuteScript (better for prototyping) and InvokeScriptedProcessor (more performant for production tasks) allow you to run Python (actually Jython) scripts inside the NiFi instance. This gives you direct access to some convenience methods & functionality. However, because Jython cannot handle natively-compiled Python libraries, you will not be able to use pandas in this code. See here for instructions on configuring this processor and here for why pandas will not work.
    2. If you need pandas for some functionality, you'll need to save the script as a Python file on the local file system and invoke it as a shell command using ExecuteStreamCommand (if you need to provide input to the processor) or ExecuteProcess (if it's the first processor in your flow). These processors essentially run a shell command like python my_python_script_with_pandas.py -somearg (in ExecuteProcess), or python my_python_script_with_pandas.py with the flowfile content piped to STDIN and STDOUT captured as the resulting flowfile content (in ExecuteStreamCommand). A minimal STDIN/STDOUT sketch follows this list.

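For illustration, here is a minimal sketch of what such a script could look like when driven by ExecuteStreamCommand: it reads the JSON from STDIN and writes CSV to STDOUT, so no file paths are needed at all. Calling json_normalize with no record_path/meta arguments is just an assumption here; adjust it to your actual nested structure.

```python
# Minimal sketch (assumes the flowfile content is a single JSON document):
# ExecuteStreamCommand pipes the flowfile content to STDIN and captures STDOUT
# as the new flowfile content.
import sys
import json
from pandas.io.json import json_normalize  # pandas.json_normalize in newer pandas

# Read the incoming flowfile content from STDIN.
jdata = json.load(sys.stdin)

# Flatten the nested JSON; pass record_path/meta as needed for your schema.
df = json_normalize(jdata)

# Write the CSV to STDOUT so NiFi captures it as the outgoing flowfile content.
df.to_csv(sys.stdout, index=False)
```
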
Currently your script reads the incoming JSON from a static file path and writes the resulting CSV to another static file path. You will need to change the script to do one of the following:

  1. Read those paths from command-line arguments and pass them via the relevant property of the processor you select. These properties can be populated from flowfile attributes, so you could set something like Command Arguments: -inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file}, or any combination thereof. Your script would then read the -inputfile and -outputfile arguments to determine the paths (see the first sketch after this list).
  2. Read the incoming data directly from STDIN (example here), then process the JSON, convert it to CSV, and write it to STDOUT. NiFi will consume that output, set it as the content of the resulting flowfile, and send it to the next processor(s) in your flow. This is the approach shown in the STDIN/STDOUT sketch above.
  3. The prior two options keep your Python script independent from NiFi; it is unaware of any of the "flowfile" constructs. This option makes the script NiFi-specific, but allows further functionality (see option 2.1 above). To write Python code that reads and writes directly from/to the flowfile content, see this example of an ExecuteScript processor handling flowfile content in Python, and the Jython sketch after this list.
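
A hedged sketch of option 1 follows. The -inputfile / -outputfile argument names are just examples matching the Command Arguments shown above, and the Expression Language values in the comments assume attributes such as filename and absolute.path exist on the flowfile; adapt both to your flow.

```python
# Sketch of option 1: take the source and target paths from command-line
# arguments that ExecuteStreamCommand / ExecuteProcess populate, e.g. from
# flowfile attributes via Expression Language.
import argparse
import json
from pandas.io.json import json_normalize  # pandas.json_normalize in newer pandas

parser = argparse.ArgumentParser()
parser.add_argument('-inputfile', required=True)   # e.g. ${absolute.path}/${filename}
parser.add_argument('-outputfile', required=True)  # e.g. an attribute holding the target path
args = parser.parse_args()

# Read the JSON from the path NiFi passed in.
with open(args.inputfile) as jfile:
    jdata = json.load(jfile)

# Flatten and write the CSV to the target path NiFi passed in.
df = json_normalize(jdata)  # adjust record_path/meta for your schema
df.to_csv(args.outputfile, index=False)
```
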
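And a hedged sketch of option 3, the NiFi-specific route, run inside ExecuteScript with the script engine set to python (Jython). pandas is not available there, so the flatten helper below is a deliberately naive stand-in for your conversion logic; session, REL_SUCCESS, and REL_FAILURE are variables ExecuteScript provides to the script.

```python
# Sketch of option 3: a Jython body for ExecuteScript that rewrites the
# flowfile content in place. Standard library only; no pandas under Jython.
import json
import csv
from StringIO import StringIO
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


def flatten(obj, prefix=''):
    # Very simplistic: collapse nested dicts into dot-separated column names.
    flat = {}
    for key, value in obj.items():
        name = key if not prefix else prefix + '.' + key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


class JsonToCsv(StreamCallback):
    def process(self, inputStream, outputStream):
        # Read the flowfile content (the nested JSON document).
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        flat = flatten(json.loads(text))

        # Build a one-row CSV and write it back as the new flowfile content.
        buf = StringIO()
        writer = csv.writer(buf)
        writer.writerow(list(flat.keys()))
        writer.writerow(list(flat.values()))
        outputStream.write(bytearray(buf.getvalue()))


flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, JsonToCsv())
    session.transfer(flowFile, REL_SUCCESS)
```
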
Andy
  • how would you suggest handling multiple incoming JSON schemas? – RosaNegra May 17 '19 at 12:46
  • like this? https://snag.gy/x4f1yv.jpg I'm confused when you say that the prior two keep NiFi unaware of any of the "flow file" constructs. So, I can't pass the source_path_filename attribute into the ExecuteStreamCommand? – RosaNegra May 20 '19 at 08:07
  • It looks like passing the attributes in that way is fine, but you'll need to also set the *Command Path* to either invoke a shell script which calls Python directly, or provide the Python script name in the arguments as well, and call `python` as the command path. – Andy May 20 '19 at 17:22