
I am currently running into some issues when reading data from a Postgres database using JDBC connections in (Py)Spark. I have a table in Postgres that I would like to read in Spark, process it, and save the results as a .parquet file in an AWS S3 bucket.

I created a sample script that performs some basic logic (so as not to overcomplicate the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import length
import argparse
import uuid
import datetime

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--loc",
        type=str,
        default="./",
        help="Output location"
    )

    args = parser.parse_known_args()[0]
    return args

if __name__=="__main__":
    args = parse_arguments()

    spark = SparkSession.builder. \
        appName("test-script"). \
        config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5,org.postgresql:postgresql:42.1.1"). \
        getOrCreate()

    CENTRAL_ID = "id"
    PG_HOST = spark.conf.get("spark.yarn.appMasterEnv.PG_HOST")
    PG_PORT = spark.conf.get("spark.yarn.appMasterEnv.PG_PORT")
    PG_USER = spark.conf.get("spark.yarn.appMasterEnv.PG_USER")
    PG_DB = spark.conf.get("spark.yarn.appMasterEnv.PG_DB")
    PG_PASS = spark.conf.get("spark.yarn.appMasterEnv.PG_PASS")
    PG_MAX_CONCURRENT = spark.conf.get("spark.yarn.appMasterEnv.PG_MAX_CONCURRENT")

    table = "test_schema.test_table"
    partitions = spark.sparkContext.defaultParallelism
    fetch_size = 2000

    data = spark.read.format("jdbc"). \
        option("url", "jdbc:postgresql://{}:{}/{}".format(PG_HOST, PG_PORT, PG_DB)). \
        option("dbtable", "(SELECT *, MOD({}, {}) AS p FROM {}) AS t".format(CENTRAL_ID, partitions, table)). \
        option("user", PG_USER). \
        option("password", PG_PASS). \
        option("driver", "org.postgresql.Driver"). \
        option("partitionColumn", "p"). \
        option("lowerBound", 0). \
        option("upperBound", partitions). \
        option("numPartitions", PG_MAX_CONCURRENT). \
        option("fetchSize", fetch_size). \
        load()
    data = data.repartition(partitions)

    # Cache data
    data.cache()

    # Calculate on data
    out1 = data.withColumn("abstract_length", length("abstract"))
    out2 = data.withColumn("title_length", length("title"))

    # Create a timestamp
    time_stamp = datetime.datetime.utcnow().isoformat()
    save_id = "{}-{}".format(uuid.uuid4(), time_stamp)

    out1.select([CENTRAL_ID, "abstract_length"]).write.mode("overwrite").parquet("{}/{}/abstract".format(args.loc, save_id))
    out2.select([CENTRAL_ID, "title_length"]).write.mode("overwrite").parquet("{}/{}/title".format(args.loc, save_id))

    spark.stop()

    print("run successfull!")

The program attempts to limit the number of concurrent queries to the Postgres database to 8 (see PG_MAX_CONCURRENT), so as not to overload the database. After loading, I repartition to more (360) partitions in order to distribute the data over all the workers.

The EMR cluster configuration is as follows:

  • Master: m5.xlarge (4 vCores, 16 GiB memory, 64 GiB EBS-only storage)
  • 6 x Slave: m5.4xlarge (16 vCores, 64 GiB memory, 64 GiB EBS-only storage)
  • Spark 2.4.7

The spark-submit command is as follows:

spark-submit \
--master yarn \
--conf 'spark.yarn.appMasterEnv.PG_HOST=<<host>>' \
--conf 'spark.yarn.appMasterEnv.PG_PORT=<<port>>' \
--conf 'spark.yarn.appMasterEnv.PG_DB=<<db>>' \
--conf 'spark.yarn.appMasterEnv.PG_USER=<<user>>' \
--conf 'spark.yarn.appMasterEnv.PG_PASS=<<password>>' \
--conf 'spark.yarn.appMasterEnv.PG_MAX_CONCURRENT=8' \
--conf 'spark.executor.cores=3' \
--conf 'spark.executor.instances=30' \
--conf 'spark.executor.memory=12g' \
--conf 'spark.driver.memory=12g' \
--conf 'spark.default.parallelism=360' \
--conf 'spark.kryoserializer.buffer.max=1000M' \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.dynamicAllocation.enabled=false' \
--packages 'com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5,org.postgresql:postgresql:42.1.1' \
program.py \
--loc s3a://<<bucket>>/

The first type of error I am getting:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.4 in stage 0.0 (TID 26, ip-172-31-35-159.eu-central-1.compute.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 140265 ms
Driver stacktrace:

I am unsure what this means. Could it be that fetching the data from the table takes too long? Or is there another reason?

The second type of error that I am getting is:

    ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container from a bad node: container_1609406414316_0002_01_000013 on host: ip-172-31-44-127.eu-central-1.compute.internal. Exit status: 137. Diagnostics: [2020-12-31 10:08:38.093]Container killed on request. Exit code is 137

This seems to indicate an OOM problem, but I cannot understand why I am getting an OOM error, because it seems to me that I allocated enough memory to the executors and the driver. Also, when I look at the cluster's stats, it appears to have plenty of memory (see the "Cluster stats" screenshot).

Could it be that, when using 8 concurrent queries to the Postgres server, it sends 1/8th of the data to each executor, so every executor should be ready to receive 1/8th of the total size? Or does fetchSize limit the amount of data that is sent to the executors in order to avoid memory issues? Or is there perhaps another reason? The entire table that I am trying to process is ~110 GB.
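
For reference, my back-of-the-envelope math, assuming the MOD column spreads the rows evenly over the initial JDBC partitions:

# Rough estimate of how much data a single JDBC read task has to hold
# before the repartition to 360 partitions happens
table_size_gb = 110       # approximate size of the whole table
jdbc_partitions = 8       # numPartitions used for the JDBC read (PG_MAX_CONCURRENT)
print(table_size_gb / jdbc_partitions)   # ~13.75 GB per initial partition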

Could somebody help? Thanks in advance!

thijsvdp

2 Answers


It seems you are timing out. Have you tried increasing the timeout configuration in your spark-submit arguments?

--conf 'spark.network.timeout=10000000' \

Spark cluster full of heartbeat timeouts, executors exiting on their own
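
If you would rather keep it in the script, the same setting can be applied when building the session. This is just a sketch; the 800s value and the heartbeat interval are my own suggestions (the heartbeat interval must stay well below the network timeout):

from pyspark.sql import SparkSession

# Sketch: raise the network timeout and keep the heartbeat interval well below it
spark = SparkSession.builder \
    .appName("test-script") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()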

Zach
  • Thanks, I will retry. However, this will not solve the OOM error, right? Do you have an idea what causes it? – thijsvdp Dec 31 '20 at 11:41
  • It seems to work better, but still getting the `ExecutorLostFailure: Container killed on request. Exit code is 137` errors... For now, the timeouts seem to be gone. – thijsvdp Dec 31 '20 at 12:16

Since the executor container is running out of memory, try adding this to your spark-submit configuration:

--conf 'spark.executor.memory=20g' \

If it still fails, try increasing it further.
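
Exit status 137 generally means the container was killed with SIGKILL, often because it went over the memory YARN granted it, so it may also be worth raising the off-heap overhead alongside the heap. A sketch only; the overhead value here is just a guess:

--conf 'spark.executor.memory=20g' \
--conf 'spark.executor.memoryOverhead=4g' \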

Zach
  • So then I would have to decrease the number of executors, because of the memory restrictions on the cluster. But isn't it weird that the EMR memory stats (upper right figure) do not show any sign of being overloaded? – thijsvdp Dec 31 '20 at 12:29
  • Oh, I see now. You have plenty of memory overall, but it seems that one of the executors is running out of memory, yes? – Zach Dec 31 '20 at 12:36
  • The fetchSize may need to be turned down to 500 or less. – Zach Dec 31 '20 at 12:42
  • Yes, that's right, one or more are running out of memory for some reason. I am unsure why exactly. The total amount of data to read in is much smaller than the memory I allocated (110 GB versus 360 GB)... It seems that for some reason fewer executors (18 instead of 30) each get a larger amount of data. Any ideas? – thijsvdp Dec 31 '20 at 12:42
  • the default fetchsize seems to be 10. Have you tried turning it down from 2000 to 100 or 500? – Zach Dec 31 '20 at 12:45
  • I ran the program yesterday with the default `fetchSize`, which also resulted in the '137' error. I could try values of 100 or 500, but my understanding is that this will not change the problem, since I already encountered it with the default? – thijsvdp Dec 31 '20 at 12:52
  • I just ran the code with `--conf 'spark.executor.memory=24g'` and `--conf 'spark.executor.instances=15'` with much the same results... I do think it is weird, though, that if I set `--conf 'spark.executor.instances=15'`, I get 10 executors, and if I set `--conf 'spark.executor.instances=30'`, I get 18 executors... Could this be the underlying problem? – thijsvdp Dec 31 '20 at 12:58
  • I am sorry, I have to go now. I will try the `fetchSize` thing later, and post the results here :) – thijsvdp Dec 31 '20 at 12:59
  • This seems to be relevant: https://stackoverflow.com/questions/29940711/apache-spark-setting-executor-instances-does-not-change-the-executors – Zach Dec 31 '20 at 13:07
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/226701/discussion-between-zach-and-thijsvdp). – Zach Jan 01 '21 at 15:21