
I have a CSV file in which some cells contain embedded newlines (\n). When reading the file from cloud storage with Apache Beam's TextIO.read, each embedded \n is treated as the start of a new record. How can I overcome this issue?

I have tried extending FileBasedSource, but it reads only the first line of the CSV file when I apply PTransforms.

Any help will be appreciated.

Thanks in advance.

Damodar
  • Show what you have tried so far. – klutt Dec 06 '17 at 06:32
  • I'm having trouble understanding what you mean by new carriage returns. Can you give a sample of your file? – Pablo Dec 06 '17 at 07:02
  • Hi Pablo, one of the cells in my CSV contains data like this: "Identified 57 confirmed privileged accounts of these 32 are non-compliant to password policy This number is indicate", so it is taking "This number is indicate" as the next row. Due to parallel processing, the records after the split are not adjacent either. – Damodar Dec 06 '17 at 07:33
  • Hi klutt, I tried with TextIO.read().from("gs://***********") and also with MySource source = new MySource("gs://*************", 2048, 0, 6); PCollection out = p.apply("ReadFileData", Read.from(source)).apply(ParDo.of(new RiskPercentageRowSplitter())); but neither approach helps in the case of a newline inside a cell. – Damodar Dec 06 '17 at 07:39

1 Answer


TextIO cannot do this - it always splits its input on newlines and is not aware of CSV-specific quoting, which allows some of those newlines to appear inside a field.
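To see why line-oriented splitting breaks here, consider a small illustrative example (the data and class name below are hypothetical, not from the question): a single logical CSV record whose quoted second cell contains an embedded newline gets cut into two fragments by a naive newline split, which is essentially what TextIO does.

```java
public class NewlineSplitDemo {
    public static void main(String[] args) {
        // One header row plus ONE logical record; the second cell of the
        // record contains an embedded newline inside quotes.
        String csv = "id,comment\n1,\"first line\nsecond line\"\n";

        // A line-oriented reader splits on every newline, quoted or not...
        String[] lines = csv.split("\n");

        // ...so the two rows become three fragments: the quoted cell is
        // torn apart at the embedded newline.
        System.out.println(lines.length); // prints 3, not the expected 2
    }
}
```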

However, Beam 2.2 includes a transform that makes it very easy to write the CSV-specific (or any other file-format-specific) reading code yourself: FileIO. Do something like this:

p.apply(FileIO.match().filepattern("gs://..."))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new DoFn<FileIO.ReadableFile, TableRow>() {
   @ProcessElement
   public void process(ProcessContext c) throws IOException {
     try (InputStream is = Channels.newInputStream(c.element().open())) {
       // Parse "is" with your favorite Java CSV library,
       // calling c.output(...) once per parsed CSV record.
     }
   }
 }))
jkff
  • Hi jkff, thanks for your reply. If we migrate from 2.1.0 to 2.2.0, how do we set DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);, how do we set the project id, and which runner do I need to set to run locally? – Damodar Dec 06 '17 at 09:01
  • The same way as for 2.1.0 - it's a backwards-compatible release and all your existing code should keep working. – jkff Dec 06 '17 at 09:16
  • Hi jkff, thanks. It is not able to read : in the filepattern: Illegal char <:> at index 2: gs://bucketname/objectname – Damodar Dec 06 '17 at 09:21
  • DataflowPipelineOptions cannot be resolved in 2.2; should I have both the 2.1 and 2.2 dependencies in my pom.xml? – Damodar Dec 06 '17 at 09:23
  • Hi jkff, I have added the following dependencies: org.apache.beam beam-sdks-java-core 2.2.0 and org.apache.beam beam-runners-direct-java 2.2.0 (runtime scope), but after adding them it is not able to detect DataflowPipelineOptions, and I am getting Illegal char <:> at index 2: gs://bucketname/objectname. Please advise me on this. – Damodar Dec 06 '17 at 09:50
  • Could you file this as a separate question, and include your pom.xml and the relevant part of the source code and the complete error output? – jkff Dec 06 '17 at 10:33
  • Hi jkff, I added a new question for that: https://stackoverflow.com/q/47673002/7917789?sem=2 – Damodar Dec 06 '17 at 11:19