I am attempting to run the following code in a Dataproc cluster (you can find the software versions I am using here):
# IMPORTANT: THIS CODE WAS RUN IN A SINGLE JUPYTER NOTEBOOK CELL
print("IMPORTING LIBRARIES...")
import pandas as pd
import numpy as np
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, pandas_udf
# https://spark.apache.org/docs/3.1.3/api/python/_modules/pyspark/sql/types.html
from pyspark.sql.types import ArrayType, StringType
print("STARTING SPARK SESSION...")
spark = SparkSession.builder.appName('SpacyOverPySpark') \
    .getOrCreate()
print("FUNCTION DEFINITION...")
def load_spacy_model():
    import spacy
    print("\tLoading spacy model...")
    return spacy.load("./spacy_model")  # This model exists locally
@pandas_udf(ArrayType(StringType()))
def get_entities(list_of_text: pd.Series) -> pd.Series:
    # retrieving the shared nlp object
    nlp = broadcasted_nlp.value
    # batch processing our list of text
    docs = nlp.pipe(list_of_text)
    # entity extraction (`ents` is a list[list[str]])
    ents = [
        [ent.text for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)
# loading spaCy model and broadcasting it
broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())
print("DATA READING (OR MANUAL DATA GENERATION)...")
# # Manually-generated data (DISABLED BY DEFAULT, USE FOR "TESTING")
# # IMPORTANT: Code works well for this case !!!
# pdf = pd.DataFrame(
#     [
#         "Python and Pandas are very important for Automation",
#         "Tony Stark is an Electrical Engineer",
#         "Pipe welding is a very dangerous task in Oil mining",
#         "Nursing is often underwhelmed, but it's very interesting",
#         "Software Engineering now opens a lot of doors for you",
#         "Civil Engineering can get exciting, as you travel very often",
#         "I am a Java Programmer, and I think I'm quite good at what I do",
#         "Diane is never bored of doing the same thing all day",
#         "My father is a Doctor, and he supports people in condition of poverty",
#         "A janitor is required as soon as possible"
#     ],
#     columns=['posting']
# )
# sdf=spark.createDataFrame(pdf)
# Reading data from CSV stored in GCS (ENABLED BY DEFAULT, USE FOR "PRODUCTION")
sdf = spark.read.csv("gs://onementor-ml-data/1M_indeed_eng_clean.csv", header=True) # ~1M rows, 1 column 'posting', ~1GB in size
print("\tDataFrame shape: ", (sdf.count(), len(sdf.columns)))
print("NAMED ENTITY RECOGNITION USING SPACY OVER PYSPARK...")
t1 = time.time()
# df_dummy2.withColumn("entities", get_entities(col("posting"))).show(5, truncate=10)
sdf_new = sdf.withColumn('skills',get_entities('posting'))
sdf_new.show(5, truncate=10)
print("\tData mined in {:.2f} seconds (Dataframe shape: ({}, {}))".format(
    time.time()-t1,
    sdf_new.count(),
    len(sdf_new.columns))
)
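Side note: given that the code works for the manually-generated data (see the comment above) but fails on the full CSV, an intermediate test I could run is the same UDF over a small slice of the real file. I sketch it below in case it helps reproduce the issue at a smaller scale (the 50,000-row limit is an arbitrary number, picked only for illustration):

```python
# Hypothetical intermediate test (not part of the failing run above):
# apply the same UDF to a small slice of the real CSV, to check whether
# the failure is tied to data volume. The 50,000-row limit is arbitrary.
sdf_sample = spark.read.csv(
    "gs://onementor-ml-data/1M_indeed_eng_clean.csv", header=True
).limit(50000)
sdf_sample.withColumn("skills", get_entities(col("posting"))).show(5, truncate=10)
```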
BTW, here are some basic specs of my cluster (I can update this info on request, just ask in the comments; a sketch for dumping the effective Spark settings from the notebook follows the list):
- Master node
- Standard (1 master, N workers)
- Machine type: n1-highmem-4 (originally n1-standard-4, still with errors)
- Number of GPUs: 0
- Primary disk type: pd-standard
- Primary disk size: 500GB
- Local SSDs: 0
- Worker nodes
- Quantity: 10 (originally 2, still with errors)
- Machine type: n1-standard-4
- Number of GPUs: 0
- Primary disk type: pd-standard
- Primary disk size: 500GB
- Local SSDs: 0
- Secondary worker nodes: 0
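In case the effective Spark resource settings are more relevant than the machine types, this is roughly how I can dump them from the same notebook (the substring filter below is only illustrative, not an exhaustive list of relevant keys):

```python
# Dump the Spark properties related to executor/driver resources from the running session.
# The substring filter is illustrative; it is not an exhaustive list of relevant keys.
for key, value in spark.sparkContext.getConf().getAll():
    if any(s in key.lower() for s in ("memory", "cores", "executor", "yarn")):
        print(key, "=", value)
```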
When running the previous script with the "manually-generated data", the entity extraction works fine (if you need details about how I created my cluster, hit that link too); however, when importing the .csv data from Cloud Storage, the following error appears (both VM and cluster names have been changed for safety):
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 11 on my-vm-w-9.us-central1-a.c.my-project.internal: Container marked as failed: container_1661960727108_0002_01_000013 on host: my-vm-w-9.us-central1-a.c.my-project.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.
I have also read in the logs the following warning:
WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for broadcast_0_python !
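Regarding that warning, one thing I could check is the on-disk size of the local spaCy model folder that gets broadcast (sketch below; the path is the same local one used in `load_spacy_model`, and this is only a rough sanity check, not a measure of the in-memory size after `spacy.load`):

```python
import os

# Rough on-disk size of the local spaCy model folder that gets broadcast.
# This is only a sanity check; it does not account for in-memory overhead after spacy.load.
model_dir = "./spacy_model"
total_bytes = sum(
    os.path.getsize(os.path.join(root, f))
    for root, _, files in os.walk(model_dir)
    for f in files
)
print("spaCy model folder size: {:.1f} MB".format(total_bytes / 1e6))
```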
I have done some quick research, but I was astonished by the considerable number of very different possible causes of that error (none of them, however, specific to PySpark on Dataproc), so I am not sure whether there is a better troubleshooting approach for this case than blindly trying, one by one, the cases I find on the web.
What could be happening here?
Thank you