
I am reading a CSV file with the inferSchema option enabled into a DataFrame, using the command below.

val df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("s3://Bucket-Name/Fun/Map/file.csv")
df2.printSchema()

Output:

root
 |-- CC|Fun|Head|Country|SendType: string (nullable = true)

Now I would like to store just this schema information in a CSV file, containing only the column names and the datatype of each column, like below.

column_name,datatype
CC,string
Fun,string
Head,string
Country,string
SendType,string

I tried writing it out as a CSV using the option below, but this writes the file with the entire data rather than just the schema.

df2.coalesce(1).write.format("csv").mode("append").save("schema.csv")

regards mahi

Mahi

5 Answers


Use df.schema.fields to get the fields and their datatypes.

Check the code below.

scala> val schema = df.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype")
schema: org.apache.spark.sql.DataFrame = [column_name: string, datatype: string]

scala> schema.show(false)
+---------------+--------+
|column_name    |datatype|
+---------------+--------+
|applicationName|string  |
|id             |string  |
|requestId      |string  |
|version        |long    |
+---------------+--------+


scala> schema.write.format("csv").save("/tmp/schema")

Srinivas
  • Hi Srini, thanks for the response; it is writing the output on a single line rather than column by column as you have listed above. .toList is not working I guess, can you help with this? – Mahi May 07 '20 at 09:47
  • `val df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("file.csv")` `df2.printSchema()` `val df_schema = df2.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype")` `df_schema.show(false)` – Mahi May 07 '20 at 09:56
  • spark version ? – Srinivas May 07 '20 at 10:00
  • Hi Srini Spark Version is 2.4.4. Scala Version is 2.11.12 – Mahi May 07 '20 at 10:08
  • @Srinivas it is an overhead to use Spark for storing a file with 5 rows. You should just use Scala I/O API to export the column list and save into a file – abiratsis May 07 '20 at 11:09
  • @AlexandrosBiratsis, I agree on that; I am not sure why Mahi is storing the column details in a file. – Srinivas May 07 '20 at 11:35
  • @AlexandrosBiratsis: Basically there is an ask to store the schema of the files we will be processing, and then the user wants to do some comparison on that later – Mahi May 07 '20 at 11:48
  • Instead of csv you can directly store the schema of the dataframe using df.schema.json, then store that file and it will have all the details (a quick sketch of this appears right after these comments). – Srinivas May 07 '20 at 11:58
  • @Mahi yes, I got that point, although you shouldn't be storing the schema with Spark writers; that would be an overhead. Spark writers are meant for storing big data, not files of 5 rows – abiratsis May 07 '20 at 12:18
  • Hi Srini, agreed, but the end user's ask is to store it in CSV format to an S3 location, and the schema of each file we read should be recorded in a separate CSV file; the answer you suggested is working fine – Mahi May 07 '20 at 12:36
  • @AlexandrosBiratsis: Agreed. Can we then use quicksilver's code to write the file to the S3 location? I have marked Srini's answer as the selected one. I am facing an issue while writing the file to the S3 location: basically I want a new file to be created for each input file rather than appending to the same file, as the schema of each file we read will vary – Mahi May 07 '20 at 12:47
  • @Mahi yes it works, but using Spark to save information that you already have available at runtime requires much more execution time. You are using a distributed system for a task that you shouldn't. `df.schema` contains all the information that you need to implement saving your schema. By generating a DF (that contains the schema data) and later saving it, you are adding a large amount of unnecessary execution time – abiratsis May 07 '20 at 12:51
  • @AlexandrosBiratsis: Agreed on the above comments regarding execution time, but then just the schema obtained will not be saved; I need to add more columns to the DataFrame before saving it. Along with this schema, in the program I am getting more values from other methods which I need to add to the DF and then store a combined output – Mahi May 07 '20 at 13:02
  • 1
    you can use always `df.schema` to obtain the schema of the new or updated dataframe. This has nothing to do with the way you save this schema. Please go through the solutions carefully and you will realize better how they work. It would be good to measure execution time also, then you will understand that when you do `you_schema.toDF("column_name","datatype").write.save` you are triggering a new Spark job when you could do just obtain the schema via df.schema and then save it with a simple file writer – abiratsis May 07 '20 at 13:08
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/213336/discussion-between-mahi-and-alexandros-biratsis). – Mahi May 07 '20 at 13:32
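For reference, the df.schema.json idea from the comments above could be as simple as the following sketch, using plain Scala I/O rather than a Spark write job. This is only a sketch: df here stands for the DataFrame read from the CSV, and the output path is a placeholder.

import java.io.PrintWriter

// Sketch only: dump the full schema (names, types, nullability) as JSON
// with a plain file writer instead of a distributed Spark write.
val writer = new PrintWriter("/tmp/schema.json") // placeholder path
try writer.write(df.schema.json)                 // StructType.json gives the schema as a JSON string
finally writer.close()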

Try something like below; use coalesce(1) and .option("header","true") to output with a header (a sketch of that writer-based approach follows the full example).

import java.io.FileWriter

object SparkSchema {

  def main(args: Array[String]): Unit = {

    val fw = new FileWriter("src/main/resources/csv.schema", true)
    fw.write("column_name,datatype\n")

    val spark = Constant.getSparkSess // helper in the author's project that returns a SparkSession

    import spark.implicits._

    val df = List(("", "", "", 1L)).toDF("applicationName", "id", "requestId", "version")
    val columnList : List[(String, String)] = df.schema.fields.map(field => (field.name, field.dataType.typeName))
      .toList
    try {
      val outString = columnList.map(col => {
        col._1 + "," + col._2
      }).mkString("\n")
      fw.write(outString)
    }
    finally fw.close()

    val newColumnList : List[(String, String)] = List(("newColumn","integer"))

    val finalColList = columnList ++ newColumnList
    writeToS3("s3://bucket/newFileName.csv",finalColList)

  }

  def writeToS3(s3FileNameWithpath : String,finalColList : List[(String,String)]) {

    val outString =  finalColList.map(col => {
      col._1 + "," + col._2
    }).mkString("\n")

    import org.apache.hadoop.fs._
    import org.apache.hadoop.conf.Configuration
    val conf = new Configuration()
    conf.set("fs.s3a.access.key", "YOUR ACCESS KEY")
    conf.set("fs.s3a.secret.key", "YOUR SECRET KEY")

    val dest = new Path(s3FileNameWithpath)
    val fs = dest.getFileSystem(conf)
    val out = fs.create(dest, true)
    out.write( outString.getBytes )
    out.close()
  }

}
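For completeness, the coalesce(1) and .option("header","true") route mentioned at the top of this answer would look roughly like the sketch below. This is a sketch only: df is the DataFrame whose schema is being exported, a SparkSession named spark is assumed to be in scope, and the output path is a placeholder (Spark writes a directory containing a single part file).

import spark.implicits._

// Sketch only: build a two-column DataFrame from the schema and write it
// as one CSV part file with a header row.
val schemaDf = df.schema.fields
  .map(field => (field.name, field.dataType.typeName))
  .toSeq
  .toDF("column_name", "datatype")

schemaDf
  .coalesce(1)                       // collapse to a single part file
  .write
  .option("header", "true")          // emit the column_name,datatype header row
  .mode("overwrite")
  .csv("s3://bucket/schema-output/") // placeholder output directory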


QuickSilver
  • Hi QuickSilver, here we are reading a file first using inferSchema and then we need to extract the output from that schema; depending on the file we read, the column names for each file can be different – Mahi May 07 '20 at 10:32
  • it does not matter, the output is independent of the schema; I will change the answer to read from a file @Mahi – QuickSilver May 07 '20 at 10:53
  • Hi QuickSilver, thanks for the inputs; Srini has suggested similar steps above, but when I tried running it, as you can see in my comments above, it is not creating column-by-column rows; rather it just creates a single row with all the column names in it and the datatype as one string value – Mahi May 07 '20 at 10:57
  • @QuickSilver it is an overhead to use Spark for storing a file with 5 rows. You should just use Scala I/O API to export the column list and save into a file – abiratsis May 07 '20 at 10:58
  • @AlexandrosBiratsis Agreed – QuickSilver May 07 '20 at 11:14
  • @Mahi I think you need to append the column names to a single file, hence you can use the above code for it – QuickSilver May 07 '20 at 11:15
  • @QuickSilver: can we write to an S3 location using the above method? Basically I want a new file to be created each time this method is called, not to append to or overwrite the existing file. Also, before writing the final output to the file I need to add a few more columns to the DataFrame – Mahi May 07 '20 at 12:48
  • updated the answer with S3 file-write code and additional columns – QuickSilver May 07 '20 at 13:35

An alternative to @QuickSilver's and @Srinivas' solutions, both of which should work, is to use the DDL representation of the schema. With df.schema.toDDL you get:

CC STRING, fun STRING, Head STRING, Country STRING, SendType STRING

which is the string representation of the schema. You can then split and replace it as shown next:

import java.io.PrintWriter

val schema = df.schema.toDDL.split(",")
// Array[String] = Array(`CC` STRING, `fun` STRING, `Head` STRING, `Country` STRING, `SendType` STRING)

val writer = new PrintWriter("/tmp/schema.csv")

writer.write("column_name,datatype\n")
schema.foreach{ r => writer.write(r.replaceAll("`", "").trim.replace(" ", ",") + "\n") }
writer.close()

To write to S3 you can use the Hadoop API, as QuickSilver already implemented, or a 3rd-party library such as MinIO:

import io.minio.MinioClient

val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")

minioClient.putObject("YOUR_BUCKET","schema.csv", "/tmp/schema.csv", null)

Or, even better, generate the string, store it in a buffer, and then send it via an InputStream to S3:

import java.io.ByteArrayInputStream
import io.minio.MinioClient
import io.minio.PutObjectOptions

val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")

val schema = df.schema.toDDL.split(",")
val schemaBuffer = new StringBuilder

schemaBuffer ++= "column_name,datatype\n"
schema.foreach{ r => schemaBuffer ++= r.replaceAll("`", "").trim.replace(" ", ",") + "\n" }

val inputStream = new ByteArrayInputStream(schemaBuffer.toString.getBytes("UTF-8"))

minioClient.putObject("YOUR_BUCKET", "schema.csv", inputStream, new PutObjectOptions(inputStream.available(), -1))

inputStream.close
abiratsis

In PySpark:

df_schema = spark.createDataFrame([(i.name, str(i.dataType)) for i in df.schema.fields], ['column_name', 'datatype'])
df_schema.show()

This will create a new DataFrame holding the schema of the existing DataFrame.

Use case:

Useful when you want to create a table with the schema of the DataFrame and you cannot use the code below, because the PySpark user may not be authorized to execute DDL commands on the database.

df.createOrReplaceTempView("tmp_output_table")
spark.sql("""drop table if exists schema.output_table""")   
spark.sql("""create table schema.output_table as select * from tmp_output_table""")
rp92643

In PySpark you can find all column names and data types (DataType) of a PySpark DataFrame by using df.dtypes. Follow this link for more details: pyspark.sql.DataFrame.dtypes

Having said that, try the code below:

data = df.dtypes
cols = ["col_name", "datatype"]

df = spark.createDataFrame(data=data,schema=cols)

df.show()
Dipanjan Mallick