
I deployed the following Colab Python code (see link below) to Dataproc on Google Cloud. It works only when input_list contains a single item; when input_list has two items, the PySpark job dies with the following error at the line "for r in result.collect()" in the get_similarity method below:

java.io.IOException: Premature EOF from inputStream
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:739)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
        at java.lang.Thread.run(Thread.java:745)
input_list=["no error"]                 <---- works
input_list=["this", "throws EOF error"] <---- does not work

Link to the Colab notebook for sentence similarity using spark-nlp: https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/streamlit_notebooks/SENTENCE_SIMILARITY.ipynb#scrollTo=6E0Y5wtunFi4

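import numpy as np
import pandas as pd

# `spark` and `light_pipeline` are created earlier in the notebook.
def get_similarity(input_list):
    # One DataFrame row per input sentence.
    df = spark.createDataFrame(pd.DataFrame({'text': input_list}))
    result = light_pipeline.transform(df)
    embeddings = []
    for r in result.collect():
        embeddings.append(r.sentence_embeddings[0].embeddings)
    embeddings_matrix = np.array(embeddings)
    # Pairwise dot products between the sentence embeddings.
    return np.matmul(embeddings_matrix, embeddings_matrix.transpose())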

I tried setting dfs.datanode.max.transfer.threads to 8192 in the Hadoop cluster config, but still no luck:

hadoop_config.set('dfs.datanode.max.transfer.threads', "8192")

How can I get this code working when input_list has multiple items in the array?

Igor Dvorzhak
Machine Learning
1 Answer


java.io.IOException: Premature EOF from inputStream can point to a lack of disk bandwidth, overloaded HDFS DataNodes, or many other issues; see "Hadoop MapReduce job I/O Exception due to premature EOF from inputStream".

Increasing the number of DataNode transfer threads from within the Spark application didn't change anything, because this property has to be changed in the HDFS config on each cluster worker and the DataNode service has to be restarted on each worker. The simplest approach is to re-create the cluster with the hdfs:dfs.datanode.max.transfer.threads=8192 cluster property.
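As a minimal sketch of that re-creation step, the property can be passed in the cluster's software config when the cluster is created through the google-cloud-dataproc Python client. The helper names build_cluster and create_cluster are hypothetical, and the project ID, region, and cluster name are placeholders; only the hdfs: prefixed property comes from the text above.

```python
def build_cluster(project_id, cluster_name, transfer_threads=8192):
    """Return a Dataproc cluster spec that raises the DataNode transfer threads.

    Hypothetical helper: only the property key is taken from the answer.
    """
    return {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "software_config": {
                "properties": {
                    # The "hdfs:" prefix targets the HDFS config on the cluster nodes.
                    "hdfs:dfs.datanode.max.transfer.threads": str(transfer_threads),
                }
            }
        },
    }


def create_cluster(project_id, region, cluster_name):
    """Create the cluster. Needs google-cloud-dataproc and valid credentials."""
    # Imported lazily so build_cluster stays usable without the client library.
    from google.cloud import dataproc_v1

    client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = client.create_cluster(
        request={
            "project_id": project_id,
            "region": region,
            "cluster": build_cluster(project_id, cluster_name),
        }
    )
    return operation.result()  # blocks until the cluster is ready
```

A call such as create_cluster("my-project", "us-central1", "my-cluster") would then bring up a cluster whose DataNodes start with the new setting, so no manual restart is needed.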

Note that if the root cause of the issue is a lack of disk bandwidth, then increasing the number of transfer threads in the DataNodes will only exacerbate it, not fix it.

You have multiple options to try to solve this issue:

  1. To increase local disk bandwidth, use PD-SSD or Local SSD on worker nodes when creating the cluster.

  2. If you are using a cluster with a small number of workers, the HDFS DataNodes (one per worker) may not be able to handle the load. As a workaround, you can increase the number of workers in the cluster or, if your workers have more than 4 CPU cores, re-create the cluster with the same capacity but a higher number of smaller workers.

  3. Use Google Cloud Storage (gs:// scheme) instead of HDFS to store the data that you process. Google Cloud Storage scales much better than HDFS and should work out of the box.
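The three options can be sketched roughly as follows. This is an illustration under assumptions, not a tuned configuration: worker_config and gcs_uri are hypothetical helpers, the n1-standard machine family and the bucket name are placeholders, and the returned dictionary is meant to go under config["worker_config"] of a Dataproc cluster spec.

```python
def worker_config(total_vcpus, vcpus_per_worker=4,
                  boot_disk_type="pd-ssd", num_local_ssds=0):
    """Spread the same vCPU capacity over more, smaller workers (options 1 and 2)."""
    if total_vcpus % vcpus_per_worker:
        raise ValueError("total_vcpus must be a multiple of vcpus_per_worker")
    return {
        # More workers means more DataNodes sharing the HDFS load.
        "num_instances": total_vcpus // vcpus_per_worker,
        # Assumed machine family; pick whatever the project actually uses.
        "machine_type_uri": f"n1-standard-{vcpus_per_worker}",
        "disk_config": {
            "boot_disk_type": boot_disk_type,  # PD-SSD instead of standard PD
            "num_local_ssds": num_local_ssds,  # set above 0 to attach Local SSDs
        },
    }


def gcs_uri(bucket, path):
    """Build a gs:// URI to use in place of an HDFS path (option 3)."""
    return f"gs://{bucket}/{path.lstrip('/')}"
```

For example, worker_config(32) describes eight 4-core PD-SSD workers instead of two 16-core ones, and a job could read its input with spark.read.text(gcs_uri("my-bucket", "data/sentences.txt")) rather than from an hdfs:// path.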

Igor Dvorzhak