
Application Information: IBM MQ 9.2, Cloudera CDP 7.1.6, Spark 2.4.5

I am upgrading Spark code from Spark 1.6 to Spark 2.4.5. I push JSON content (with a complex schema) to an MQ queue, and the message length exceeds 4,096 characters. I am able to read a JSON file with the same content directly, but when the same content is pushed through MQ, I get a corrupt record when I try to print the schema using the code below.

val myMsg = JmsStreamUtils.createAsynchronousJmsQueueStream(ssc, MQConsumerFactory(host, port.toInt, qm, qn, user, credentials, qc), converter, Session.AUTO_ACKNOWLEDGE, StorageLevel.MEMORY_AND_DISK_SER)
myMsg.foreachRDD(rdd => {
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._  // needed for createDataset on an RDD[String]
  val myDS = spark.createDataset(rdd)
  val readJson = spark.read.json(myDS)
  readJson.printSchema()
  rdd.collect().foreach(println)
})

When I issue rdd.collect().foreach(println), only 4,095 characters are shown in the log file.
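To rule out log-line limits as the cause, the truncation can be checked by printing message lengths rather than message bodies. This is a minimal sketch; the local sequence below merely stands in for the result of rdd.collect() on the stream:

```scala
// Sketch: log only the length of each received message instead of the body,
// so that any log-line truncation can be ruled out. A local Seq stands in
// for the collected RDD[String] contents (placeholder data, not real MQ output).
val collected: Seq[String] = Seq("x" * 4095)  // placeholder for rdd.collect()
collected.foreach(m => println(s"received message length: ${m.length}"))
```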

Is there any clue as to what could be causing the corrupt record?

My run.sh

APPNAME="$(basename "$PWD")"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
CDPPATH="/opt/cloudera/parcels/CDH/lib"
MQJARPH="/spark/mqjars"
LOGPH="/sparklogs"
JARLIST="$MQJARPH/MQCredentialUtil.jar,$MQJARPH/spark-core_2.11-1.5.2.logging.jar,$MQJARPH/config-1.3.0.jar,$MQJARPH/com.ibm.mq.allclient.jar,$MQJARPH/fscontext.jar,$MQJARPH/guava-15.0-rc1.jar,$MQJARPH/javax.jms.jar,$MQJARPH/jta.jar,$MQJARPH/spark-jms-receiver-0.1.2-s_2.11.jar,$MQJARPH/spark-mq-jms-receiver_2.11-0.0.1-SNAPSHOT.jar,$MQJARPH/jms.jar,$MQJARPH/providerutil.jar"
$CDPPATH/spark/bin/spark-submit --master local[2] --conf spark.ui.enabled=false --jars $JARLIST --packages com.databricks:spark-csv_2.11:1.5.0 --class sparkintegration.SparkMQ "$DIR/target/scala-2.11/spark-mq-jms_2.11-0.0.1-SNAPSHOT.jar" >> $LOGPH/"$APPNAME-application-log.out" 2>> $LOGPH/"$APPNAME-log.out"

Is there any configuration setting to increase the buffer size/string length on the Spark side?

Chia
  • Is the MQ that you are referring to IBM MQ? – Shashi Sep 07 '21 at 06:50
  • Yes, it's reading from IBM MQ 9. – Chia Sep 07 '21 at 07:03
  • I don't have any knowledge of Scala or Spark, but I think the issue has something to do with the size of the buffer the application passes to read messages from the MQ queue. It looks like a default buffer size of 4K is being passed. MQ queues can support message sizes up to 100 MB, with the default being 4 MB. – Shashi Sep 07 '21 at 08:29
  • @Shashi, thanks for your hint. I tried to increase the message size to 100 MB as per https://www.ibm.com/support/pages/instructions-and-warnings-how-increase-maxmsgl-mq-queue-managers-queues-and-channels-default-4-mb-higher-number. It still shows only 4,095 characters read in the log file. Not sure what the roadblock could be. – Chia Sep 07 '21 at 09:44
  • You have modified the attributes on the queue manager side; I was suggesting you check on your application side. Your application is passing a 4K buffer to read a message. If that is the case, the maximum message size that can be read is only 4K. So look on your application side for how to change the buffer size. – Shashi Sep 07 '21 at 10:27
  • You stated you are using MQ 9.2, but your JARLIST has dhbcore, which is not part of the 9.2 JAR files and I think was removed in 8.0 or earlier. For 9.2 you should only need com.ibm.mq.allclient.jar and jms.jar; if you are using .bindings files you would also need fscontext.jar and providerutil.jar. It is possible you have a mixture of JARs from different versions of MQ, or just a very outdated version. – JoshMc Sep 09 '21 at 06:27
  • Thanks @JoshMc for pointing that out. I have updated my run.sh with the highlighted JAR files accordingly. Anyway, the 4096 read limit still exists. Wondering if anything needs to be done on the CDP side. – Chia Sep 10 '21 at 07:56
  • The following should also be removed (they are all included in the com.ibm.mq.allclient.jar): $MQJARPH/com.ibm.mq.commonservices.jar $MQJARPH/com.ibm.mq.headers.jar $MQJARPH/com.ibm.mq.jar $MQJARPH/com.ibm.mq.jmqi.jar $MQJARPH/com.ibm.mqjms.jar $MQJARPH/com.ibm.mq.pcf.jar $MQJARPH/connector.jar – JoshMc Sep 14 '21 at 05:27

1 Answer


I know absolutely nothing about Scala or Spark, but Mr. Google says: Scala runs on the JVM, so Java and Scala stacks can be freely mixed for totally seamless integration.

So, you are using the Java/MQ JAR files, correct?

IBM MQ Labs did some really, really strange things with the Java/MQ and JMS/MQ client libraries. The MQ client library initially uses a 4 KB buffer to get the message. If it fails to get the entire message, it increases the buffer size to the size of the message and performs the get again.

I wrote many, many blog posts about this back in the summer of 2019. Those are the Java/MQ-related posts, and there is another set for JMS/MQ.

Try setting the following JVM parameter to a value larger than the message size you are trying to retrieve.

i.e.

java -Dcom.ibm.mq.jmqi.defaultMaxMsgSize=250000 blah blah blah

where 250000 is the maximum message size of your messages. You can use whatever value you want.
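If editing the java command line is awkward (for example, when the JVM is launched by spark-submit), the same property can be set programmatically before any MQ connection is created. This is a minimal sketch, not the answer's stated method; the property name is taken from the flag above, and 10485760 (10 MB) is just an illustrative value:

```scala
// Minimal sketch: set the JMQI property in code before any MQ classes
// create a connection. 10485760 (10 MB) is an illustrative value; use
// anything larger than your biggest expected message.
System.setProperty("com.ibm.mq.jmqi.defaultMaxMsgSize", "10485760")

// Verify what the MQ client library will see.
println(System.getProperty("com.ibm.mq.jmqi.defaultMaxMsgSize"))
```

Under spark-submit, the equivalent on the command line would be --driver-java-options '-Dcom.ibm.mq.jmqi.defaultMaxMsgSize=10485760' (and spark.executor.extraJavaOptions as well if the JMS receiver runs on executors).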

You should say which version of the MQ/Java JAR files you are using. You could also try a different release of the MQ/Java JAR files in case there is a bug in the ones you are using.

Roger