0

I have a small PySpark script that I wrote that is looking for a value called resource_tags_user_engagement.

If the value is blank, null or includes a word it should be replaced by a default. But instead of just replacing blank, null or word it's replacing ALL values:

import sys
import pyspark.sql.functions as f
from pyspark.context import SparkContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# Set Glue Context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
spark.sql("set spark.sql.parquet.enableVectorizedReader=false")

# Create Dynamic Data Frame from table in the glue database
cost_allocation = glueContext.create_dynamic_frame.from_catalog(database="company_cost_allocation", table_name="company_cost_allocation")

# Convert dynamic frame to dta frame
cost_allocation_df = cost_allocation.toDF()

# Set default engagements
cost_allocation_df = cost_allocation_df.withColumn('resource_tags_user_engagement',          
         f.when(
               (f.col('line_item_usage_account_id').isin('123456789101',  '123456789102', '123456789103',  '123456789104', '123456789105', '123456789106', '123456789107', '123456789108', '123456789109' )) &
               (f.col('resource_tags_user_engagement') == '' ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '400000008378'
               )) \
               .withColumn('resource_tags_user_engagement',          
         f.when(
               ((f.col('line_item_usage_account_id') == f.lit('123456789110')) | 
               (f.col('line_item_usage_account_id') == f.lit('123456789111'))) & 
               (f.col('resource_tags_user_engagement') == f.lit('') ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '807000000401'
               )) \
                .withColumn('resource_tags_user_engagement',          
         f.when(
               (f.col('line_item_usage_account_id').isin('123456789112',  '123456789113', '123456789114')) &
               (f.col('resource_tags_user_engagement') == '' ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '807000000412'
               )) \
               .withColumn('resource_tags_user_engagement',          
         f.when(
               (f.col('line_item_usage_account_id').isin('123456789115',  '123456789116', '123456789117',  '123456789118', '123456789119', '123456789120', '123456789121', '123456789122', '123456789123')) &
               (f.col('resource_tags_user_engagement') == '' ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '400000008692'
               )) \
                .withColumn('resource_tags_user_engagement',          
         f.when(
               (f.col('line_item_usage_account_id').isin('123456789124',  '123456789125', '123456789126')) &
               (f.col('resource_tags_user_engagement') == '' ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '807000000412'
               )) \
                .withColumn('resource_tags_user_engagement',          
         f.when(
               (f.col('line_item_usage_account_id').isin('123456789127',  '123456789128', '123456789129', '123456789130', '123456789131')) &
               (f.col('resource_tags_user_engagement') == '' ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '808000000298'
               )) \
                .withColumn('resource_tags_user_engagement',          
        f.when(
               (f.col('line_item_usage_account_id') == '123456789132') &
               (f.col('resource_tags_user_engagement') == '' ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '803000006453'
               )) \
                .withColumn('resource_tags_user_engagement',          
         f.when(
               ((f.col('line_item_usage_account_id') == f.lit('123456789133')) | 
               (f.col('line_item_usage_account_id') == f.lit('123456789134'))) &
               (f.col('resource_tags_user_engagement') == f.lit('') ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '400000008426'
               )) \
                .withColumn('resource_tags_user_engagement',
        f.when(
               ((f.col('line_item_usage_account_id') == f.lit('123456789135')) | 
               (f.col('line_item_usage_account_id') == f.lit('123456789136'))) &
               (f.col('resource_tags_user_engagement') == f.lit('') ) |
               (f.col('resource_tags_user_engagement').isNull()) |
               (f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '800000047650'
               ).otherwise(f.col('resource_tags_user_engagement')))


# Convert back to a DynamicFrame for further processing.
partitioned_dynamicframe = DynamicFrame.fromDF(cost_allocation_df, glueContext, "partitioned_df")

# Repartition the dynamic frame before writing to S3
cost_allocation_df = cost_allocation_df.repartition(5)

# Write to S3
output_dir = "s3://company-cur-reports/company-costs-transformed-legacy-billing"
datasink = glueContext.write_dynamic_frame.from_options(frame = partitioned_dynamicframe, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet", transformation_ctx = "datasink")

Why is it doing that? How can I get the script to replace only values that are null, blank or have a word in them?

bluethundr
  • 1,005
  • 17
  • 68
  • 141
  • 1
    You're missing the `.otherwise(f.col('resource_tags_user_engagement'))` in all but the last `withColumn` statement. If the condition doesn't match, `when` will return `null` – pault Nov 26 '19 at 16:17
  • 1
    Also, you can chain together multiple calls to `when`, instead of using `withColumn` repeatedly: [Spark Equivalent of IF Then ELSE](https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else). Making that change would probably fix your code. – pault Nov 26 '19 at 16:18
  • Ok thanks! I will look into that. – bluethundr Nov 26 '19 at 16:28

0 Answers0