You need to update the code snippet as below. The long type shown by printSchema is equivalent to bigint in DataFrame.dtypes, so bigint is the string to match on; once you do that, you'll be able to achieve the intended result.
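As a quick check of that equivalence (a minimal sketch, assuming nothing beyond a working PySpark install), simpleString() gives the SQL-facing name that dtypes reports:

from pyspark.sql.types import LongType

# The SQL-facing name of Spark's Long type is 'bigint'
print(LongType().simpleString())  # bigint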
Data Preparation
from pyspark.sql import SparkSession

# 'spark' is the active SparkSession
spark = SparkSession.builder.getOrCreate()

input_list = [
    (1, "HR", 111, 1000, 45, 100),
    (1, "Sales", 112, 500, 30, 50),
    (2, "Manufacture", 127, 600, 50, 500),
    (2, "Hr", 821, 700, 26, 60),
    (2, "Business", 754, 350, 18, 22),
]

sparkDF = spark.createDataFrame(input_list, ['emp_no', 'dept', 'pincode', 'salary', 'age', 'bonus'])
sparkDF.printSchema()
root
|-- emp_no: long (nullable = true)
|-- dept: string (nullable = true)
|-- pincode: long (nullable = true)
|-- salary: long (nullable = true)
|-- age: long (nullable = true)
|-- bonus: long (nullable = true)
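The same information is available programmatically through DataFrame.schema, where the numeric columns show up as LongType (a small sketch; the exact repr of the fields varies slightly across Spark versions):

for field in sparkDF.schema.fields:
    # LongType for the numeric columns, StringType for dept
    print(field.name, field.dataType)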
Identifying Casting Columns
schema = {col: col_type for col, col_type in sparkDF.dtypes}

# pprint(schema)
# {'age': 'bigint',
#  'bonus': 'bigint',
#  'dept': 'string',
#  'emp_no': 'bigint',
#  'pincode': 'bigint',
#  'salary': 'bigint'}

# Collect every column whose dtype is bigint
cast_cols = [col for col, col_type in schema.items() if col_type == "bigint"]
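A quick sanity check (the list order follows sparkDF.columns, since dtypes preserves column order and Python dicts preserve insertion order):

print(cast_cols)
# ['emp_no', 'pincode', 'salary', 'age', 'bonus']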
Reduce
from functools import reduce

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Fold over cast_cols, re-casting each bigint column to integer
sparkDF = reduce(
    lambda df, x: df.withColumn(x, F.col(x).cast(IntegerType())),
    cast_cols,
    sparkDF,
)
sparkDF.printSchema()
root
|-- emp_no: integer (nullable = true)
|-- dept: string (nullable = true)
|-- pincode: integer (nullable = true)
|-- salary: integer (nullable = true)
|-- age: integer (nullable = true)
|-- bonus: integer (nullable = true)
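As a side note, the same casts can be applied in one pass with a single select (an equivalent sketch, assuming F, IntegerType, and cast_cols from the snippets above):

sparkDF = sparkDF.select(
    *[
        F.col(c).cast(IntegerType()).alias(c) if c in cast_cols else F.col(c)
        for c in sparkDF.columns
    ]
)

Functionally the result is the same; a single projection just keeps the query plan smaller than chaining one withColumn per column, which can matter on very wide DataFrames.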