I am facing the following exception and have tried various ways to resolve it, without success. It is raised when I run distributed processing with the Ray library:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Traceback (most recent call last):
File "etl_engine_ray.py", line 148, in <module>
print(perform_etl(**requested_data))
File "etl_engine_ray.py", line 138, in perform_etl
futures = [process_etl.remote(each, uid, integration_id, time_stamp) for each in data]
File "etl_engine_ray.py", line 138, in <listcomp>
futures = [process_etl.remote(each, uid, integration_id, time_stamp) for each in data]
File "/home/glue_user/.local/lib/python3.7/site-packages/ray/remote_function.py", line 124, in _remote_proxy
return self._remote(args=args, kwargs=kwargs)
File "/home/glue_user/.local/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 295, in _invocation_remote_span
return method(self, args, kwargs, *_args, **_kwargs)
File "/home/glue_user/.local/lib/python3.7/site-packages/ray/remote_function.py", line 263, in _remote
self._pickled_function = pickle.dumps(self._function)
File "/home/glue_user/.local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/home/glue_user/.local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
return Pickler.dump(self, obj)
File "/home/glue_user/spark/python/pyspark/context.py", line 362, in __getnewargs__
"It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
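From the traceback, the failure happens while Ray pickles the remote function (pickle.dumps(self._function)) and pyspark's SparkContext.__getnewargs__ raises, so my understanding is that any Ray task that captures the SparkContext, directly or indirectly through the GlueContext, cannot be serialized. Below is a stripped-down sketch of what I believe triggers the same exception (illustrative only; it assumes a plain local Spark and Ray, no Glue):

import ray
from pyspark import SparkContext

ray.init()
sc = SparkContext.getOrCreate()

@ray.remote
def uses_spark_context():
    # sc is a module-level global referenced by the task, so Ray's cloudpickle
    # has to serialize it along with the function
    return sc.version

# Fails at submission time with the SPARK-5063 exception, before the task runs
uses_spark_context.remote()

My actual script is below.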
import os
import time

import ray

from pyspark import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import SelectFields

import settings

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)

@ray.remote
def process_etl(path: str, uid: str, integration_id: str, time_stamp: int):
    try:
        dynamic_df = glue_context.create_dynamic_frame_from_options(
            connection_type=settings.CONNECTION_TYPE,
            connection_options={
                'paths': [path],
                'recurse': True,
                'groupFiles': settings.S3_GROUP_FILES,
                'groupSize': settings.S3_GROUP_SIZE},
            format='json',
            format_options={"jsonPath": "*"}
        )
        # Select only the columns that are required
        selected_data = SelectFields.apply(
            frame=dynamic_df,
            paths=['partner_script_id', 'details', 'registered_installation_id', 'type']
        )
        # Build the output file path
        file_name = os.path.basename(path).split('.')[0]
        parquet_path = f'{settings.S3_BUCKET_PATH}/{integration_id}/{uid}/{time_stamp}/{file_name}.parquet'
        # If a custom pipeline is available for this file, use it
        if file_name in settings.CUSTOM_ETL_PIPELINE:
            selected_data = settings.CUSTOM_ETL_PIPELINE.get(file_name)(selected_data)
        # Write the data into the bucket in parquet format
        glue_context.write_dynamic_frame_from_options(
            selected_data,
            connection_type=settings.CONNECTION_TYPE,
            connection_options={'path': parquet_path},
            format='parquet',
            format_options={
                "compression": "snappy",
                'blockSize': settings.BLOCK_SIZE,
                'pageSize': settings.PAGE_SIZE}
        )
    except Exception as error:
        print(f'Exception in process_etl is {error}')
    return parquet_path

def perform_etl(uid: str, integration_id: str, type: str, data: list) -> dict:
    start_time = time.time()
    time_stamp = int(time.time())
    futures = [process_etl.remote(each, uid, integration_id, time_stamp) for each in data]
    # a = sc.parallelize(data)
    # d = a.map(lambda each: process_etl.remote(each, uid, integration_id, time_stamp)).collect()
    # print(d)
    final_data = ray.get(futures)
    print(time.time() - start_time)
    return final_data


if __name__ == '__main__':
    # requested_data is built earlier in the script (omitted here)
    print(perform_etl(**requested_data))
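Is the right direction to avoid capturing the driver-side glue_context in the remote function at all, for example by creating the SparkContext/GlueContext inside each Ray task? Something like the sketch below (untested; it assumes each Ray worker is allowed to create or attach to its own Spark/Glue session, which I am not sure holds on AWS Glue):

@ray.remote
def process_etl(path: str, uid: str, integration_id: str, time_stamp: int):
    # Build the contexts inside the task so nothing Spark-related
    # needs to be pickled together with the function
    from pyspark import SparkContext
    from awsglue.context import GlueContext

    sc = SparkContext.getOrCreate()
    glue_context = GlueContext(sc)
    ...  # rest of the ETL logic as above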
I have done a lot of research but still have not found the root cause. It is still not resolved; please help me out with this.