
I want to process multiple JSON records one after the other. My code reads the JSON documents and stores them in a DataFrame. Now I want to process each JSON document row by row from the DataFrame. When I take a row from the DataFrame, I need to convert that single row back into a DataFrame and run some operations on it. I am stuck at converting the `pyspark.sql.types.Row` object to a DataFrame.

from pyspark.sql.functions import udf, struct

def funcRowIter(rows):
    print(type(rows))
    if rows is not None:
        # This is where I am stuck: `rows` is a single pyspark.sql.types.Row
        rdf = sqlContext.createDataFrame(rows)
        rdf.show()
        return rows

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydatabase.sample").load()
A = udf(funcRowIter, df.schema)
z = df.withColumn("new_column", A(struct([df[x] for x in df.columns])))
z.show()

Help me out with converting the `pyspark.sql.types.Row` object to a DataFrame. My Row object holds a huge JSON document.

This is the JSON I am trying to read from MongoDB:

{
"Feed": {
    "feedBody": {
        "Reservation": {
            "recordLocatorID": "X23344", 
            "pnrCreateDate": "2018-09-24T23:00:00.000", 
            "lastUpdateTimestamp": "2018-09-26T14:51:01.643", 
            "pnrReservationSystemSequenceID": "1643", 
            "pnrPurgeDate": "2018-10-11", 
            "passengerCount": "1", 
            "reservationSystemCode": "1X", 
            "passengerList": {
                "passenger": {
                    "passengerID": "2", 
                    "lastUpdateTimestamp": "2018-09-24T18:00:54.835", 
                    "dateOfBirth": "1993-10-02", 
                    "givenName": "fgdfg", 
                    "surName": "fgdfg", 
                    "gender": "M", 
                    "infantIndicator": "true", 
                    "seatCount": "1", 
                    "reservationSystemCustomerID": "dfgdfg", 
                    "passengerTypeCode": "dfgfd", 
                    "groupDepositIndicator": "false", 
                    "passengerTicketDocList": {
                        "passengerTicketDoc": {
                            "ticketDocID": "45", 
                            "lastUpdateTimestamp": "2018-09-24T18:01:01.149", 
                            "ticketNumber": "43434343434", 
                            "ticketType": "T", 
                            "ticketIndicator": "E", 
                            "status": "T", 
                            "issuanceDate": "2010-09-20", 
                            "chargeAmount": "0.74", 
                            "currency": "USD"
                        }
                    }

               }

            }
        }
    }
}

}

This is the `rows` output:

   Row(Feed=Row(
    feedBody=Row(
        Reservation=Row(
            recordLocatorID=u'X23344', 
            pnrCreateDate=u'2018-09-24T23:00:00.000', 
            lastUpdateTimestamp=u'2018-09-26T14:51:01.643', 
            pnrReservationSystemSequenceID=u'1643', 
            pnrPurgeDate=u'2018-10-11', 
            passengerCount=u'1', 
            reservationSystemCode=u'1X', 
            passengerList=Row(
                passenger=Row(
                passengerID=u'2', 
                lastUpdateTimestamp=u'2018-09-24T18:00:54.835', 
                dateOfBirth=u'1993-10-02', 
                givenName=u'fgdfg', 
                surName=u'fgdfg', 
                gender=u'M', 
                infantIndicator=u'true', 
                seatCount=u'1', 
                reservationSystemCustomerID=u'dfgdfg', 
                passengerTypeCode=u'dfgfd', 
                groupDepositIndicator=u'false', 
                passengerTicketDocList=Row(
                    passengerTicketDoc=Row(
                        ticketDocID=u'45', 
                        lastUpdateTimestamp=u'2018-09-24T18:01:01.149', 
                        ticketNumber=u'43434343434', 
                        ticketType=u'T', 
                        ticketIndicator=u'E', 
                        status=u'T', 
                        issuanceDate=u'2010-09-20', 
                        chargeAmount=u'0.74', 
                        currency=u'USD'))))))), _id=Row(oid=u'5bc0cc8c2ec34dd42a44fc2f'))
  • Can you OP the Row? – pvy4917 Oct 12 '18 at 15:55
  • It would be helpful if you could provide a [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples). That being said, did you try wrapping the row in a list? `sqlContext.createDataFrame([rows])` – pault Oct 12 '18 at 16:07
  • Provided reproducible example json and the row OP. – darla Oct 12 '18 at 16:40
  • Thank you. Let me look into it. – pvy4917 Oct 12 '18 at 16:56
  • With this command `df = spark.sparkContext.parallelize(list(row)).toDF()` you can access to the schema correctly, but not to the data. Maybe flattening the nested Rows could do the trick. – Andrea Oct 18 '18 at 10:01
