I'm using a Jython InvokeScriptedProcessor to struct data from json struct to sql struct. I'm having trouble with a specific function. json.loads. json.loads does not recognize special characters like ñ, é, á, í...
It writes it in an odd form. And I've not reached any form to have it.
e.g. (very simple)
{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}
If we try to write it in sql like
INSERT INTO .... (id, value) VALUES ("...",3.141592);
It will fail. It fails me. I cannot return data with any return option, success or failure, it doesn't matter NiFi's version. Here is my code
def process(self, inputStream, outputStream):
# read input json data from flowfile content
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
data = json.loads(text)
Neither
data = json.loads(text.encode("utf-8"))
works properly. text comes in unicode.
def __generate_sql_transaction(input_data):
""" Generate SQL statement """
sql = """
BEGIN;"""
_id = input_data.get("id")
_timestamp = input_data.get("timestamp")
_flowfile_metrics = input_data.get("metrics")
_flowfile_metadata = input_data.get("metadata")
self.valid = __validate_metrics_type(_flowfile_metrics)
if self.valid is True:
self.log.error("generate insert")
sql += """
INSERT INTO
{0}.{1} (id, timestamp, metrics""".format(schema, table)
if _flowfile_metadata:
sql += ", metadata"
sql += """)
VALUES
('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))
self.log.error("generate metadata")
if _flowfile_metadata:
sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
sql += """)
ON CONFLICT ({})""".format(on_conflict)
if not bool(int(self.update)):
sql += """
DO NOTHING;"""
else:
sql += """
DO UPDATE
SET"""
if bool(int(self.preference)):
sql += """
metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
else:
sql += """
metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))
else:
return ""
sql += """
COMMIT;"""
return sql
I send the data to NiFi again with:
output = __generate_sql_transaction(data)
self.log.error("post generate_sql_transaction")
self.log.error(output.encode("utf-8"))
# If no sql_transaction is generated because requisites weren't met,
# set the processor output with the original flowfile input.
if output == "":
output = text
# write new content to flowfile
outputStream.write(
output.encode("utf-8")
)
That output seems like
INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"\u00d1UECO"}');
I have "Ñueco" also in metadata, and it doesn't works fine with id nor metadata
NOTE: It seems that InvokeScriptedProcessor works fine using Groove instead of Python. But my problem is I know nothing about Groovy...
Does anybody found a similar issue? How did you solve it?
Update:
Input Example:
{"id":"ÑUECO",
"metrics":{
"value":3.1415
},
"metadata":{
"location":"ÑUECO"
},
"timestamp":"2020-01-01 00:00:00+01:00"
}
Desired Output:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO',
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;
Real Output:
BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO',
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"\u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;