
I am attempting to run the following code in a Dataproc cluster (you can find the software versions I am using here):

# IMPORTANT: THIS CODE WAS RUN IN A SINGLE JUPYTER NOTEBOOK CELL

print("IMPORTING LIBRARIES...")

import pandas as pd
import numpy as np
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, pandas_udf
# https://spark.apache.org/docs/3.1.3/api/python/_modules/pyspark/sql/types.html
from pyspark.sql.types import ArrayType, StringType

print("STARTING SPARK SESSION...")

spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .getOrCreate()

print("FUNCTION DEFINTION...")

def load_spacy_model():
    import spacy
    print("\tLoading spacy model...")
    return spacy.load("./spacy_model")  # This model exists locally


@pandas_udf(ArrayType(StringType()))
def get_entities(list_of_text: pd.Series) -> pd.Series:
    # retrieving the shared nlp object
    nlp = broadcasted_nlp.value
    # batch processing our list of text
    docs = nlp.pipe(list_of_text)
    # entity extraction (`ents` is a list[list[str]])
    ents = [
        [ent.text for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)

# loading spaCy model and broadcasting it
broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())
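# NOTE: broadcast() serializes the loaded model once on the driver and ships it
# to every executor; each task then reads the shared copy through
# broadcasted_nlp.value inside get_entities() above.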

print("DATA READING (OR MANUAL DATA GENERATION)...")

# # Manually-generated data (DISABLED BY DEFAULT, USE FOR "TESTING")
# # IMPORTANT: Code works well for this case !!!

# pdf = pd.DataFrame(
#     [
#         "Python and Pandas are very important for Automation",
#         "Tony Stark is an Electrical Engineer",
#         "Pipe welding is a very dangerous task in Oil mining",
#         "Nursing is often underwhelmed, but it's very interesting",
#         "Software Engineering now opens a lot of doors for you",
#         "Civil Engineering can get exiting, as you travel very often",
#         "I am a Java Programmer, and I think I'm quite good at what I do",
#         "Diane is never bored of doing the same thing all day",
#         "My father is a Doctor, and he supports people in condition of poverty",
#         "A janitor is required as soon as possible"
#     ],
#     columns=['posting']
# )
# sdf=spark.createDataFrame(pdf)

# Reading data from CSV stored in GCS (ENABLED BY DEFAULT, USE FOR "PRODUCTION")

sdf = spark.read.csv("gs://onementor-ml-data/1M_indeed_eng_clean.csv", header=True)  # ~1M rows, 1 column 'posting', ~1GB in size
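# NOTE: without an explicit schema (or inferSchema=True), spark.read.csv reads
# every column as StringType, which is what nlp.pipe() expects here anyway.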

print("\tDataFrame shape: ", (sdf.count(), len(sdf.columns)))

print("NAMED ENTITY RECOGNITION USING SPACY OVER PYSPARK...")

t1 = time.time()

# alternative form using col(): sdf.withColumn("entities", get_entities(col("posting"))).show(5, truncate=10)
sdf_new = sdf.withColumn('skills', get_entities('posting'))

sdf_new.show(5, truncate=10)

print("\tData mined in {:.2f} seconds (Dataframe shape: ({}, {}))".format(
    time.time()-t1,
    sdf_new.count(),
    len(sdf_new.columns))
)
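
As a sanity check on my side (this snippet is my own addition, not part of the failing run), I plan to repeat the extraction over a small sample of the CSV in the same session; if it succeeds while the full run fails, that would point to data volume rather than the UDF itself (the sample size of 10000 is an arbitrary choice of mine):

# Isolation test: same UDF, but over a small sample of the CSV only
sdf_sample = sdf.limit(10000)
sdf_sample.withColumn('skills', get_entities('posting')).show(5, truncate=10)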

BTW, here are some basic specs of my cluster (I can update this info on request, just ask in the comment section):

  • Master node
    • Standard (1 master, N workers)
    • Machine type: n1-highmem-4 (originally n1-standard-4, still with errors)
    • Number of GPUs: 0
    • Primary disk type: pd-standard
    • Primary disk size: 500GB
    • Local SSDs: 0
  • Worker nodes
    • Quantity: 10 (originally 2, still with errors)
    • Machine type: n1-standard-4
    • Number of GPUs: 0
    • Primary disk type: pd-standard
    • Primary disk size: 500GB
    • Local SSDs: 0
    • Secondary worker nodes: 0

When running the previous script with the manually-generated data, the entity extraction works fine (if you need details about how I created my cluster, see that link too); however, when importing the .csv data from Cloud Storage, the following error appears (both VM and cluster names have been changed for safety):

ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 11 on my-vm-w-9.us-central1-a.c.my-project.internal: Container marked as failed: container_1661960727108_0002_01_000013 on host: my-vm-w-9.us-central1-a.c.my-project.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.

I have also found the following warning in the logs:

WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for broadcast_0_python !

I did some quick research, and was astonished by the considerable number of very different possible causes reported for that error (though none of them seemed specific to PySpark on Dataproc); the broadcast warning above might just be a consequence of the executor that held the broadcast copy being lost, but I cannot confirm it. I am therefore not sure whether there is a better troubleshooting approach for this case than blindly trying case after case I find on the web.
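
For completeness, one of the hypotheses I found (executor memory pressure) could be tested by recreating the session with explicit memory settings; the values below are guesses on my part for n1-standard-4 workers, not something I have verified:

spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .config("spark.executor.memory", "6g") \
                    .config("spark.executor.memoryOverhead", "2g") \
                    .getOrCreate()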

What could be happening here?

Thank you

David Espinosa
  • How large is your dataset? Have you checked the Spark UI to see how much shuffle data it generates? `Container released on a *lost* node` indicates that the YARN NodeManager on w-9 crashed or became unhealthy; you can check the NodeManager log in Cloud Logging or `/var/log/hadoop-yarn` on the worker node to verify. Usually it is caused by OOM or insufficient local disk space. See https://aws.amazon.com/premiumsupport/knowledge-center/emr-exit-status-100-lost-node/ – Dagang Sep 02 '22 at 05:25
  • As Dagang mentioned, this error message commonly appears when Spark runs out of memory or there is insufficient disk space. – Eduardo Ortiz Sep 07 '22 at 18:29
  • Hello @Dagang, that's the weird part: the dataset is only ~1GB in size, which makes the "cluster running out of memory" hypothesis very debatable. I will try to include the other requested information in the original post description. – David Espinosa Sep 09 '22 at 16:36
