0

Me using spark-sql-2.3.1v , kafka with java8 in my project. With

--driver-memory 4g \
--driver-cores 2 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \

At consumer side , me trying to write the files in hdfs Me using something like this below code

                          dataSet.writeStream()
                                        .format("parquet")
                                        .option("path", parqetFileName)
                                        .option("mergeSchema", true)
                                        .outputMode("Append")
                                        .partitionBy("company_id","date")
                                        .option("checkpointLocation", checkPtLocation)
                                        .trigger(Trigger.ProcessingTime("25 seconds"))
                                        .start();

When I store into hdfs folder it looks something below i.e. each file is around 1.5k+ i.e. few KBs.

$ hdfs dfs -du -h /transactions/company_id=24779/date=2014-06-24/
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00026-1027fff9-5745-4250-961a-fd56508b7ea3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00057-6604f6cc-5b8d-41f4-8fc0-14f6e13b4a37.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00098-754e6929-9c75-430f-b6bb-3457a216aae3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00099-1d62cbd5-7409-4259-b4f3-d0f0e5a93da3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00109-1965b052-c7a6-47a8-ae15-dea301010cf5.c000.snappy.parquet

Due to this small files , its taking a lot of processing time , while I read back larger set of data from hdfs & count the number of rows then it is resulting into below heap space error.

2020-02-12 07:07:57,475 [Driver] ERROR org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at java.lang.String.substring(String.java:1969)
        at java.net.URI$Parser.substring(URI.java:2869)
        at java.net.URI$Parser.parse(URI.java:3049)
        at java.net.URI.<init>(URI.java:588)
        at org.apache.spark.sql.execution.streaming.SinkFileStatus.toFileStatus(FileStreamSinkLog.scala:52)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:336)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at com.spgmi.ca.prescore.utils.DbUtils.loadFromHdfs(DbUtils.java:129)
        at com.spgmi.ca.prescore.spark.CountRecords.main(CountRecords.java:84)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
2020-02-12 07:07:57,533 [Reporter] WARN  org.apache.spark.deploy.yarn.ApplicationMaster - Reporter thread fails 1 time(s) in a row.
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: dev1-dev.com":8030;
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:805)
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1497)
        at org.apache.hadoop.ipc.Client.call(Client.java:1439)
        at org.apache.hadoop.ipc.Client.call(Client.java:1349)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy22.allocate(Unknown Source)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
        at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy23.allocate(Unknown Source)
        at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:296)
        at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:249)
        at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$allocationThreadImpl(ApplicationMaster.scala:540)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:606)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:753)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:687)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:790)
        at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:411)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1554)

Questions:

  1. are these small files going to result in "small file problem" in spark processing? If so how to deal with this scenario.

  2. If I want to count the total number of records from given hdfs folder, how to do it ?

  3. How to know how much heap-space necessary to handle this kind of data ?

After new changes

--driver-memory 16g \
--driver-cores 1 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \

Run successfully results are :

2020-02-12 20:28:56,188 [Driver] WARN  com.spark.mypackage.CountRecords -  NUMBER OF PARTITIONS AFTER HDFS READ : 77926
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|               24354|   94|
|               26425|   96|
|               32414|   64|
|               76143|   32|
|               16861|   32|
|               30903|   64|
|               40335|   32|
|               64121|   64|
|               69042|   32|
|               32539|   64|
|               34759|   32|
|               41575|   32|
|                1591|   64|
|                3050|   98|
|               51772|   32|
+--------------------+-----+

2020-02-12 20:50:32,301 [Driver] WARN  com.spark.mypackage.CountRecords -  RECORD COUNT: 3999708
BdEngineer
  • 2,929
  • 4
  • 49
  • 85
  • 2
    Saw your `amazon-s3` tag and wanted to mention that modern object stores (Ozone, S3, ADLS2) scale much better than "classical" `hdfs`-hadoop in terms of metadata, and thus much less susceptible to smallfiles problem. That said, client applications still pay a penalty for having to maintain huge directory and file listings. – mazaneicha Feb 12 '20 at 21:36
  • 1
    As @mazaneicha says, MinIO even benchmarked themselves against HDFS https://blog.min.io/hdfsbenchmark/ – OneCricketeer Feb 12 '20 at 22:02
  • https://www.slideshare.net/Hadoop_Summit/ozone-scaling-hdfs-to-trillions-of-objects-103004375 – mazaneicha Feb 13 '20 at 13:28

1 Answers1

3
  1. Yes. Small files is not only a Spark problem. It causes unnecessary load on your NameNode. You should spend more time compacting and uploading larger files than worrying about OOM when processing small files. The fact that your files are less than 64MB / 128MB, then that's a sign you're using Hadoop poorly.

  2. Something like spark.read("hdfs://path").count() would read all the files in the path, then count the rows in the Dataframe

  3. There is no hard-set number. You need to enable JMX monitoring on your jobs and see what the heap size is reaching. Otherwise, arbitrarily double the current memory you're giving the job until it starts not getting OOM. If you start approaching more than 8GB, then you need to consider reading less data in each job by adding more parallelization.


FWIW, Kafka Connect can also be used to output partitioned HDFS/S3 paths.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • 2000 partitions? Do you have that many `companies * days` in your data? Why do you need so many executors? Why does the driver need 2 cores and 4g memory if it doesn't really do any work? Why can't you use 1-2g for the executors which actually do the work? – OneCricketeer Feb 12 '20 at 17:41
  • 1
    You make the files larger by reducing the granularity in your date partition. Make it company ID by year or month instead, for example – OneCricketeer Feb 12 '20 at 17:42
  • Not sure what you mean. All HDFS references take a Path. That Path must be a file or directory. Or see https://stackoverflow.com/a/31784292/2308683 – OneCricketeer Feb 12 '20 at 19:36
  • @BdEngineer If you give a folder to the Spark reader, it recursively reads all files – OneCricketeer Feb 12 '20 at 22:01
  • 1
    Like, why can't you use `date=2014-06` instead of including the day? And yes, you can `coalesce` the dataframe or you can increase the trigger interval to something like 5 to 15 **minutes** – OneCricketeer Feb 13 '20 at 04:16