Application Information: IBM MQ 9.2, Cloudera CDP 7.1.6, Spark 2.4.5
I am upgrading Spark code from Spark 1.6 to Spark 2.4.5. I have JSON content (with a complex schema) pushed to an MQ queue, and the message length exceeds 4096 characters. I can read a JSON file with the same content directly, but when that same content is pushed through MQ, I get a corrupt record when I try to print the schema using the code below.
val myMsg = JmsStreamUtils.createAsynchronousJmsQueueStream(
  ssc,
  MQConsumerFactory(host, port.toInt, qm, qn, user, credentials, qc),
  converter,
  Session.AUTO_ACKNOWLEDGE,
  StorageLevel.MEMORY_AND_DISK_SER)

myMsg.foreachRDD(rdd => {
  val sqlContext = SparkSession.builder.getOrCreate()
  import sqlContext.implicits._  // String encoder needed by createDataset
  val myDS = sqlContext.createDataset(rdd)
  val readJson = sqlContext.read.json(myDS)
  readJson.printSchema()
  rdd.collect().foreach(println)
})
When I issue rdd.collect().foreach(println), the log file only shows 4095 characters of the message.
Is there any clue as to what could be causing the corrupt record?
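As a sanity check, I put together the sketch below (my own diagnostic idea, not code already in the application) to confirm whether the payload is already cut off at ~4K before the JSON parse, and to surface the malformed row through Spark's standard _corrupt_record column in PERMISSIVE mode:

import org.apache.spark.sql.SparkSession

myMsg.foreachRDD(rdd => {
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._

  // 1. Is the raw payload already truncated before the JSON parse?
  rdd.collect().foreach(msg => println(s"payload length = ${msg.length}"))

  // 2. Keep malformed rows instead of dropping them, then inspect them.
  //    The cache() matters: since Spark 2.3, selecting only the corrupt
  //    record column from an uncached JSON read is disallowed.
  val parsed = spark.read
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .json(spark.createDataset(rdd))
    .cache()
  parsed.printSchema()
  if (parsed.columns.contains("_corrupt_record")) {
    parsed.select("_corrupt_record").show(truncate = false)
  }
})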
My run.sh
APPNAME="$(basename "$PWD")"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
CDPPATH="/opt/cloudera/parcels/CDH/lib"
MQJARPH="/spark/mqjars"
LOGPH="/sparklogs"
JARLIST="$MQJARPH/MQCredentialUtil.jar,$MQJARPH/spark-core_2.11-1.5.2.logging.jar,$MQJARPH/config-1.3.0.jar,$MQJARPH/com.ibm.mq.allclient.jar,$MQJARPH/fscontext.jar,$MQJARPH/guava-15.0-rc1.jar,$MQJARPH/javax.jms.jar,$MQJARPH/jta.jar,$MQJARPH/spark-jms-receiver-0.1.2-s_2.11.jar,$MQJARPH/spark-mq-jms-receiver_2.11-0.0.1-SNAPSHOT.jar,$MQJARPH/jms.jar,$MQJARPH/providerutil.jar"
$CDPPATH/spark/bin/spark-submit \
  --master local[2] \
  --conf spark.ui.enabled=false \
  --jars $JARLIST \
  --packages com.databricks:spark-csv_2.11:1.5.0 \
  --class sparkintegration.SparkMQ \
  "$DIR/target/scala-2.11/spark-mq-jms_2.11-0.0.1-SNAPSHOT.jar" \
  >> $LOGPH/"$APPNAME-application-log.out" 2>> $LOGPH/"$APPNAME-log.out"
Is there any configuration setting to increase the buffer size / string length at the Spark end?
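In case it is relevant: as far as I understand, the converter passed to JmsStreamUtils is a function from a JMS Message to an Option[String]. A minimal illustrative sketch (not my exact converter; it assumes the payload arrives as a TextMessage or BytesMessage) would be something like:

import javax.jms.{BytesMessage, Message, TextMessage}

// Illustrative converter: read the entire JMS body rather than a fixed-size buffer
val converter: Message => Option[String] = {
  case text: TextMessage => Option(text.getText)
  case bytes: BytesMessage =>
    val body = new Array[Byte](bytes.getBodyLength.toInt) // size from the full body length
    bytes.readBytes(body)
    Some(new String(body, "UTF-8"))
  case _ => None
}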