1 2013-07-25    11599,CLOSED
2 2013-07-25    256,PENDING_PAYMENT
3 2013-07-25    12111,COMPLETE
4 2013-07-25    8827,CLOSED
5 2013-07-25    11318,COMPLETE
6 2013-07-25    7130,COMPLETE
7 2013-07-25    4530,COMPLETE
8 2013-07-25    2911,PROCESSING
9 2013-07-25    5657,PENDING_PAYMENT
10 2013-07-25   5648,PENDING_PAYMENT
11 2013-07-25   918,PAYMENT_REVIEW
12 2013-07-25   1837,CLOSED

The data above comes from a semi-structured text file.

The 1st and 2nd columns are separated by a space, the 2nd and 3rd by a tab, and the 3rd and 4th by a comma.

How can I define a schema (data type) for each column: 'int' for the 1st column, 'timestamp' for the 2nd, 'int' for the 3rd, and 'string' for the 4th?

I have tried to split each record into columns with the following code:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import (IntegerType, StructField, StructType,
                               StringType, TimestampType)
 
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")

my_conf.set("spark.master","local[*]")

spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

schema1 = StructType([StructField("order_id", IntegerType(),True),
StructField("date", TimestampType(),True),
StructField("customer_id", IntegerType(),True),
StructField("status", StringType(),True)])


myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'

lines_df = spark.read.format("text")\
           .option("path","C:/Users/Lenovo/Desktop/week11/week 11 datasets/orders_new.csv").load()
 
 
final_df =lines_df.select(regexp_extract('value',myregex,1).alias("order_id"),
regexp_extract('value',myregex,2).alias("date"),
regexp_extract('value',myregex,3).alias("customer_id"),
regexp_extract('value',myregex,4).alias("status"))


final_df.show()

Output:
+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+

final_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)

As printSchema shows, every column comes back with the string data type.

Now I want to apply the schema to this DataFrame. Whenever I do

    df=spark.createDataFrame(final_df.rdd,schema1)

    final_df.show()    # I get an error here

So how do I define the schema? Please advise.
  • Depending on your file size, another approach would be to replace the space and tab characters with `,` and then, once the schema is well defined, process the comma-separated file with Spark. For example, run a bash script (https://unix.stackexchange.com/questions/11801/replace-all-white-spaces-with-commas-in-a-text-file) before running the Spark job. This would make your Spark code cleaner and save some resources, since running Spark tasks costs more than local CPU. Be aware, though, that this makes sense only when your files are reasonably small, i.e. <= 50MB, and can be processed fast enough on a local CPU – abiratsis Jul 31 '23 at 09:58
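The preprocessing idea in the comment above can also be sketched in plain Python instead of bash. This is only an illustration: `normalize_delimiters` is a hypothetical helper name, and it assumes that no field value itself contains a space or tab (true for the sample rows shown in the question).

```python
import re


def normalize_delimiters(line: str) -> str:
    """Replace every run of spaces or tabs with a single comma.

    Assumption: field values never contain spaces or tabs themselves.
    """
    return re.sub(r"[ \t]+", ",", line.strip())


# One sample row from the question: space, then tab, then comma as separators.
sample = "1 2013-07-25\t11599,CLOSED"
normalized = normalize_delimiters(sample)
print(normalized)
```

Once every line is rewritten this way, the file is ordinary CSV and could be loaded with Spark's CSV reader and an explicit schema rather than a regex.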

1 Answer


regexp_extract always returns strings, so you need to cast each extracted column to its target type.

Try the syntax below.

Example:

final_df =lines_df.select(regexp_extract('value',myregex,1).cast("int").alias("order_id"),
regexp_extract('value',myregex,2).cast("timestamp").alias("date"),
regexp_extract('value',myregex,3).cast("int").alias("customer_id"),
regexp_extract('value',myregex,4).alias("status"))

final_df.show()
df=spark.createDataFrame(final_df.rdd,schema1)
df.printSchema()
#root
# |-- order_id: integer (nullable = true)
# |-- date: timestamp (nullable = true)
# |-- customer_id: integer (nullable = true)
# |-- status: string (nullable = true)
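An alternative to casting, which also covers the case where the file is read as an RDD, is to convert each line to properly typed Python values before the schema is applied. The sketch below is plain Python so it can be tried without a cluster; `LINE_RE` and `parse_line` are hypothetical names, and the pattern is the same one used in the question with an added end anchor.

```python
import re
from datetime import datetime

# Same layout as the question's regex: id, space, date, tab, customer id, comma, status.
LINE_RE = re.compile(r"^(\S+) (\S+)\t(\S+),(\S+)$")


def parse_line(line):
    """Return (int, datetime, int, str) for a well-formed line, else None."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    order_id, date, customer_id, status = match.groups()
    return (
        int(order_id),
        datetime.strptime(date, "%Y-%m-%d"),  # date-only value becomes midnight
        int(customer_id),
        status,
    )


print(parse_line("1 2013-07-25\t11599,CLOSED"))
```

With Spark available, the parsed tuples should line up with schema1, so something like `rows = spark.sparkContext.textFile(path).map(parse_line).filter(lambda r: r is not None)` followed by `spark.createDataFrame(rows, schema1)` would be one way to build the typed DataFrame (`path` here stands for your own file location).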
notNull
  • Thanks, bro, for helping; it worked. I was looking for this. Thanks – Vivek Mishra Jul 30 '23 at 04:41
  • Now suppose I open the file through the Spark context, i.e. as an RDD: myfile = spark.sparkContext.textFile("C:/Users/Lenovo/Desktop/week11/week 11 datasets/orders_new.csv"). How do I convert it into a DataFrame and solve the same problem? Please explain, as I am unable to use the regular functions on an RDD. – Vivek Mishra Jul 30 '23 at 04:57
  • @VivekMishra please keep in mind that you should expect an answer only to the initial question, not to every problem you face during the development process. Also, don't forget to mark it as solved if you received a valid answer – abiratsis Jul 31 '23 at 08:31