
I have a PySpark program that is doing fairly simple data conversion on a large number of records. I sporadically get the following error. I've added code to make sure none of the IntegerType values that I write are outside the normal 32-bit integer range. The out-of-range value here seems to be something internal to Spark, not something I pass in.
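
For reference, a minimal sketch of the kind of range check involved; `value_col` is a hypothetical column name, not the actual schema:

    from pyspark.sql import functions as F

    INT32_MIN, INT32_MAX = -(2 ** 31), 2 ** 31 - 1

    # Count rows whose value falls outside the signed 32-bit range before
    # writing the column as IntegerType. `value_col` is a placeholder.
    out_of_range = data_frame.filter(
        (F.col("value_col") < INT32_MIN) | (F.col("value_col") > INT32_MAX)
    ).count()
    assert out_of_range == 0, "found values outside the 32-bit integer range"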

This happens on a simple call to count(). The Python line that triggers the error is:

    data_frame_count = data_frame.count()

I am running this on Spark 2.0 under Amazon EMR 5.0.

I've searched on this error and found another thread, but it is focused on more complex logistic regression processing. Here, I am doing something very simple: loading data, basic parsing/cleaning of the data, and writing the data back out.

Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:497)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:475)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:475)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:475)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:280)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:60)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:158)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:106)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:119)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

1 Answer


It seems that there is an open issue for it...

https://issues.apache.org/jira/browse/SPARK-1476

From the description:

    underlying abstraction for blocks in spark is a ByteBuffer : which limits the size of the block to 2GB. This has implication not just for managed blocks in use, but also for shuffle blocks (memory mapped blocks are limited to 2gig, even though the api allows for long), ser-deser via byte array backed outstreams (SPARK-1391), etc. This is a severe limitation for use of spark when used on non trivial datasets.
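
Since the limit applies per block, the usual mitigation is to raise the partition count so that no single partition (and hence no cached or shuffle block) approaches 2GB. A minimal sketch, assuming a placeholder partition count that you would tune to your data volume:

    # Repartition before the action so each partition stays well under the
    # 2GB block limit. 2000 is a placeholder; aim for roughly
    # (total dataset size / number of partitions) comfortably below 2GB.
    data_frame = data_frame.repartition(2000)
    data_frame_count = data_frame.count()

    # For shuffles introduced by SQL operations, the post-shuffle partition
    # count can also be raised via configuration:
    # spark.conf.set("spark.sql.shuffle.partitions", "2000")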