
I have a table with 372 columns, many of which have the "long" data type. I want to cast them to "int".

I found a solution in a similar question asked here, but it isn't working for me.

from pyspark.sql.functions import col

schema = {col: col_type for col, col_type in df.dtypes}
time_cols = [col for col, col_type in schema.items() if col_type in "timestamp date".split() or "date" in col or "time" in col]

for column in time_cols:
    df = df.withColumn(column, col(column).cast("to_timestamp"))
ZygD
Rahul Diggi
  • The example you posted is for timestamps and will not work for you, since you want to cast long-type columns. See how `df.dtypes` reports the type and then use a list comprehension. – samkart Jun 23 '22 at 10:38
  • @samkart Hi, I replaced timestamp with long, but it still isn't working. – Rahul Diggi Jun 23 '22 at 10:48

3 Answers


You need to update the code snippet as shown below. A long column is reported as bigint by DataFrame.dtypes, so filtering on "bigint" gives the intended result.

Data Preparation

input_list = [
    (1,"HR",111,1000,45,100)
    ,(1,"Sales",112,500,30,50)
    ,(2,"Manufacture",127,600,50,500)
    ,(2,"Hr",821,700,26,60)
    ,(2,"Business",754,350,18,22)
]

sparkDF = sql.createDataFrame(input_list,['emp_no','dept','pincode','salary','age','bonus'])

sparkDF.printSchema()

root
 |-- emp_no: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- pincode: long (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

Identifying Casting Columns

schema = {col: col_type for col, col_type in sparkDF.dtypes}

##########
##### pprint(schema)

{'age': 'bigint',
 'bonus': 'bigint',
 'dept': 'string',
 'emp_no': 'bigint',
 'pincode': 'bigint',
 'salary': 'bigint'}

##########

cast_cols = [ col for col, col_type in schema.items() if col_type in ["bigint"] ]

Reduce

from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType


sparkDF = reduce(
    lambda df, x: df.withColumn(x, F.col(x).cast(IntegerType())),
    cast_cols,
    sparkDF,
)

sparkDF.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- dept: string (nullable = true)
 |-- pincode: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- bonus: integer (nullable = true)

Vaebhav

It's good practice to use a single .select where possible instead of many .withColumn calls.

df = df.select(
    [col(c).cast('int') if t == 'bigint' else c for c, t in df.dtypes]
)
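
A minimal sketch of the same single-projection idea, written with SQL expression strings for `selectExpr` (an existing DataFrame method). The helper name `build_cast_exprs` is hypothetical, and the sketch assumes column names contain no backticks. It builds the expressions from a plain list shaped like `df.dtypes`, so it can be checked without a Spark session.

```python
def build_cast_exprs(dtypes, source_type="bigint", target_type="int"):
    """Return one SQL expression per column, casting only columns of source_type.

    `dtypes` has the same shape as DataFrame.dtypes: a list of (name, type) pairs.
    Hypothetical helper for illustration; assumes no backticks in column names.
    """
    exprs = []
    for name, col_type in dtypes:
        if col_type == source_type:
            # Cast and keep the original column name via an alias.
            exprs.append(f"CAST(`{name}` AS {target_type.upper()}) AS `{name}`")
        else:
            # Pass every other column through unchanged.
            exprs.append(f"`{name}`")
    return exprs


# Stand-in for df.dtypes so the sketch runs without a Spark session.
sample_dtypes = [("emp_no", "bigint"), ("dept", "string"), ("salary", "bigint")]
exprs = build_cast_exprs(sample_dtypes)

# With a real DataFrame, all casts are applied in a single projection:
# df = df.selectExpr(*build_cast_exprs(df.dtypes))
```

Column order is preserved because the expressions are generated in the order `df.dtypes` returns them.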
ZygD
  • Any specific reason why `withColumn` should be avoided? – Vaebhav Jun 23 '22 at 10:59
  • withColumn creates an internal projection. Try googling "spark select vs withcolumn"; the first results, with several more references, are [this](https://stackoverflow.com/questions/59789689/spark-dag-differs-with-withcolumn-vs-select) and [this](https://medium.com/@deepa.account/spark-dataframes-select-vs-withcolumn-31388cecbca9). Basically, a few withColumn calls are fine, but too many may cause problems. – ZygD Jun 23 '22 at 11:02

The code you shared works once adjusted as below. I think the problem is using "col" as a variable name. With that fixed, the example works. Hope it helps.


    #Prepare data
    import pyspark.sql.types as T
    from pyspark.sql.functions import col
    
    df = [{'id': 1, 'data': "data1"},
          {'id': 2, 'data': "data2"}]
    schema = T.StructType([
        T.StructField('id', T.LongType()),
        T.StructField('data', T.StringType())])
    df = spark.createDataFrame(df, schema=schema)
    df.printSchema()
    
    #Get longtype column names
    schema = {mycol: col_type for mycol, col_type in df.dtypes}
    long_cols = [col_name for col_name, col_type in schema.items() if col_type=="bigint"]
    
    #Cast columns to int
    for column in long_cols:
        df = df.withColumn(column, col(column).cast("int"))
    df.printSchema()

Here is the output I have:

    root
     |-- id: long (nullable = true)
     |-- data: string (nullable = true)
    
    root
     |-- id: integer (nullable = true)
     |-- data: string (nullable = true)
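
On the naming point, a small pure-Python sketch of when reusing the name `col` actually replaces the imported function. The `col` defined here is a hypothetical stand-in for `pyspark.sql.functions.col`, used only so the snippet runs without Spark. In Python 3 a comprehension keeps its loop variable local, while a plain `for` loop binds it in the enclosing scope.

```python
def col(name):
    """Hypothetical stand-in for pyspark.sql.functions.col (illustration only)."""
    return f"Column<{name}>"


dtypes = [("id", "bigint"), ("data", "string")]

# Comprehension variables are local to the comprehension in Python 3,
# so reusing the name `col` here leaves the function untouched.
schema = {col: col_type for col, col_type in dtypes}
still_a_function = callable(col)

# A plain for loop binds its variable in the enclosing scope,
# so afterwards `col` refers to the last column name (a str).
for col, col_type in dtypes:
    pass
shadowed = not callable(col)
```

Under this sketch, distinct names such as `mycol` or `col_name` are still the safer habit, since they behave the same in comprehensions and loops.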
ozlemg