
I'm using a Jython InvokeScriptedProcessor to transform data from a JSON structure into an SQL statement. I'm having trouble with one specific function, json.loads: it does not seem to handle special characters such as ñ, é, á, í...

The characters come out in an odd form, and I haven't found any way to get them written correctly.

e.g. (very simple)

{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}

If we try to write it to SQL like this:

INSERT INTO .... (id, value) VALUES ("...",3.141592);

it fails. I cannot get correct data back through any relationship, success or failure, regardless of the NiFi version. Here is my code:

    def process(self, inputStream, outputStream):
        # read input json data from flowfile content
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        data = json.loads(text) 

This variant doesn't work properly either:

        data = json.loads(text.encode("utf-8"))

text arrives as a unicode string.

                def __generate_sql_transaction(input_data):
                    """ Generate SQL statement """

                    sql = """
        BEGIN;"""

                    _id = input_data.get("id")
                    _timestamp = input_data.get("timestamp")
                    _flowfile_metrics = input_data.get("metrics")
                    _flowfile_metadata = input_data.get("metadata")

                    self.valid = __validate_metrics_type(_flowfile_metrics)

                    if self.valid is True:
                        self.log.error("generate insert")
                        sql += """
            INSERT INTO
                {0}.{1} (id, timestamp, metrics""".format(schema, table)

                        if _flowfile_metadata:
                            sql += ", metadata"
                        sql += """)
            VALUES
                ('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))

                        self.log.error("generate metadata")
                        if _flowfile_metadata:
                            sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
                        sql += """)
            ON CONFLICT ({})""".format(on_conflict)

                        if not bool(int(self.update)):
                            sql += """
                DO NOTHING;"""
                        else:
                            sql += """
                DO UPDATE
                    SET"""
                            if bool(int(self.preference)):
                                sql += """
                        metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
                            else:
                                sql += """
                        metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))

                    else:
                        return ""

                    sql += """
        COMMIT;"""
                    return sql

I then write the data back to NiFi with:

        output = __generate_sql_transaction(data)
        self.log.error("post generate_sql_transaction")
        self.log.error(output.encode("utf-8"))

        # If no sql_transaction is generated because requisites weren't met,
        # set the processor output with the original flowfile input.
        if output == "":
            output = text

        # write new content to flowfile
        outputStream.write(
            output.encode("utf-8")
        )

The output looks like this:

INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');

I also have "Ñueco" in metadata, and it comes out wrong in both id and metadata.
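One plausible reading of the two symptoms above, offered as an assumption rather than a confirmed diagnosis: the `Ã` in id looks like UTF-8 bytes being encoded a second time, while `\u00d1` in metadata is the default ASCII escaping applied by json.dumps. The sketch below reproduces only the first effect in plain Python. The latin-1 decode is a stand-in for a runtime that treats each byte as one character, and all variable names are illustrative.

```python
# -*- coding: utf-8 -*-
# Sketch: reproduce the "ÃUECO" symptom by encoding to UTF-8 twice.

original = u"\u00d1UECO"                  # "ÑUECO" as a text (unicode) string

utf8_bytes = original.encode("utf-8")     # first encode: bytes C3 91 + "UECO"

# If those bytes are later treated as one-character-per-byte text
# (simulated here with a latin-1 decode), byte 0xC3 becomes "Ã".
misread = utf8_bytes.decode("latin-1")

# Second encode, comparable to encoding again when writing the content.
double_encoded = misread.encode("utf-8")  # bytes C3 83 C2 91 + "UECO"

# Encoding exactly once keeps the value intact.
once = original.encode("utf-8")
print(repr(misread), repr(double_encoded), repr(once))
```

If this is what happens in the processor, encoding exactly once, at the point where the flowfile content is written, would avoid the `Ã`.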

NOTE: It seems that InvokeScriptedProcessor works fine using Groovy instead of Python. But my problem is that I know nothing about Groovy...

Has anybody run into a similar issue? How did you solve it?

Update:

Input Example:

{"id":"ÑUECO",
 "metrics":{
     "value":3.1415
 },
 "metadata":{
     "location":"ÑUECO"
 },
 "timestamp":"2020-01-01 00:00:00+01:00"
}

Desired Output:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO', 
        '2020-01-01T00:00:00+01:00',
        '{"value":3.1415}',
        '{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
    SET
        metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

Real Output:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO', 
        '2020-01-01T00:00:00+01:00',
        '{"value":3.1415}',
        '{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
    SET
        metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
Krakenudo
  • How are you writing the data, and where? You shared only the reading part of the code. – daggett Sep 29 '21 at 12:15
  • I'm writing the data into a new flowfile in NiFi; the output shown was extracted from the output flowfile. – Krakenudo Sep 29 '21 at 12:35
  • `\u00d1` is a valid representation of `Ñ` in JSON format - https://www.compart.com/en/unicode/U+00D1 – daggett Sep 29 '21 at 13:10
  • Also, I can't understand why you have `.encode("utf-8")` all over the code. – daggett Sep 29 '21 at 13:12
  • Please share example input and expected output. – daggett Sep 29 '21 at 13:12
  • Updated. I use .encode("utf-8") out of desperation, because I can't understand why Jython turns Ñ into Ã and \u00d1 when I try to return a string. I've tried passing _id as _id, str(_id), _id.decode("utf-8"), _id.encode("utf-8") and more. – Krakenudo Sep 29 '21 at 15:14
  • It would be great if you know any way to run Python 3 inside an InvokeScriptedProcessor. – Krakenudo Sep 29 '21 at 15:20
  • I'm having the same issues with Python 2.7. It's not a problem with NiFi, but with Jython. – Krakenudo Sep 29 '21 at 15:21
  • If you don't see this behaviour in Groovy, use Groovy. It is a simplified Java syntax that is incredibly easy to get started with, it'll take some getting used to but it won't be difficult. It is worth learning and your use case is not complex, so it should be fairly standard stuff. Groovy usually outperforms Jython in NiFi, and in my experience is much more stable. – Sdairs Sep 29 '21 at 17:01

2 Answers


UPD

  1. Jython does not handle byte strings correctly, so don't use .encode('utf-8').

  2. Use Java methods to write the content back to the flowfile with a specific encoding.

Below is a sample that correctly reads and writes non-ASCII characters, including Ñ.

Use an ExecuteScript processor with Jython and replace the body of the _transform(text) function:

import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class FlowWriter(StreamCallback):
    def _transform(self, text):
        # transform incoming text here
        return '@@@@' + text + '****'

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        new_text = self._transform(text)
        IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, FlowWriter())
        flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        log.error("{}\n{}".format(e,traceback.format_exc()))
        session.rollback(True)  # put file back and penalize it
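As a sketch of what the body of _transform(text) might look like for the question's use case, the block below parses the JSON and builds a simplified INSERT while staying in unicode throughout and never calling .encode(). The helper names `sql_quote` and `transform`, the table name `my_schema.my_table`, and the reduced statement (no ON CONFLICT clause) are assumptions made for illustration, not part of the original code. It passes ensure_ascii=False to json.dumps so that Ñ is kept instead of being escaped as `\u00d1`.

```python
# -*- coding: utf-8 -*-
import json


def sql_quote(value):
    """Wrap text in single quotes, doubling embedded quotes (hypothetical helper)."""
    return u"'" + value.replace(u"'", u"''") + u"'"


def transform(text, table=u"my_schema.my_table"):
    """Build an INSERT statement from JSON text without any byte-string step."""
    data = json.loads(text)
    columns = [u"id", u"timestamp", u"metrics"]
    values = [
        sql_quote(data[u"id"]),
        sql_quote(data[u"timestamp"]),
        # ensure_ascii=False keeps non-ASCII characters unescaped
        sql_quote(json.dumps(data[u"metrics"], ensure_ascii=False)),
    ]
    if data.get(u"metadata"):
        columns.append(u"metadata")
        values.append(sql_quote(json.dumps(data[u"metadata"], ensure_ascii=False)))
    return u"INSERT INTO {0} ({1}) VALUES ({2});".format(
        table, u", ".join(columns), u", ".join(values)
    )


# Sample input modelled on the question's example.
sample = (
    u'{"id": "\u00d1UECO", "timestamp": "2020-01-01 00:00:00+01:00", '
    u'"metrics": {"value": 3.1415}, "metadata": {"location": "\u00d1UECO"}}'
)
statement = transform(sample)
print(statement)
```

The returned unicode string would then be written once with IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8), as in the sample above. Doubling single quotes is only a minimal guard; building SQL by string concatenation remains open to injection if the input is untrusted.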
daggett
  • It doesn't work. I've even tried ExecuteStreamCommand, specifying which Python it should use, but I get the same issues. Now I don't know where the error is; maybe it's in NiFi and the way it sends 'Ñ' to Python... – Krakenudo Oct 01 '21 at 07:28
  • @Krakenudo, I updated the question with Python code that works in the NiFi ExecuteScript/Jython processor – daggett Oct 06 '21 at 21:40
  • Thanks, @daggett. This solution may work in the general case, but in my problem I have to convert a string to JSON, and I think the problem lies with json and its methods. I've just found this explanation for my own question: – Krakenudo Oct 07 '21 at 08:58
  • https://stackoverflow.com/a/35882335/7634711 – Krakenudo Oct 07 '21 at 08:59
  • I'll have to live with this kind of "issue", because it's not really an issue; it's how it works. In the end, I took the keys ("Ñueco") and their metadata ({"Location":"Ñueco"}) and saved them in a database. When data arrives in NiFi, I query the database for the ID related to "Ñueco" and its metadata, work only with the metrics and limited metadata (without special characters), and then replace the ID and location. – Krakenudo Oct 07 '21 at 09:01

I've recently found this answer.

https://stackoverflow.com/a/35882335/7634711

It's not a problem with NiFi. It's a problem with Python 2 and how its json library works. The same problems will also appear in Python 3 if special characters appear in dict keys.
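As a small illustration of the json behaviour involved (a sketch using a made-up dictionary, not the code from the linked answer): by default json.dumps escapes every non-ASCII character, in keys as well as values, and passing ensure_ascii=False keeps them readable. Both forms are valid JSON and decode back to the same data.

```python
# -*- coding: utf-8 -*-
import json

# Hypothetical record with non-ASCII characters in both key and value.
record = {u"ubicaci\u00f3n": u"\u00d1UECO"}

escaped = json.dumps(record)                       # default: ensure_ascii=True
readable = json.dumps(record, ensure_ascii=False)  # keeps the characters as-is

# Either form decodes back to the original dictionary.
round_trip_escaped = json.loads(escaped)
round_trip_readable = json.loads(readable)
print(escaped)
print(readable)
```

Whether the escaped form is acceptable depends on the consumer: a JSON parser will restore the characters, but anything that compares or displays the raw text will see the `\uXXXX` sequences.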

Krakenudo