I have a small PySpark script that I wrote that is looking for a value called resource_tags_user_engagement.
If the value is blank, null or includes a word it should be replaced by a default. But instead of just replacing blank, null or word it's replacing ALL values:
import sys
import pyspark.sql.functions as f
from pyspark.context import SparkContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# Set Glue Context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
spark.sql("set spark.sql.parquet.enableVectorizedReader=false")
# Create Dynamic Data Frame from table in the glue database
cost_allocation = glueContext.create_dynamic_frame.from_catalog(database="company_cost_allocation", table_name="company_cost_allocation")
# Convert dynamic frame to dta frame
cost_allocation_df = cost_allocation.toDF()
# Set default engagements
cost_allocation_df = cost_allocation_df.withColumn('resource_tags_user_engagement',
f.when(
(f.col('line_item_usage_account_id').isin('123456789101', '123456789102', '123456789103', '123456789104', '123456789105', '123456789106', '123456789107', '123456789108', '123456789109' )) &
(f.col('resource_tags_user_engagement') == '' ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '400000008378'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
((f.col('line_item_usage_account_id') == f.lit('123456789110')) |
(f.col('line_item_usage_account_id') == f.lit('123456789111'))) &
(f.col('resource_tags_user_engagement') == f.lit('') ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '807000000401'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
(f.col('line_item_usage_account_id').isin('123456789112', '123456789113', '123456789114')) &
(f.col('resource_tags_user_engagement') == '' ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '807000000412'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
(f.col('line_item_usage_account_id').isin('123456789115', '123456789116', '123456789117', '123456789118', '123456789119', '123456789120', '123456789121', '123456789122', '123456789123')) &
(f.col('resource_tags_user_engagement') == '' ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '400000008692'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
(f.col('line_item_usage_account_id').isin('123456789124', '123456789125', '123456789126')) &
(f.col('resource_tags_user_engagement') == '' ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '807000000412'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
(f.col('line_item_usage_account_id').isin('123456789127', '123456789128', '123456789129', '123456789130', '123456789131')) &
(f.col('resource_tags_user_engagement') == '' ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '808000000298'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
(f.col('line_item_usage_account_id') == '123456789132') &
(f.col('resource_tags_user_engagement') == '' ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '803000006453'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
((f.col('line_item_usage_account_id') == f.lit('123456789133')) |
(f.col('line_item_usage_account_id') == f.lit('123456789134'))) &
(f.col('resource_tags_user_engagement') == f.lit('') ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '400000008426'
)) \
.withColumn('resource_tags_user_engagement',
f.when(
((f.col('line_item_usage_account_id') == f.lit('123456789135')) |
(f.col('line_item_usage_account_id') == f.lit('123456789136'))) &
(f.col('resource_tags_user_engagement') == f.lit('') ) |
(f.col('resource_tags_user_engagement').isNull()) |
(f.col('resource_tags_user_engagement').rlike('^[a-zA-Z]')), '800000047650'
).otherwise(f.col('resource_tags_user_engagement')))
# Convert back to a DynamicFrame for further processing.
partitioned_dynamicframe = DynamicFrame.fromDF(cost_allocation_df, glueContext, "partitioned_df")
# Repartition the dynamic frame before writing to S3
cost_allocation_df = cost_allocation_df.repartition(5)
# Write to S3
output_dir = "s3://company-cur-reports/company-costs-transformed-legacy-billing"
datasink = glueContext.write_dynamic_frame.from_options(frame = partitioned_dynamicframe, connection_type = "s3", connection_options = {"path": output_dir}, format = "parquet", transformation_ctx = "datasink")
Why is it doing that? How can I get the script to replace only values that are null, blank or have a word in them?