
I have [~] as the delimiter for some CSV files I am reading.

1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]

I have tried this

val rddFile = sc.textFile("file.csv")
val rddTransformed = rddFile.map(eachLine=>eachLine.split("[~]"))
val df = rddTransformed.toDF()
display(df)

However, the issue with this is that each line comes back as an array with stray [ and ] characters left in each field. So the array would be

["1[","]a[","]b[",...]

I can't use

val df = spark.read.option("sep", "[~]").csv("file.csv")

because a multi-character separator is not supported. What other approach can I take?

1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
2[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
3[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]

Edit - this is not a duplicate: the linked thread is about multiple delimiters, while this is about a single multi-character delimiter.

test acc
  • the rdd map isn't working because `split` takes a regex. If you escape the `[` and `]` with backslashes it will work: `.split("\\[~\\]")` (see the sketch after these comments) – Tim Aug 29 '18 at 18:21
  • @Tim Thanks. Is there a way to translate the array to actual different columns? (Note: the number of columns is completely arbitrary for the file and unknown beforehand) – test acc Aug 29 '18 at 18:26
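
To make the regex point in the first comment concrete, here is a quick plain-Scala sketch (illustrative only, not from the thread). String.split treats "[~]" as a character class matching just ~, so the brackets stay attached to the fields; escaping the brackets makes the whole three-character delimiter literal.

// "[~]" is a regex character class that matches only '~', so the brackets are left in the data
"1[~]a[~]b".split("[~]")      // Array(1[, ]a[, ]b)
// escaping the brackets matches the literal "[~]" delimiter
"1[~]a[~]b".split("\\[~\\]")  // Array(1, a, b)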

1 Answer

Try the below:

val df = spark.read.format("csv").load("inputpath")
df.rdd.map(i => i.mkString.split("\\[\\~\\]")).toDF().show(false)

For your other requirement (turning the single array column into separate columns):

import org.apache.spark.sql.functions.{col, split}

// re-join with "," so each record is one string, then split that string into named columns
val df1 = df.rdd.map(i => i.mkString.split("\\[\\~\\]").mkString(",")).toDF()
val iterationColumnLength = df1.rdd.first.mkString(",").split(",").length
df1.withColumn("value", split(col("value"), ","))
  .select((0 until iterationColumnLength).map(i => col("value").getItem(i).as("col_" + i)): _*).show

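A more direct variant of the same idea (a sketch, not from the answer above; names like raw, parts and nCols are illustrative) reads each line as plain text and applies Spark's split function with the escaped delimiter, skipping the intermediate comma join:

import org.apache.spark.sql.functions.{col, split}

// read each line as a single string column named "value"
val raw = spark.read.text("inputpath")

// split() takes a Java regex, so escape the brackets to match the literal "[~]"
val parts = raw.withColumn("value", split(col("value"), "\\[~\\]"))

// take the column count from the first row
val nCols = parts.select("value").first.getSeq[String](0).length

parts.select((0 until nCols).map(i => col("value").getItem(i).as("col_" + i)): _*).show(false)

The column count is taken from the first row, so this assumes every record has the same number of delimited fields.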

Chandan Ray
  • The issue with this is the same as with what I used, `rddFile.map(eachLine=>eachLine.split("\\[\\~\\]"))` (when correctly escaping the split characters): split returns an array, which gets pushed into a single column of array type instead of being split into separate columns. – test acc Aug 29 '18 at 18:35
  • updated the answer to convert one column to multiple – Chandan Ray Aug 29 '18 at 18:46
  • `df1.withColumn("value",explode(col("value"))).withColumn("index", lit(1)).groupBy("index").pivot("value").sum("index").filter(col("index") =!= 1).drop("index", "").show` This portion returned a blank dataframe. `df1` had a large number of records, but this transformation only produced an empty dataframe. – test acc Aug 29 '18 at 19:05
  • That's correct, but are you sure you have rows? Could you please send the exact test data? The code only converts your single array column into multiple columns based on the number of values in the array. Please send your file.csv with a few sample records. – Chandan Ray Aug 29 '18 at 19:16
  • I updated the OP with test data that isn't working. I have multiple rows. However, even if the test data were only one row, it should still work. – test acc Aug 29 '18 at 19:19
  • Updated as per your requirement. Please check – Chandan Ray Aug 29 '18 at 21:16
  • Hey, I have one more quick question, I do not think this would be possible, but I have a predefined `schema` object. Generally I would do `spark.read.option("sep", ",").option("badRecordsPath", badRecordLoc).schema(schema).csv(inputLoc)`. By passing a schema with bad record path, it would put rows with the incorrect number of columns in the bad record location. Would it be possible to add this to your implementation? I'm assuming not possible. – test acc Aug 30 '18 at 15:39
  • @testacc I would suggest filtering out those rows based on that condition; you can't use the badRecordsPath option with this approach (a rough sketch of the filtering is below). – Chandan Ray Aug 30 '18 at 17:47
  • I noted it did not entirely work. – thebluephantom Oct 01 '18 at 14:41
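
For reference, a rough sketch of the filtering approach suggested in the last comments (this is a manual filter, not the built-in badRecordsPath option; expectedCols and the "badRecordLoc" path are placeholders you would set yourself):

import org.apache.spark.sql.functions.{col, size, split}

val expectedCols = 14  // hypothetical: the number of columns your schema defines

val lines  = spark.read.text("file.csv")
val parsed = lines.withColumn("fields", split(col("value"), "\\[~\\]"))

// rows with the wrong number of fields are routed to a side location instead of failing the load
val bad  = parsed.filter(size(col("fields")) =!= expectedCols)
val good = parsed.filter(size(col("fields")) === expectedCols)

bad.select("value").write.mode("overwrite").text("badRecordLoc")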