0

I have a one column dataframe I ingested from a text file. So from this:

oneColDF = (spark.read
.format("text")        
.load(file_path))

display(oneColDF)

, leading to this df:

enter image description here

with a hundred or so rows. There is no dependable delimiter in this format (for instance, a space won't work because some fields have spaces within), however, the columns are fixed-width, so i know column name and width for each field (all fields are strings). I know this because I was provided with dictionary:

fixed_width_column_defs = {
  "submitted_at": (1, 15),
  "order_id": (16, 40),
  "customer_id": (56, 40),
  "sales_rep_id": (96, 40),
  "sales_rep_ssn": (136, 15),
  "sales_rep_first_name": (151, 15),
  "sales_rep_last_name": (166, 15),
  "sales_rep_address": (181, 40),
  "sales_rep_city": (221, 20),
  "sales_rep_state": (241, 2),
  "sales_rep_zip": (243, 5),
  "shipping_address_attention": (248, 30),
  "shipping_address_address": (278, 40),
  "shipping_address_city": (318, 20),
  "shipping_address_state": (338, 2),
  "shipping_address_zip": (340, 5),
  "product_id": (345, 40),
  "product_quantity": (385, 5),
  "product_sold_price": (390, 20)
}

So I can add empty columns like this:

multiColDF= oneColDF.withColumn('submitted_at',     
lit(None).cast(StringType())).withColumn('order_id', lit(None).cast(StringType())).withColumn('customer_id', lit(None).cast(StringType())).withColumn('sales_rep_id', lit(None).cast(StringType())).withColumn('sales_rep_ssn', lit(None).cast(StringType())).withColumn('sales_rep_first_name', lit(None).cast(StringType())).withColumn('sales_rep_last_name', lit(None).cast(StringType())).withColumn('sales_rep_address', lit(None).cast(StringType())).withColumn('sales_rep_city', lit(None).cast(StringType())).withColumn('sales_rep_state', lit(None).cast(StringType())).withColumn('sales_rep_zip', lit(None).cast(StringType())).withColumn('shipping_address_attention', lit(None).cast(StringType())).withColumn('shipping_address_address', lit(None).cast(StringType())).withColumn('shipping_address_city', lit(None).cast(StringType())).withColumn('shipping_address_state', lit(None).cast(StringType())).withColumn('shipping_address_zip', lit(None).cast(StringType())).withColumn('product_id', lit(None).cast(StringType())).withColumn('product_quantity', lit(None).cast(StringType())).withColumn('product_sold_price', lit(None).cast(StringType()))

, so now the dataframe has all the columns:

enter image description here

So I am trying to figure out how to loop through the dataframe to update all the columns with the appropriate data from the value column.

for row in multiColDF.rdd.collect():
   addTheDatafromValColToEachCol()

I am stuck at this point. I would be grateful for any ideas, either building on what i have done or perhaps a simpler solution. Thank you.

Alex Ott
  • 80,552
  • 8
  • 87
  • 132
Timothy Clotworthy
  • 1,960
  • 2
  • 19
  • 42
  • Suggest this might be renamed "access fixed field data file with spark sql". Old timers will understand that. – pauljohn32 Aug 13 '21 at 19:43

1 Answers1

1

You do not need to add the columns before, you can do this dynamically using substring and looping through the dictionary using select:

from pyspark.sql import functions as F
out = df.select("value",*[F.substring("value",*v).alias(k) 
                     for k,v in fixed_width_column_defs.items()])
anky
  • 74,114
  • 11
  • 41
  • 70
  • thank you! That did the trick. I am trying to understand the "*v"? – Timothy Clotworthy Aug 12 '21 at 17:20
  • 1
    @TimothyClotworthy substring takes 3 parameters, 1: col , 2: position 3: length , we have the col, but not the other 2, hence we splat the values in v which is a tuple to populate the param 2 and param 3, try printing `[*(2,3)]`, you will see that the tuple has been unpacked to the container which is a list here. that might help :) – anky Aug 12 '21 at 17:23
  • sorry, from the context of the code above, how do I print out the [*(2,3)]? – Timothy Clotworthy Aug 12 '21 at 17:31
  • @TimothyClotworthy sorry , I mean print it independently to understand like: `print([*(2,3)])` or even `print(*(2,3))` to see the unpacking of a tuple using a `*` . More details here : https://stackoverflow.com/questions/400739/what-does-asterisk-mean-in-python – anky Aug 12 '21 at 17:32
  • hmm, so then I am confused on what I presume is another unpacking (this: *[F.substring("value",*v).alias(k) for k,v in fixed_width_column_defs.items()])? Since it has asterisk in front of a fairly complex expression... Thanks! – Timothy Clotworthy Aug 12 '21 at 18:09
  • @TimothyClotworthy that is for the select expression which requires individual columns, hence we unpack the list – anky Aug 12 '21 at 18:16
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/235950/discussion-between-timothy-clotworthy-and-anky). – Timothy Clotworthy Aug 12 '21 at 19:05