
I want to separate the bad JSON records from the FlowFile so that my NiFi job continues processing the good JSON records. I checked the ValidateRecord processor, but because the JSON structure itself is broken for a few records (e.g., "CT":"UTF-8""), NiFi transfers the entire FlowFile to the failure relationship. Since I am already using a Groovy script to convert the JSON to CSV, I am thinking of writing the error records to a separate FlowFile while parsing, in the same Groovy script. However, I am struggling to modify the script because I am new to Groovy. Could anyone help?

If an error occurs while parsing a record, that record should be written to a FlowFile on the "failure" relationship; otherwise it should go to the FlowFile on the "success" relationship. Something like:

try {
    def jsonRecord = new JsonSlurper().parseText(jsonReplace)
}
catch (e) {
    // write this line to the error (failure) FlowFile
}
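A runnable plain-Groovy sketch of that try/catch idea, outside NiFi: each line is parsed on its own, so a malformed record only moves that one line to the error list instead of failing the whole input. The helper name splitRecords and the two shortened sample records are hypothetical; in NiFi the two lists would be replaced by the success and failure FlowFiles.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not a NiFi API): parse each line separately so one
// malformed record cannot take the other records down with it
def splitRecords(List<String> lines) {
    def slurper = new JsonSlurper()
    def good = []
    def bad = []
    lines.each { line ->
        try {
            good << slurper.parseText(line)
        } catch (Exception e) {
            // e.g. groovy.json.JsonException for an unescaped quote inside a value
            bad << line
        }
    }
    return [good, bad]
}

def (good, bad) = splitRecords([
    '{"transaction":{"CT":"application/octet-stream"}}',
    '{"transaction":{"CT":"text/javascript; charset="UTF-8""}}'
])
println "good: ${good.size()}, bad: ${bad.size()}"  // good: 1, bad: 1
```

Catching per line is what keeps the good records flowing; a single try/catch around the whole loop would still lose everything after the first bad line.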

Sample bad JSON (the "CT" value contains unescaped double quotes):

{"transaction":{"TS":"2020-02-04 07:04:57.437002","V":"v8","PID":"p1125","RS":"370","SR":"420","CnID":"0/6","CIPG":{"CIP":"10.67.112.35","CP":"38212","SLP":"38212","SLEP":"38212","CVID":"410"},"SIPG":{"SIP":"54.93.254.234","SP":"80","InP":"8080","SVID":"420"},"TG":{"T":"158836","R":"158836","C":"0","SDL":"0","DL":"0","I:R":"2:0","UAP":"0","EDBL":"0","Ca":"0","A":"0","RQM":"0","RSM":"0","FIT":"0","CSR":"158836"},"AS":"454","OS":"460","CPr":"-1","CVB":"0","CS":"MISS","HS":"200","OF":{"Flag1":"-","Flag2":"-","Flag3":"-","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"-","Flag11":"-","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"-","Flag23":"-"},"SF":{"Flag1":"A","Flag2":"M","Flag3":"H","Flag4":"-","Flag5":"C","Flag6":"-","Flag7":"-","Flag8":"F","Flag9":"-","Flag10":"-","Flag11":"C","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"S","Flag23":"c","Flag24":"-"},"GF":{"Flag1":"r","Flag2":"-","Flag3":"s","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"k","Flag11":"-","Flag12":"S","Flag13":"C","Flag14":"-","Flag15":"-","Flag16":"M","Flag17":"T","Flag18":"A","Flag19":"H","Flag20":"S","Flag21":"H","Flag22":"-","Flag23":"S","Flag24":"-","Flag25":"-","Flag26":"j","Flag27":"-","Flag28":"-","Flag29":"-","Flag30":"L","Flag31":"-","Flag32":"-","Flag33":"-","Flag34":"-","Flag35":"-"},"SUNR":"0","SUNS":"460","ML":"-","VSL":{"VSID":"0","TC":"-","MTC":"-","NRTC":"-","ET":"-","HRES":"0","VRES":"0","FS":"0","FR":"0.0","VSD":"0","ACB":"-","ASB":"-","VPR":"-","VSST":"-"},"MT":"-","TCPSL":"54/54/54 21 1/1 0.0/0.0","CT":"text/javascript; charset="UTF-8"","MS":"44XXXXXXXXXX","MSH":"7ogzX50IJtbIy44dRxi9PbNPlwCJ5TppUpX+Dk72TdU=","SID":"v4.172.16.137.150:9EE6E276BA938B46","SuID":"-","UA":"okhttp/3.9.1","DID":"Default-Android-Web-Unknown","UAG":"Android-Web","CID":"TECH","HR":"-","CRG":"3001010006000000","CP1":"8232F401009032F40107ADE76E","CP2":"3544470718998600","AIDF":"0:0","UCB":"0/0","CLID":"-","CLCL":"0","OPTS":"-","PUAG":"Android-Web","SSLIL":"-","HRU":{"HM":"GET","HD":"pubsub.pubnub.com","HP":"/v2/subscribe/sub-c-ee729d78-6233-11e2-b80b-12313f022c90/uk8_16620/0","HQ":"requestid=57e020bb-bc6a-47bf-8329-314f4242a506&heartbeat=300&tr=12&tt=15807995127999216&pnsdk=PubNub-Java-Unified/4.19.0&uuid=a311a598-430f-4758-a75e-a045132c926d"},"URLF":{"CID":"-","CGID":"-","CR":"-","RA":"-","USM":"-","USP":"-","MUS":"-"},"TCPSt":{"WS":"0","SE":"0","WSFNS":"0","WSF":"0","EM":"0","RSTE":"0","MSS":"0"},"NS":{"OPID":"-","ODID":"-","EPID":"-","TrID":"-","VSN":"-","LSUT":"-","STTS":"-","TCPPR":"-"},"CQA":{"NL":"-","CL":"-","CLC":"-","SQ":"-","SQC":"-"}}}

Sample Good JSON:

{"transaction":{"TS":"2020-02-04 07:04:59.942141","V":"v8","PID":"p36489","RS":"578","SR":"649","CnID":"0/1","CIPG":{"CIP":"10.65.204.71","CP":"33602","SLP":"33602","SLEP":"33602","CVID":"410"},"SIPG":{"SIP":"5.62.38.137","SP":"80","InP":"80","SVID":"420"},"TG":{"T":"363","R":"363","C":"0","SDL":"0","DL":"0","I:R":"2:0","UAP":"0","EDBL":"0","Ca":"0","A":"0","RQM":"0","RSM":"0","FIT":"0","CSR":"362"},"AS":"254","OS":"254","CPr":"0","CVB":"0","CS":"MISS","HS":"200","OF":{"Flag1":"-","Flag2":"-","Flag3":"-","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"-","Flag11":"-","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"-","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"-","Flag23":"-"},"SF":{"Flag1":"A","Flag2":"M","Flag3":"H","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"F","Flag9":"-","Flag10":"-","Flag11":"C","Flag12":"-","Flag13":"-","Flag14":"-","Flag15":"-","Flag16":"-","Flag17":"F","Flag18":"-","Flag19":"-","Flag20":"-","Flag21":"-","Flag22":"G","Flag23":"G","Flag24":"-"},"GF":{"Flag1":"n","Flag2":"-","Flag3":"b","Flag4":"-","Flag5":"-","Flag6":"-","Flag7":"-","Flag8":"-","Flag9":"-","Flag10":"k","Flag11":"-","Flag12":"S","Flag13":"C","Flag14":"-","Flag15":"-","Flag16":"M","Flag17":"T","Flag18":"A","Flag19":"!","Flag20":"-","Flag21":"H","Flag22":"-","Flag23":"S","Flag24":"-","Flag25":"-","Flag26":"5","Flag27":"-","Flag28":"-","Flag29":"-","Flag30":"L","Flag31":"-","Flag32":"-","Flag33":"-","Flag34":"-","Flag35":"-"},"SUNR":"0","SUNS":"254","ML":"-","VSL":{"VSID":"0","TC":"-","MTC":"-","NRTC":"-","ET":"-","HRES":"0","VRES":"0","FS":"0","FR":"0.0","VSD":"0","ACB":"-","ASB":"-","VPR":"-","VSST":"-"},"MT":"-","TCPSL":"30/30/30 21 1/1 0.0/0.0","CT":"application/octet-stream","MS":"44XXXXXXXXXX","MSH":"rPD7X2tPXvDdbRPEeEjIlvpggJcu/UPStkSEgxepvB8=","SID":"v4.172.16.137.150:9EE6E2767E92B7A2","SuID":"-","UA":"Apache-HttpClient/UNAVAILABLE (Java/0)","DID":"Jakarta Commons-HttpClient#2#App#Library-Utility","UAG":"Laptop-Web","CID":"TECH","HR":"-","CRG":"3001010006000000","CP1":"8232F40100A032F40107C8667D","CP2":"3528020922700601","AIDF":"6:1191","UCB":"0/0","CLID":"-","CLCL":"0","OPTS":"-","PUAG":"Laptop-Web","SSLIL":"-","HRU":{"HM":"POST","HD":"ui.ff.avast.com","HP":"/v5/urlinfo/_MD/0073AE105208F6A80000017019A3861E30653939333461620000016F7DF63D19/1580799898571","HQ":""},"URLF":{"CID":"-","CGID":"-","CR":"-","RA":"-","USM":"-","USP":"-","MUS":"-"},"TCPSt":{"WS":"0","SE":"0","WSFNS":"0","WSF":"0","EM":"0","RSTE":"0","MSS":"0"},"NS":{"OPID":"-","ODID":"-","EPID":"-","TrID":"-","VSN":"-","LSUT":"-","STTS":"-","TCPPR":"-"},"CQA":{"NL":"-","CL":"-","CLC":"-","SQ":"-","SQC":"-"}}}

Groovy code (run with InvokeScriptedProcessor):

import groovy.json.JsonSlurper
import groovy.json.JsonParserType
import java.util.function.*
import java.lang.*

class customJSONtoCSV implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that were not successfully processed").build()

    def log
    // Flattens nested maps into PARENT_child keys and wraps each value in double quotes
    static def flatten(row, prefix = "") {
        def flattened = new HashMap<String, String>()
        row.each { String k, Object v ->
            def key = prefix ? prefix + "_" + k : k

            if (v instanceof Map) {
                flattened.putAll(flatten(v, k))
            } else {
                flattened.put(key, '"' + v.toString() + '"')
            }
        }

        return flattened
    }

    // Emits one '|'-separated row in a fixed column order; missing columns become empty strings
    static def toCSVRow(HashMap row) {
        def columns = ["CIPG_CIP","CIPG_CP","CIPG_SLP","CIPG_SLEP","CIPG_CVID","SIPG_SIP","SIPG_SP","SIPG_InP","SIPG_SVID","TG_T","TG_R","TG_C","TG_SDL","TG_DL","TG_I:R","TG_UAP","TG_EDBL","TG_Ca","TG_A","TG_RQM","TG_RSM","TG_FIT","TG_CSR","OF_Flag1","OF_Flag2","OF_Flag3","OF_Flag4","OF_Flag5","OF_Flag6","OF_Flag7","OF_Flag8","OF_Flag9","OF_Flag10","OF_Flag11","OF_Flag12","OF_Flag13","OF_Flag14","OF_Flag15","OF_Flag16","OF_Flag17","OF_Flag18","OF_Flag19","OF_Flag20","OF_Flag21","OF_Flag22","OF_Flag23","SF_Flag1","SF_Flag2","SF_Flag3","SF_Flag4","SF_Flag5","SF_Flag6","SF_Flag7","SF_Flag8","SF_Flag9","SF_Flag10","SF_Flag11","SF_Flag12","SF_Flag13","SF_Flag14","SF_Flag15","SF_Flag16","SF_Flag17","SF_Flag18","SF_Flag19","SF_Flag20","SF_Flag21","SF_Flag22","SF_Flag23","SF_Flag24","GF_Flag1","GF_Flag2","GF_Flag3","GF_Flag4","GF_Flag5","GF_Flag6","GF_Flag7","GF_Flag8","GF_Flag9","GF_Flag10","GF_Flag11","GF_Flag12","GF_Flag13","GF_Flag14","GF_Flag15","GF_Flag16","GF_Flag17","GF_Flag18","GF_Flag19","GF_Flag20","GF_Flag21","GF_Flag22","GF_Flag23","GF_Flag24","GF_Flag25","GF_Flag26","GF_Flag27","GF_Flag28","GF_Flag29","GF_Flag30","GF_Flag31","GF_Flag32","GF_Flag33","GF_Flag34","GF_Flag35","VSL_VSID","VSL_TC","VSL_MTC","VSL_NRTC","VSL_ET","VSL_HRES","VSL_VRES","VSL_FS","VSL_FR","VSL_VSD","VSL_ACB","VSL_ASB","VSL_VPR","VSL_VSST","HRU_HM","HRU_HD","HRU_HP","HRU_HQ","URLF_CID","URLF_CGID","URLF_CR","URLF_RA","URLF_USM","URLF_USP","URLF_MUS","TCPSt_WS","TCPSt_SE","TCPSt_WSFNS","TCPSt_WSF","TCPSt_EM","TCPSt_RSTE","TCPSt_MSS","NS_OPID","NS_ODID","NS_EPID","NS_TrID","NS_VSN","NS_LSUT","NS_STTS","NS_TCPPR","CQA_NL","CQA_CL","CQA_CLC","CQA_SQ","CQA_SQC","TS","V","PID","RS","SR","CnID","AS","OS","CPr","CVB","CS","HS","SUNR","SUNS","ML","MT","TCPSL","CT","MS","MSH","SID","SuID","UA","DID","UAG","CID","HR","CRG","CP1","CP2","AIDF","UCB","CLID","CLCL","OPTS","PUAG","SSLIL"]

        return columns.collect { column ->
            return row.containsKey(column) ? row.get(column) : ""
        }.join('|')
    }

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS, REL_FAILURE] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        try {
            def session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
                        def jsonSlurper = new JsonSlurper()
                        def line
        def header = "CIPG_CIP,CIPG_CP,CIPG_SLP,CIPG_SLEP,CIPG_CVID,SIPG_SIP,SIPG_SP,SIPG_InP,SIPG_SVID,TG_T,TG_R,TG_C,TG_SDL,TG_DL,TG_IR,TG_UAP,TG_EDBL,TG_Ca,TG_A,TG_RQM,TG_RSM,TG_FIT,TG_CSR,OF_Flag1,OF_Flag2,OF_Flag3,OF_Flag4,OF_Flag5,OF_Flag6,OF_Flag7,OF_Flag8,OF_Flag9,OF_Flag10,OF_Flag11,OF_Flag12,OF_Flag13,OF_Flag14,OF_Flag15,OF_Flag16,OF_Flag17,OF_Flag18,OF_Flag19,OF_Flag20,OF_Flag21,OF_Flag22,OF_Flag23,SF_Flag1,SF_Flag2,SF_Flag3,SF_Flag4,SF_Flag5,SF_Flag6,SF_Flag7,SF_Flag8,SF_Flag9,SF_Flag10,SF_Flag11,SF_Flag12,SF_Flag13,SF_Flag14,SF_Flag15,SF_Flag16,SF_Flag17,SF_Flag18,SF_Flag19,SF_Flag20,SF_Flag21,SF_Flag22,SF_Flag23,SF_Flag24,GF_Flag1,GF_Flag2,GF_Flag3,GF_Flag4,GF_Flag5,GF_Flag6,GF_Flag7,GF_Flag8,GF_Flag9,GF_Flag10,GF_Flag11,GF_Flag12,GF_Flag13,GF_Flag14,GF_Flag15,GF_Flag16,GF_Flag17,GF_Flag18,GF_Flag19,GF_Flag20,GF_Flag21,GF_Flag22,GF_Flag23,GF_Flag24,GF_Flag25,GF_Flag26,GF_Flag27,GF_Flag28,GF_Flag29,GF_Flag30,GF_Flag31,GF_Flag32,GF_Flag33,GF_Flag34,GF_Flag35,VSL_VSID,VSL_TC,VSL_MTC,VSL_NRTC,VSL_ET,VSL_HRES,VSL_VRES,VSL_FS,VSL_FR,VSL_VSD,VSL_ACB,VSL_ASB,VSL_VPR,VSL_VSST,HRU_HM,HRU_HD,HRU_HP,HRU_HQ,URLF_CID,URLF_CGID,URLF_CR,URLF_RA,URLF_USM,URLF_USP,URLF_MUS,TCPSt_WS,TCPSt_SE,TCPSt_WSFNS,TCPSt_WSF,TCPSt_EM,TCPSt_RSTE,TCPSt_MSS,NS_OPID,NS_ODID,NS_EPID,NS_TrID,NS_VSN,NS_LSUT,NS_STTS,NS_TCPPR,CQA_NL,CQA_CL,CQA_CLC,CQA_SQ,CQA_SQC,TS,V,PID,RS,SR,CnID,AS,OS,CPr,CVB,CS,HS,SUNR,SUNS,ML,MT,TCPSL,CT,MS,MSH,SID,SuID,UA,DID,UAG,CID,HR,CRG,CP1,CP2,AIDF,UCB,CLID,CLCL,OPTS,PUAG,SSLIL"

                        outputStream.write("${header}\n".getBytes('UTF-8'))

                        while (line = bufferedReader.readLine()) {
                            // Wrap the single "transaction" object in an array so collect() iterates over records
                            def jsonReplace = line.replace('{"transaction":{', '{"transaction":[{').replace('}}}', '}}]}')
                            def jsonRecord = jsonSlurper.parseText(jsonReplace)

                            def a = jsonRecord.transaction.collect { row ->
                                return flatten(row)
                            }.collect { row ->
                                return toCSVRow(row).toString()
                            }
                            // join() concatenates the rows without touching '[' or ']' characters inside the data
                            String b = a.join('\n')
                            outputStream.write("${b}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            throw new ProcessException(e)
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] as List }

    @Override
    String getIdentifier() { return null }
}
processor = new customJSONtoCSV()
  • Help with what? What is the problem? – daggett Feb 12 '20 at 16:59
  • I need help modifying the Groovy code above so that records that fail to parse are written to an error file. I don't know how or where to change the existing code, especially since the inputStream and outputStream are defined inside a closure. – Sriraam Venkataraman Feb 12 '20 at 17:08
  • Is this really a totally different problem than https://stackoverflow.com/questions/60127530/json-parsing-using-groovy ? – cfrick Feb 12 '20 at 17:49
  • The source JSON is the same, but the problem here is different. Now I am trying to write the error records to a new file; earlier I was trying to parse the bad JSON by manipulating strings. – Sriraam Venkataraman Feb 12 '20 at 18:40
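Applied to the question's script, the idea is to wrap only the per-line parse-and-convert step in try/catch and send failures to a second output. The sketch below is plain Groovy: a StringReader and two StringWriters stand in for the NiFi input stream and the output streams of the success and failure FlowFiles (an assumption for illustration; in NiFi they would come from the session). The name routeLines and the three-column list are hypothetical. It also reads the "transaction" object directly instead of using the array-wrapping string replace, and passes the full key as the flatten() prefix, which gives the same keys as the question's version for the single level of nesting in these records.

```groovy
import groovy.json.JsonSlurper

// Flattens nested maps into PARENT_child keys and quotes each value,
// following the same scheme as flatten() in the question
Map flatten(Map row, String prefix = "") {
    def flattened = [:]
    row.each { String k, Object v ->
        def key = prefix ? prefix + "_" + k : k
        if (v instanceof Map) {
            flattened.putAll(flatten(v, key))
        } else {
            flattened.put(key, '"' + v.toString() + '"')
        }
    }
    return flattened
}

// Hypothetical helper: reads one JSON record per line; parsed records go to `good`
// as '|'-joined CSV rows, lines that fail to parse go to `bad` unchanged
void routeLines(Reader input, Writer good, Writer bad, List<String> columns) {
    def slurper = new JsonSlurper()
    input.eachLine { String line ->
        try {
            def row = flatten(slurper.parseText(line).transaction as Map)
            good << columns.collect { row.getOrDefault(it, "") }.join('|') << '\n'
        } catch (Exception e) {
            bad << line << '\n'
        }
    }
}

// Hypothetical three-column subset of the question's column list
def columns = ["CIPG_CIP", "CT", "TS"]
def input = new StringReader(
        '{"transaction":{"TS":"2020-02-04","CIPG":{"CIP":"10.65.204.71"},"CT":"application/octet-stream"}}\n' +
        '{"transaction":{"TS":"2020-02-04","CT":"text/javascript; charset="UTF-8""}}\n')
def good = new StringWriter()
def bad = new StringWriter()
routeLines(input, good, bad, columns)

print good  // "10.65.204.71"|"application/octet-stream"|"2020-02-04"
print bad   // {"transaction":{"TS":"2020-02-04","CT":"text/javascript; charset="UTF-8""}}
```

Because the try/catch sits inside the per-line loop, a bad line never reaches the CSV output and never aborts the lines after it. In a NiFi script the bad writer would correspond to appending to a second FlowFile that is transferred to the failure relationship.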

1 Answer


Here is an example of how to write two files to different relationships with the ExecuteGroovyScript processor.

Assume the input is a text file with one number on each line:

11
22
33
44

The following Groovy code splits the incoming file in two: odd numbers go to REL_SUCCESS and even numbers go to REL_FAILURE.

ff=session.get()
if(!ff)return

def ff2 = ff.clone(false)

ff.write{streamIn,streamOut->
    streamOut.withWriter("UTF-8"){w->
        streamIn.eachLine("UTF-8"){line->
            if(line.toInteger()%2){
                w.append(line + '\n')
            }else{
                ff2.append("UTF-8", line + '\n')
            }
        }
    }
}

REL_SUCCESS<<ff

//transfer ff2 only if not empty
if(ff2.getSize()>0){
    REL_FAILURE<<ff2 
}else{
    ff2.remove()
}

You could rewrite this code for InvokeScriptedProcessor, but it would just be larger (IMHO).

daggett
  • Thanks for your reply. I tried rewriting the code and I am getting the error below: InvokeScriptedProcessor[id=38e32b68-0170-1000-0000-000050e86830] InvokeScriptedProcessor[id=38e32b68-0170-1000-0000-000050e86830] failed to process session due to org.apache.nifi.processor.exception.ProcessException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardFlowFileRecord.clone() is applicable for argument types: (java.lang.Boolean) values: [false] – Sriraam Venkataraman Feb 13 '20 at 13:51
  • Why don't you want to use ExecuteGroovyScript? – daggett Feb 13 '20 at 13:59
  • Thanks for your suggestion. I modified the Groovy code and ran it through the ExecuteGroovyScript processor. It works fine; I tested it with more than 1,000 FlowFiles. – Sriraam Venkataraman Feb 13 '20 at 22:46