
I'm using the Scala code below to receive messages from RabbitMQ.

    import java.nio.charset.StandardCharsets

    import com.rabbitmq.client.Channel
    import com.rabbitmq.client.Connection
    import com.rabbitmq.client.ConnectionFactory
    import com.rabbitmq.client.ConsumerCancelledException
    import com.rabbitmq.client.QueueingConsumer

    // Scala 2.11/2.12; on 2.13+ use scala.jdk.CollectionConverters._ instead
    import scala.collection.JavaConverters._

    // getRabbitMQConnection is defined elsewhere and returns a Connection
    val rabbitMQconnection = getRabbitMQConnection
    val channel = rabbitMQconnection.createChannel()

    // queueDeclare expects a java.util.Map for the arguments, hence the .asJava conversion
    val args = Map[String, AnyRef]("x-message-ttl" -> Long.box(40000))

    channel.queueDeclare("test", true, false, false, args.asJava)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume("test", true, consumer)

    val delivery = consumer.nextDelivery()
    val message = new String(delivery.getBody(), StandardCharsets.UTF_8)
    println("at consumer : " + message)

My input data from RabbitMQ is zipped JSON data. When I use the above code, I'm unable to unzip the data. Could someone please let me know how to read zipped JSON data?

Thank you.

  • This doesn't look like Spark Streaming code. Why are these tags included? – OneCricketeer Jan 17 '22 at 14:33
  • You should start with `val body = delivery.getBody()`, then pass this on to some unzip method, and from there parse it with a JSON library... `new String` assumes the data is UTF-8-encoded plaintext, not compressed – OneCricketeer Jan 17 '22 at 14:37
  • I'm doing this in Spark and I have pasted only part of the code. I am using a Spark streaming context. The data is compressed. Could you please let me know how to handle this? I have done a similar thing in Python using Pika: df = gzip.decompress(body). I am unable to do the same in Scala – SanjanaSanju Jan 17 '22 at 14:47
  • Even in Python, that has nothing to do with Spark. You're calling a builtin gzip module function. For Scala, you'll need to find a similar library. Also, Gzip and zip are not the same – OneCricketeer Jan 17 '22 at 14:54
  • Since you can use Java code with Scala, for Gzip specifically, see https://stackoverflow.com/q/23606047/2308683 (a similar answer would apply with `ZipInputStream` instead; see the sketch below). From the decompressed bytes, you should be able to pass them on to a string constructor or JSON parser (Jackson accepts byte arrays, for example) – OneCricketeer Jan 17 '22 at 15:06
  • @OneCricketeer Thank you. I will try this. – SanjanaSanju Jan 17 '22 at 15:11
  • @OneCricketeer Could you please help with the question below: https://stackoverflow.com/questions/70750017/empty-rdd-while-consuming-messages-from-rabbitmq-in-sparks-scala – SanjanaSanju Jan 18 '22 at 10:49
  • Sorry, never used RabbitMQ with Spark. – OneCricketeer Jan 18 '22 at 16:30
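
Following the suggestion in the comments, here is a minimal Scala sketch of gzip decompression for the consumed bytes. It assumes the payload is gzip-compressed (an actual zip archive would need `ZipInputStream` instead), and the `gunzip` helper name is just for illustration:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.nio.charset.StandardCharsets
    import java.util.zip.GZIPInputStream

    // Hypothetical helper: inflate a gzip-compressed byte array into a UTF-8 string.
    def gunzip(compressed: Array[Byte]): String = {
      val in  = new GZIPInputStream(new ByteArrayInputStream(compressed))
      val out = new ByteArrayOutputStream()
      try {
        val buf = new Array[Byte](4096)
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          n = in.read(buf)
        }
      } finally in.close()
      new String(out.toByteArray, StandardCharsets.UTF_8)
    }

    // Usage with the consumer above: keep the raw bytes, decompress, then parse the JSON.
    // val body = delivery.getBody()
    // val json = gunzip(body)
    // println("at consumer : " + json)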

0 Answers