
I am trying to validate and process a CSV file using Apache NiFi. I use ExecuteGroovyScript to process the CSV and extract the data.

My original CSV has a problem: some records look like this:

id,name,age,bd,email,address
1,sachith,29,9,sachith@email.com,{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,nalaka@email.com,{"No": "1","Lane":
"Lane-1"}

Here the 2nd record is invalid. I want to remove just this record and process the rest.

import groovy.json.*

def ff=session.get()
if(!ff)return

def parser = new JsonSlurper().setType(JsonParserType.LAX)

ff.write{streamIn,streamOut->
    streamIn.withReader('UTF-8'){r->      //convert in stream to reader
        streamOut.withWriter('UTF-8'){w-> //convert out stream to writer
            //go line by line
            r.eachLine{line, lineNum->
                if(lineNum==1){
                    w<<line<<'id,name,age,bd,email,address'<<'\n'        //for the first line just add some headers
                }else{
                    def row=line.split(',')          //split line by comma
                    def json=row[5..-1].join(',')    //join back to string starting from 6th element (the JSON column)
                    json = parser.parseText(json)
                    w<<"${json.id},${json.name},${json.age},${json.bd},${json.email},${json.address}"<<'\n'
                }
            }
        }
    }
}
REL_SUCCESS<<ff

This was taken from my previous question.

Basically, I want to ignore the invalid record and continue processing the other values.

I have referred to "groovy.json.JsonException: expecting" and "Groovy: validate JSON string".

But I don't understand how to integrate this into an Apache NiFi flow.

Sachith Muhandiram

1 Answer

I agree that it is better to fix the source.

However, if that is not possible, you could check whether each line is complete and buffer incomplete lines until the record is whole:

import groovy.json.*

def parser = new JsonSlurper().setType(JsonParserType.LAX) //LAX to accept strings without double-quotes

def w = System.out
def buf = new StringBuilder() //buffer to collect lines if they are not complete
new StringReader('''id,name,age,bd,email,address
1,sachith,29,9,sachith@email.com,{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,nalaka@email.com,{"No": "1"
,"Lane":"Lane-1"}''').withReader{r->
    r.eachLine{line, lineNum->
        if(lineNum==1){
            w<<line<<'id,name,age,bd,email,address'<<'\n'
        }else{
            buf<<(buf?'\n':'')<<line //append line to previous incomplete line(s)
            if(buf=~/(?s)^\d.*\}$/){
                //normal line: starts with number and ends with }
                def row=buf.toString().split(',')   //split line by comma
                def json=row[5..-1].join(',')       //join back to string starting from 6th element (the JSON column)
                json = parser.parseText(json)
                w<<"${json.No},${json.Lane}"<<'\n'
                buf.setLength(0) //reset buffer
            }
        }
    }
}
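
If the broken record should be dropped rather than repaired, as the question asks, a minimal sketch could look like the one below. The helper names `isValidRecord` and `filterValidRecords` are hypothetical, and the checks rest on assumptions drawn from the sample data: a valid record has exactly six comma-separated columns and its last column is a complete JSON object. It uses the default `JsonSlurper` rather than the LAX one so that malformed JSON is rejected more strictly.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: true only if the line has 6 columns and the
// last column is a complete JSON object (assumption based on the sample data).
boolean isValidRecord(String line, JsonSlurper parser) {
    def row = line.split(',', 6)          // at most 6 fields, JSON stays in the last one
    if (row.size() != 6) return false
    def json = row[5].trim()
    if (!json.startsWith('{') || !json.endsWith('}')) return false
    try {
        return parser.parseText(json) instanceof Map
    } catch (Exception e) {
        // usually a JsonException; caught broadly so any parse failure skips the record
        return false
    }
}

// Hypothetical helper: copies the header and every valid record to the writer
// and returns the number of lines that were skipped.
int filterValidRecords(Reader reader, Writer writer) {
    def parser = new JsonSlurper()
    int skipped = 0
    reader.eachLine { line, lineNum ->
        if (lineNum == 1 || isValidRecord(line, parser)) {
            writer << line << '\n'
        } else {
            skipped++
        }
    }
    return skipped
}

// Stand-alone usage with the sample data from the question
def input = '''id,name,age,bd,email,address
1,sachith,29,9,sachith@email.com,{"No": "1","Lane":"Lane-1"}
2,nalaka,29,17,nalaka@email.com,{"No": "1","Lane":
"Lane-1"}'''
def out = new StringWriter()
def skipped = filterValidRecords(new StringReader(input), out)
print out
```

Inside ExecuteGroovyScript, the same helper could presumably be called from the `ff.write{streamIn,streamOut->...}` block shown in the question, passing the reader and writer obtained from `streamIn.withReader('UTF-8')` and `streamOut.withWriter('UTF-8')`. Note that both halves of the broken record are discarded here, whereas the buffering code above joins them back into one record.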
daggett