0

The scenario is: EventHub -> Azure Databricks (using pyspark)

File format: CSV (Quoted, Pipe delimited and custom schema )

I am trying to read CSV strings comming from eventhub. Spark is successfully creating the dataframe with the proper schema, but the dataframe end up empty after every message.

I managed to do some tests outside streaming environment, and when getting the data from a file, all goes well, but it fails when the data comes from a string.

So I found some links to help me on this, but none worked:

can-i-read-a-csv-represented-as-a-string-into-apache-spark-using-spark-csv?rq=1

Pyspark - converting json string to DataFrame

Right now I have the code below:

schema = StructType([StructField("Decisao",StringType(),True), StructField("PedidoID",StringType(),True), StructField("De_LastUpdated",StringType(),True)])
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])

df = spark.read \
.option("header", "true") \
.option("mode","FAILFAST") \
.option("delimiter","|") \
.schema(schema) \
.csv(csvData)

df.show()

Is that even possible to do with CSV files?

Flavio Pegas
  • 388
  • 1
  • 9
  • 26

1 Answers1

0

You can construct schema like this via Row and split on | delimiter

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
body = 'DECISAO|PEDIDOID|DE_LASTUPDATED\r\n"asdasdas"|"1015905177"|"sdfgsfgd"'
csvData = sc.parallelize([body])
schemaDF = csvData\
.map(lambda x: x.split("|"))\
.map(lambda x: Row(x[0],\
                   x[1],\
                   x[2],\
                   x[3],\
                   x[4]))\
.toDF(["Decisao", "PedidoID", "De_LastUpdated", "col4", "col5"])

for i in schemaDF.take(1): print(i)
Row(Decisao='DECISAO', PedidoID='PEDIDOID', De_LastUpdated='DE_LASTUPDATED\r\n"asdasdas"', col4='"1015905177"', col5='"sdfgsfgd"')

schemaDF.printSchema()
root
 |-- Decisao: string (nullable = true)
 |-- PedidoID: string (nullable = true)
 |-- De_LastUpdated: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: string (nullable = true)

thePurplePython
  • 2,621
  • 1
  • 13
  • 34
  • Well, that is not what I did, but it helped finding what was wrong. Basically I changed [body] to body.split('\n') and all went fine. thks! – Flavio Pegas Jun 25 '19 at 14:28