
I am facing the following exception and have tried various ways to fix it, but it is still not resolved.

The exception is raised when I try to run my AWS Glue ETL steps in parallel using the Ray library. The full traceback is below, followed by the script (etl_engine_ray.py) that produces it:

Traceback (most recent call last):
  File "etl_engine_ray.py", line 148, in <module>
    print(perform_etl(**requested_data))
  File "etl_engine_ray.py", line 138, in perform_etl
    futures = [process_etl.remote(each, uid, integration_id, time_stamp) for each in data]
  File "etl_engine_ray.py", line 138, in <listcomp>
    futures = [process_etl.remote(each, uid, integration_id, time_stamp) for each in data]
  File "/home/glue_user/.local/lib/python3.7/site-packages/ray/remote_function.py", line 124, in _remote_proxy
    return self._remote(args=args, kwargs=kwargs)
  File "/home/glue_user/.local/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 295, in _invocation_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "/home/glue_user/.local/lib/python3.7/site-packages/ray/remote_function.py", line 263, in _remote
    self._pickled_function = pickle.dumps(self._function)
  File "/home/glue_user/.local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/glue_user/.local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
    return Pickler.dump(self, obj)
  File "/home/glue_user/spark/python/pyspark/context.py", line 362, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
import os
import time

import ray

from pyspark import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import SelectFields

import settings

sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)

@ray.remote
def process_etl(path: str, uid: str, integration_id: str, time_stamp: int):
    parquet_path = None
    try:
        dynamic_df = glue_context.create_dynamic_frame_from_options(
            connection_type=settings.CONNECTION_TYPE,
            connection_options={
                'paths': [path],
                'recurse': True,
                'groupFiles': settings.S3_GROUP_FILES,
                'groupSize': settings.S3_GROUP_SIZE},
            format='json',
            format_options={"jsonPath": "*"}
        )

        # Select only the required columns
        selected_data = SelectFields.apply(
            frame=dynamic_df,
            paths=['partner_script_id', 'details', 'registered_installation_id', 'type']
        )

        # Build the output path
        file_name = os.path.basename(path).split('.')[0]
        parquet_path = f'{settings.S3_BUCKET_PATH}/{integration_id}/{uid}/{time_stamp}/{file_name}.parquet'

        # If a custom pipeline is registered for this file, apply it
        if file_name in settings.CUSTOM_ETL_PIPELINE:
            selected_data = settings.CUSTOM_ETL_PIPELINE.get(file_name)(selected_data)

        # Write the data to the bucket in Parquet format
        glue_context.write_dynamic_frame_from_options(
            selected_data,
            connection_type=settings.CONNECTION_TYPE,
            connection_options={'path': parquet_path},
            format='parquet',
            format_options={
                "compression": "snappy",
                'blockSize': settings.BLOCK_SIZE,
                'pageSize': settings.PAGE_SIZE}
        )
    except Exception as error:
        print(f'Exception in process_etl is {error}')
    return parquet_path

def perform_etl(uid: str, integration_id: str, type: str, data: list) -> list:
    start_time = time.time()
    time_stamp = int(start_time)
    futures = [process_etl.remote(each, uid, integration_id, time_stamp) for each in data]
    # a = sc.parallelize(data)
    # d = a.map(lambda each: process_etl.remote(each, uid, integration_id, time_stamp)).collect()
    # print(d)
    final_data = ray.get(futures)
    print(time.time() - start_time)
    return final_data


if __name__ == '__main__':
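    # requested_data is defined earlier in the real script (omitted from this snippet)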
    print(perform_etl(**requested_data))
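
From the traceback, my reading (which may be wrong) is that Ray has to pickle process_etl together with everything it closes over, including glue_context and therefore the SparkContext. If that is right, even a stripped-down sketch like the one below, with no Glue calls at all, should fail with the same SPARK-5063 message, because the exception is raised while cloudpickle serializes the function. This is not my real code, just my assumption about what triggers the error:

import ray
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
ray.init()

@ray.remote
def touch_spark(x):
    # The body references the module-level sc, so Ray's cloudpickle has to
    # serialize the SparkContext along with the function.
    return (sc.applicationId, x)

# I expect this call to fail the same way my script does: the traceback shows
# the exception coming from pickle.dumps(self._function) in remote_function.py,
# i.e. before the task is ever scheduled.
future = touch_spark.remote(1)
print(ray.get(future))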

I have done a lot of research but still have not been able to pin down the root cause or a way around it. Please help me out with this.
