I have a PySpark job that runs without any issues locally, but when it runs on the AWS cluster it gets stuck when it reaches the code below. The job only processes 100 records. `some_function` posts data to a website and returns a response at the end. Any idea what's going wrong, or how I can debug this? FYI: `some_function` is defined outside of the class; I guess the issue is related to ["closures"][1], but I'm not sure how to fix it.
response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
Full code below
def ctgs(entries, *more):
    """Map a row's first three columns onto the upload schema.

    Accepts either a single indexable row (tuple / pyspark Row) or the
    three column values as separate positional arguments — the caller in
    ``send_them`` passes them separately, while the original signature
    expected one sequence, so this handles both, backward-compatibly.

    Returns a dict keyed by the module-level ``up_col1``/``up_col2``/
    ``up_col3`` names (defined elsewhere in the file — not in view here).
    """
    if more:
        # Called as ctgs(col1, col2, col3): re-pack into one sequence.
        entries = (entries,) + more
    return {
        up_col1: entries[0],
        up_col2: entries[1],
        up_col3: entries[2],
    }
def some_func1(rec, dict_name, id):
    """POST one batch of records to the attribute service.

    rec: iterable of records (one partition chunk); materialized to a
        list so it can be JSON-serialized.
    dict_name / id: request context — NOTE(review): currently unused in
        the payload; confirm whether they should be sent as well.

    Returns the ``requests.Response`` from the POST.

    Original code had an invalid ``recs{ ... }`` block and unbalanced
    parentheses in the ``requests.post`` call; both fixed here.
    """
    rec_list = list(rec)
    # NOTE(review): requests expects headers as a dict, and the URL
    # normally needs an http(s):// scheme — placeholders kept as-is.
    headers = "some header"
    attrburl = "www.someurl.com"
    response = requests.post(attrburl, data=json.dumps(rec_list), headers=headers)
    return response
class Processor:
    """Extracts rows from a Hive table and POSTs them to the attribute
    service in batches of 50 via ``some_func1``."""

    def __init__(self, sc, arguments):
        # Original had `__init` (missing trailing underscores), so the
        # constructor was never invoked. `sc` is a SparkSession despite
        # the name — extr_data() relies on its .sql() method.
        self.sc = sc
        self.env = arguments.env
        self.dte = arguments.dte
        self.sendme = arguments.sendme

    def send_them(self, ext_data, dict_name, id):
        """POST `ext_data` rows in chunks of 50; returns the collected responses.

        Everything referenced inside the lambdas (ctgs, some_func1,
        partition_all, dict_name, id) must be picklable module-level
        names or plain values so Spark can ship the closure to the
        executors — capturing `self` (and with it the SparkSession) in
        the closure is the classic cause of cluster-only hangs/failures.
        """
        # ctgs expects a single indexable row, so pass the columns as a tuple
        # (original passed three args and used an unquoted `col3` name).
        attributes = ext_data.rdd.map(
            lambda x: ctgs((x['col1'], x['col2'], x['col3']))
        )
        # Original called a misspelled `some_fuc1`; also materialize each
        # chunk with list() instead of the no-op map(lambda x: x, xs).
        responses = attributes.mapPartitions(
            lambda it: [some_func1(list(chunk), dict_name, id)
                        for chunk in partition_all(50, it)]
        ).collect()
        return responses

    def extr_data(self):
        """Pull the source columns and hand them to send_them().

        Uses the stored session (original referenced an undefined
        `spark`) and the context set by process() (original referenced
        `dict_name`/`id` that were local to process()).
        """
        ext_data = self.sc.sql('''select col1, col2, col3 from table_name''')
        return self.send_them(ext_data, self.dict_name, self.id)

    def process(self):
        """Entry point: set the request context, then extract and send."""
        # Original used a bare `dict_id` name as the key; a string key is
        # the only interpretation that runs — TODO confirm intended key.
        self.dict_name = {'dict_id': '34343-3434-3433-343'}
        self.id = 'dfdfd-erere-dfd'
        return self.extr_data()
def argument_parsing(args):
    """Parse CLI arguments.

    --env and --dte are required; --sendme is optional (defaults to None).
    Returns the populated argparse Namespace.

    Original referenced `parser` without ever constructing it.
    """
    import argparse  # local import: the file's import block is not in view

    parser = argparse.ArgumentParser(description="job_name arguments")
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte", required=True)
    parser.add_argument("--sendme", required=False)
    return parser.parse_args(args)
def main(args):
    """Parse arguments, build the Spark session, and run the processor."""
    arguments = argument_parsing(args)
    # Note: this is a SparkSession, kept under a short local name only.
    session = (
        SparkSession.builder
        .appName("job_name")
        .enableHiveSupport()
        .getOrCreate()
    )
    session.sparkContext.setLogLevel("ERROR")
    Processor(session, arguments).process()