I want to separate the Bad JSON records from the flowfile and my NiFi job should continue processing the Good JSON records. I checked the "ValidateRecord" processor. But since the JSON structure itself is wrong for few records (e.g., "CT":"UTF-8""), NiFi transferring the entire flowfile to Failure relationship. Since am already using a Groovy script to parse the JSON to CSV, I am thinking of writing the error records to a separate flowfile while parsing in the same Groovy script. But am struggling to modify as am new to Groovy. Could anyone help?
In case of any error in parsing, then it should write to "failure" relationship flowFile otherwise "success" relationship flowFile. something like..
try {
def jsonRecord = new JsonSlurper().parseText(jsonReplace)
}
catch (e)
write to error file
Sample Bad JSON:
{"transaction":{"TS":"2020-02-04 07:04:57.437002","V":"v8","PID":"p1125","RS":"370","SR":"420","CnID":"0/6","CIPG":{"CIP":"10.67.112.35","CP":"38212","SLP":"38212","SLEP":"38212","CVID":"410"},"SIPG":{"SIP":"54.93.254.234","SP":"80","InP":"8080","SVID":"420"},"TG":{"T":"158836","R":"158836","C":"0","SDL":"0","DL":"0","I:R":"2:0","UAP":"0","EDBL":"0","Ca":"0","A":"0","RQM":"0","RSM":"0","FIT":"0","CSR":"158836"},"AS":"454","OS":"460","CPr":"-1","CVB":"0","CS":"MISS","HS":"200","OF":{"Flag1":"-","Flag2":"-","Flag3":"-","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"-","Flag11":"-","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"-","Flag23":"-"},"SF":{"Flag1":"A","Flag2":"M","Flag3":"H","Flag4":"-","Flag5":"C","Flag6":"-","Flag7":"-","Flag8":"F","Flag9":"-","Flag10":"-","Flag11":"C","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"S","Flag23":"c","Flag24":"-"},"GF":{"Flag1":"r","Flag2":"-","Flag3":"s","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"k","Flag11":"-","Flag12":"S","Flag13":"C","Flag14":"-","Flag15":"-","Flag16":"M","Flag17":"T","Flag18":"A","Flag19":"H","Flag20":"S","Flag21":"H","Flag22":"-","Flag23":"S","Flag24":"-","Flag25":"-","Flag26":"j","Flag27":"-","Flag28":"-","Flag29":"-","Flag30":"L","Flag31":"-","Flag32":"-","Flag33":"-","Flag34":"-","Flag35":"-"},"SUNR":"0","SUNS":"460","ML":"-","VSL":{"VSID":"0","TC":"-","MTC":"-","NRTC":"-","ET":"-","HRES":"0","VRES":"0","FS":"0","FR":"0.0","VSD":"0","ACB":"-","ASB":"-","VPR":"-","VSST":"-"},"MT":"-","TCPSL":"54/54/54 21 1/1 0.0/0.0","CT":"text/javascript; charset="UTF-8"","MS":"44XXXXXXXXXX","MSH":"7ogzX50IJtbIy44dRxi9PbNPlwCJ5TppUpX+Dk72TdU=","SID":"v4.172.16.137.150:9EE6E276BA938B46","SuID":"-","UA":"okhttp/3.9.1","DID":"Default-Android-Web-Unknown","UAG":"Android-Web","CID":"TECH","HR":"-","CRG":"3001010006000000","CP1":"8232F401009032F40107ADE76E","CP2":"3544470718998600","AIDF":"0:0","UCB":"0/0","CLID":"-","CLCL":"0","OPTS":"-","PUAG":"Android-Web","SSLIL":"-","HRU":{"HM":"GET","HD":"pubsub.pubnub.com","HP":"/v2/subscribe/sub-c-ee729d78-6233-11e2-b80b-12313f022c90/uk8_16620/0","HQ":"requestid=57e020bb-bc6a-47bf-8329-314f4242a506&heartbeat=300&tr=12&tt=15807995127999216&pnsdk=PubNub-Java-Unified/4.19.0&uuid=a311a598-430f-4758-a75e-a045132c926d"},"URLF":{"CID":"-","CGID":"-","CR":"-","RA":"-","USM":"-","USP":"-","MUS":"-"},"TCPSt":{"WS":"0","SE":"0","WSFNS":"0","WSF":"0","EM":"0","RSTE":"0","MSS":"0"},"NS":{"OPID":"-","ODID":"-","EPID":"-","TrID":"-","VSN":"-","LSUT":"-","STTS":"-","TCPPR":"-"},"CQA":{"NL":"-","CL":"-","CLC":"-","SQ":"-","SQC":"-"}}}
Sample Good JSON:
{"transaction":{"TS":"2020-02-04 07:04:59.942141","V":"v8","PID":"p36489","RS":"578","SR":"649","CnID":"0/1","CIPG":{"CIP":"10.65.204.71","CP":"33602","SLP":"33602","SLEP":"33602","CVID":"410"},"SIPG":{"SIP":"5.62.38.137","SP":"80","InP":"80","SVID":"420"},"TG":{"T":"363","R":"363","C":"0","SDL":"0","DL":"0","I:R":"2:0","UAP":"0","EDBL":"0","Ca":"0","A":"0","RQM":"0","RSM":"0","FIT":"0","CSR":"362"},"AS":"254","OS":"254","CPr":"0","CVB":"0","CS":"MISS","HS":"200","OF":{"Flag1":"-","Flag2":"-","Flag3":"-","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"-","Flag11":"-","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"-","Flag23":"-"},"SF":{"Flag1":"A","Flag2":"M","Flag3":"H","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"F","Flag9":"-","Flag10":"-","Flag11":"C","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"F","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"G","Flag23":"G","Flag24":"-"},"GF":{"Flag1":"n","Flag2":"-","Flag3":"b","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"k","Flag11":"-","Flag12":"S","Flag13":"C","Flag14":"-","Flag15":"-","Flag16":"M","Flag17":"T","Flag18":"A","Flag19":"!","Flag20":"-","Flag21":"H","Flag22":"-","Flag23":"S","Flag24":"-","Flag25":"-","Flag26":"5","Flag27":"-","Flag28":"-","Flag29":"-","Flag30":"L","Flag31":"-","Flag32":"-","Flag33":"-","Flag34":"-","Flag35":"-"},"SUNR":"0","SUNS":"254","ML":"-","VSL":{"VSID":"0","TC":"-","MTC":"-","NRTC":"-","ET":"-","HRES":"0","VRES":"0","FS":"0","FR":"0.0","VSD":"0","ACB":"-","ASB":"-","VPR":"-","VSST":"-"},"MT":"-","TCPSL":"30/30/30 21 1/1 0.0/0.0","CT":"application/octet-stream","MS":"44XXXXXXXXXX","MSH":"rPD7X2tPXvDdbRPEeEjIlvpggJcu/UPStkSEgxepvB8=","SID":"v4.172.16.137.150:9EE6E2767E92B7A2","SuID":"-","UA":"Apache-HttpClient/UNAVAILABLE (Java/0)","DID":"Jakarta Commons-HttpClient#2#App#Library-Utility","UAG":"Laptop-Web","CID":"TECH","HR":"-","CRG":"3001010006000000","CP1":"8232F40100A032F40107C8667D","CP2":"3528020922700601","AIDF":"6:1191","UCB":"0/0","CLID":"-","CLCL":"0","OPTS":"-","PUAG":"Laptop-Web","SSLIL":"-","HRU":{"HM":"POST","HD":"ui.ff.avast.com","HP":"/v5/urlinfo/_MD/0073AE105208F6A80000017019A3861E30653939333461620000016F7DF63D19/1580799898571","HQ":""},"URLF":{"CID":"-","CGID":"-","CR":"-","RA":"-","USM":"-","USP":"-","MUS":"-"},"TCPSt":{"WS":"0","SE":"0","WSFNS":"0","WSF":"0","EM":"0","RSTE":"0","MSS":"0"},"NS":{"OPID":"-","ODID":"-","EPID":"-","TrID":"-","VSN":"-","LSUT":"-","STTS":"-","TCPPR":"-"},"CQA":{"NL":"-","CL":"-","CLC":"-","SQ":"-","SQC":"-"}}}
Groovy code
import groovy.json.JsonSlurper
import groovy.json.JsonParserType
import java.util.function.*
import java.lang.*
class customJSONtoCSV implements Processor {
def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build();
def REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that were not successfully processed").build();
def log
static def flatten(row, prefix="") {
def flattened = new HashMap<String, String>()
row.each { String k, Object v ->
def key = prefix ? prefix + "_" + k : k;
if (v instanceof Map) {
flattened.putAll(flatten(v, k))
} else {
flattened.put(key, '"' + v.toString() + '"')
}
}
return flattened
}
static def toCSVRow(HashMap row) {
def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","TG_DL","TG_I:R","TG_UAP","TG_EDBL","TG_Ca","TG_A","TG_RQM","TG_RSM","TG_FIT","TG_CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","AS","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]
return columns.collect { column ->
return row.containsKey(column) ? row.get(column) : ""
}.join('|')
}
@Override
void initialize(ProcessorInitializationContext context) {
log = context.getLogger()
}
@Override
Set<Relationship> getRelationships() {
return [REL_SUCCESS, REF_FAILURE] as Set
}
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
try {
def session = sessionFactory.createSession()
def flowFile = session.get()
if (!flowFile) return
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
def jsonSlurper = new JsonSlurper()
def line
def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,TG_DL,TG_IR,TG_UAP,TG_EDBL,TG_Ca,TG_A,TG_RQM,TG_RSM,TG_FIT,TG_CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,AS,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"
outputStream.write("${header}\n".getBytes('UTF-8'))
while (line = bufferedReader.readLine()) {
def jsonReplace = line.replace('{"transaction":{','{"transaction":[{').replace('}}}','}}]}')
def jsonRecord = new JsonSlurper().parseText(jsonReplace)
def a = jsonRecord.transaction.collect { row ->
return flatten(row)
}.collect { row ->
return toCSVRow(row).toString()
}
String b = a.toString().replace('[','').replace(']','')
outputStream.write("${b}\n".getBytes('UTF-8'))
}
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
}
catch (e) {
throw new ProcessException(e)
}
}
@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }
@Override
PropertyDescriptor getPropertyDescriptor(String name) { return null }
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
return [] as List
}
@Override
String getIdentifier() { return null }
}
processor = new customJSONtoCSV()