7

I have a PySpark job that runs without any issues locally, but when it runs on an AWS cluster it gets stuck when it reaches the code below. The job processes only 100 records. "some_function" posts data to a website and returns a response at the end. Any idea what's going wrong, or how I can debug this? FYI: "some_function" is defined outside of the class. I guess the issue is related to ["closures"][1], but I'm not sure how to fix it.

response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

Full code below

def ctgs(col1, col2, col3):
    # Build one record (a dict) from the three selected columns.
    rec = {
        'up_col1': col1,
        'up_col2': col2,
        'up_col3': col3
    }

    return rec

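def some_func1(rec, dict_name, id):
    # rec is an iterable holding one batch of up to 50 records.
    rec_list = list(rec)
    seid = id

    headers = {"some-header": "some value"}  # placeholder header
    attrburl = "https://www.someurl.com"     # placeholder URL

    response = requests.post(attrburl, data=json.dumps(rec_list), headers=headers)

    return response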


class Processor:
    def __init__(self, sc, arguments):
        self.sc = sc  # the SparkSession created in main()
        self.env = arguments.env
        self.dte = arguments.dte
        self.sendme = arguments.sendme

    def send_them(self, ext_data, dict_name, id):
        attributes = ext_data.rdd.map(lambda x: ctgs(x['col1'], x['col2'], x['col3']))

        # Post the records in batches of 50 per partition and collect the responses.
        response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

    def extr_data(self, dict_name, id):
        ext_data = self.sc.sql('''select col1, col2, col3 from table_name''')
        self.send_them(ext_data, dict_name, id)


    def process(self):
        dict_name = {'dict_id': '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        self.extr_data(dict_name, id)



def argument_parsing(args):
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte", required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args


def main(args):

        arguments = argument_parsing(args)

        sc = SparkSession \
            .builder \
            .appName("job_name") \
            .enableHiveSupport() \
            .getOrCreate()

        sc.sparkContext.setLogLevel("ERROR")

        processor = Processor(sc, arguments)
        processor.process()
user7343922
  • Obviously there is the problem of connectivity to a given URL. You can start with specifying [timeout](https://3.python-requests.org/user/quickstart/#timeouts). Probably there is also an option in `requests` module that enables debug messages. – gudok Oct 18 '21 at 04:32
  • The log you showed looks like **driver logs**. What do you see in **executor logs**? And are you sure the outgoing network is enabled? – pltc Oct 18 '21 at 04:47
  • Change the log level to debug (if not already), put the `requests.post()` call in a `try-except` block, and manually check from the AWS cluster whether you can access `attribute_url` or whether some firewall rule is blocking the connection – zweack Oct 18 '21 at 04:52
  • @gudok I have added the executor log now, and I tried adding timeout=4; even then the job just hangs at the same position forever. Is there any other option I can try? As I said, the job runs without any issues when I run it locally, and I also tried pinging the link from the cluster and I'm able to get responses – user7343922 Oct 18 '21 at 14:04
  • @pltc Added executor screenshot – user7343922 Oct 18 '21 at 14:34
  • @gudok FYI - I'm wondering whether my problem comes from calling map inside mapPartitions, where the function "some_function" resides outside of the class. Could calling a function that resides outside of the class cause issues like this one? https://stackoverflow.com/questions/44289962/errorsparkcontext-can-only-be-used-on-the-driver-not-in-code-that-it-run-on-wo But I'm not sure how I can apply this in my case – user7343922 Oct 18 '21 at 21:44
  • @gudok, it seems like this issue is almost the same as mine (I'm getting the same error in the executor), but I'm not very clear about the solution posted. Can you shed some light on this? https://stackoverflow.com/questions/64462317/could-not-find-valid-spark-home-while-searching-on-aws-emr – user7343922 Oct 18 '21 at 22:30
  • You left out your imports in the code, but I assume you are using PySpark's requests. This needs the Spark context and will not work when called inside executor code. https://spark.apache.org/docs/latest/api/python/_modules/pyspark/resource/requests.html – Matt Andruff Nov 08 '21 at 16:26

2 Answers

1

You are correct: this is an issue with closures/executors.

Code inside mapPartitions runs on the executors when the job runs on a cluster. Running in 'local' mode obscures these types of bugs/errors because it scopes all the functions to the driver, which is running on your machine. There is no scope issue when running in 'local' mode.

There are two types of problems when dealing with closures/executors: your scoped variables not being serializable, and the environment that the executor is running in.
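The serialization half can be illustrated with a minimal sketch. It assumes the standard `pickle` module as a stand-in for the serializer Spark uses to ship closures (PySpark uses cloudpickle, which handles lambdas but still cannot serialize live OS resources). The `can_pickle` helper and the `settings` dict are hypothetical names used only for illustration.

```python
import pickle
import socket


def can_pickle(obj):
    """Return (True, None) if obj pickles, else (False, error message)."""
    try:
        pickle.dumps(obj)
        return True, None
    except Exception as exc:  # pickling failures raise several exception types
        return False, f"{type(exc).__name__}: {exc}"


# Plain data captured by a closure can be shipped to executors.
settings = {"url": "https://www.someurl.com", "batch_size": 50}

# A live OS resource (here an unconnected socket) cannot be serialized.
sock = socket.socket()
try:
    print(can_pickle(settings))
    print(can_pickle(sock))
finally:
    sock.close()
```

Anything in the second category has to be created on the executor rather than captured from the driver.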

The environment check should be easy. Can you connect to the URL from one of your executors if you just ssh in and try to connect? (My bet is that you are waiting on a DNS lookup for the URL.) In fact, I'd suggest you start by checking the security group for the EMR cluster and seeing which nodes are allowed access.
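If ssh access is inconvenient, a rough equivalent of that check can be run as Python on the node itself. The sketch below uses only the standard library; `check_endpoint` and its report fields are hypothetical names, and port 443 is an assumption about the target site.

```python
import socket
import time


def check_endpoint(host, port=443, timeout=5.0):
    """Report whether this machine can resolve `host` and open a TCP connection."""
    report = {"node": socket.gethostname(), "host": host, "port": port,
              "resolved": False, "connected": False, "error": None}
    start = time.monotonic()
    try:
        socket.getaddrinfo(host, port)  # DNS lookup (has no timeout of its own)
        report["resolved"] = True
        with socket.create_connection((host, port), timeout=timeout):
            report["connected"] = True
    except OSError as exc:  # covers DNS failures, timeouts, refused connections
        report["error"] = f"{type(exc).__name__}: {exc}"
    report["seconds"] = round(time.monotonic() - start, 3)
    return report


# Hypothetical way to run it on executors instead of the driver
# (`spark` is assumed to be the SparkSession):
# spark.sparkContext.parallelize(range(8), 8) \
#     .map(lambda _: check_endpoint("www.someurl.com")).collect()
```

The `node` field shows which machine answered, since Spark does not guarantee that the tasks land on every executor. A large `seconds` value with `resolved` still false would point at DNS.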

The scope is a little more challenging. If requests is initialized in the global scope but is not serializable, this could cause an issue. (You can't serialize an in-flight connection to a database/website.) You could just initialize it inside mapPartitions, and this would solve the issue. The thing is, this would usually fail immediately, which doesn't really fit the issue you are describing. Unless this is causing the Python interpreter to die and falsely report that it's waiting, I do not think this is the issue.
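For reference, initializing everything inside mapPartitions could look roughly like the sketch below. It is not the asker's code: it swaps `requests` for the standard library's `urllib.request` so that nothing beyond stock Python is needed on the executors, and `batches`, `post_json`, and `send_partition` are hypothetical names. It returns plain `(status, body)` tuples rather than response objects so the results can be sent back to the driver.

```python
import json
import urllib.request
from itertools import islice


def batches(iterator, size):
    """Yield lists of at most `size` items from any iterable."""
    iterator = iter(iterator)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def post_json(url, payload, timeout):
    """POST `payload` as JSON; return (status, body) as plain picklable values."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status, response.read().decode("utf-8", errors="replace")


def send_partition(rows, url, batch_size=50, timeout=10.0, post=post_json):
    """Runs on an executor: every network object is created inside this call."""
    for chunk in batches(rows, batch_size):
        try:
            yield post(url, chunk, timeout)
        except Exception as exc:  # report the failure instead of losing it
            yield None, f"{type(exc).__name__}: {exc}"


# Hypothetical wiring into the job from the question:
# results = attributes.mapPartitions(
#     lambda rows: send_partition(rows, "https://www.someurl.com")
# ).collect()
```

The explicit `timeout` means a stalled connection surfaces as an error tuple in the collected results. The `post` parameter exists only so the batching can be exercised locally with a stub before running on the cluster.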

Matt Andruff
  • Yes, the request is posted from a function outside of the class. I even tried to run this using 1 node; even then I get the same issue. And I'm able to reach the URL from the executor. But as I mentioned in the post, I'm getting the error "Couldn't find SPARK_HOME path". How can I initiate it inside mapPartitions? How can I fix this issue? Also, how can I apply the solution posted here to mine? https://stackoverflow.com/questions/64462317/could-not-find-valid-spark-home-while-searching-on-aws-emr – user7343922 Oct 20 '21 at 18:25
  • Outside of local mode, you will always get a closure issue when code on an executor (i.e., code inside mapPartitions) relies on the Spark context (hence "Couldn't find SPARK_HOME path"). You will need to initialize the connection inside mapPartitions, and I can't tell you how to do that as you haven't posted the code for 'requests'. – Matt Andruff Oct 20 '21 at 19:28
0

I suggest that you try this Python solution. It does not use the PySpark requests object, which relies on the Spark session. You will need this library installed on every node for this to work, or you can pass it along with spark-submit. Basically, this removes the dependency on the Spark session, and hence it will work inside a mapPartitions closure.

Basically, you would use:

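def some_func1(rec, dict_name, id):
    # Import inside the function so the modules are loaded on the executor.
    import json
    import grequests

    rec_list = list(rec)
    seid = id

    headers = {"some-header": "some value"}  # placeholder header
    attrburl = "https://www.someurl.com"     # placeholder URL

    # grequests.post only builds the request; grequests.map actually sends it.
    request = grequests.post(attrburl, data=json.dumps(rec_list), headers=headers)
    response = grequests.map([request])[0]

    return response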
Matt Andruff
  • Yes, before posting the bounty I rewrote the entire thing in Python, but I'm wondering whether there is any PySpark solution. – user7343922 Nov 12 '21 at 14:33
  • Well, the solution when using mapPartitions is to use language-dependent tools (i.e., Python tools), not Spark-dependent tools that might have a dependency on the Spark context. Basically, you should use Spark, but inside 'mapPartitions' use Python code that doesn't depend on Spark internals. – Matt Andruff Nov 12 '21 at 15:00