
I'm writing a Dataflow pipeline that should do 3 things:

  • Read .csv files from GCP Storage
  • Parse the data into BigQuery-compatible TableRows
  • Write the data to a BigQuery table

Up until now this all worked like a charm, and it still does, but when I change the source and destination variables nothing changes. The job that actually runs is an old one, not the recently changed (and committed) code. Somehow, when I run the code from Eclipse using the BlockingDataflowPipelineRunner, my current code is not uploaded; an older version is used instead.

There is normally nothing wrong with the code, but to be as complete as possible:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.List;

public class BatchPipeline {

    public static void main(String[] args) {
        String source = "gs://sourcebucket/*.csv";
        String destination = "projectID:datasetID.testing1";

        // Creation of the pipeline with default arguments
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
                .from(source));

        @SuppressWarnings("serial")
        PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>() {
            @Override
            public void processElement(ProcessContext c) {
                // processing code goes here
            }
        }));

        // Defining the BigQuery table schema
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
        fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
        fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
        TableSchema schema = new TableSchema().setFields(fields);
        String table = destination;

        tablerows.apply(BigQueryIO.Write
                .named("BigQueryWrite")
                .to(table)
                .withSchema(schema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withoutValidation());

        // Runs the pipeline
        p.run();
    }
}
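The actual processing code is irrelevant to the problem, but to give an idea of what that step does, a minimal sketch of the parsing DoFn (assuming the CSV columns come in the same order as the schema fields above, which is just an illustrative assumption) looks something like this:

// Minimal sketch of the parsing step, assuming each line is "<timestamp>,<consumption>,<meterID>";
// the real parsing code is omitted above.
PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(ProcessContext c) {
        String[] parts = c.element().split(",");
        TableRow row = new TableRow()
                .set("datetime", parts[0])
                .set("consumption", Double.parseDouble(parts[1]))
                .set("meterID", parts[2]);
        c.output(row);
    }
}));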

This problem arose because I've just changed laptops and had to reconfigure everything. I'm working on a clean Ubuntu 16.04 LTS install with all the dependencies for GCP development installed. Everything seems to be configured correctly, since I'm able to start a job (which shouldn't be possible if my config were broken, right?). I'm using Eclipse Neon, by the way.

So where could the problem lie? It seems to me that there is a problem uploading the code, but I've made sure that my cloud git repo is up to date and the staging bucket has been cleaned up ...

**** UPDATE ****

I never found out exactly what was going wrong, but when I checked the creation dates of the files inside my deployed jar, I saw that they had indeed never been updated. The jar file itself, however, had a recent timestamp, which made me overlook the problem completely (rookie mistake).

I eventually got it all working again by simply creating a new Dataflow project in Eclipse and copying my .java files from the broken project into the new one. Everything worked like a charm from then on.

Matteus
  • Have you verified that the staging bucket is empty prior to running, and populated with a new (timestamped) jar when you run? – Sam McVeety Jan 05 '17 at 17:05
  • Yes, I've already tried with a new and empty bucket. After the job has finished it's populated with new jar files but still my new code is not executed ... Somehow Dataflow is not receiving my actual newly written code which I just can't understand. – Matteus Jan 09 '17 at 17:10

1 Answer


Once you submit a Dataflow job, you can check which artifacts were part of the job specification by inspecting the files listed in DataflowPipelineWorkerPoolOptions#getFilesToStage. The code snippet below shows how to get this information.

PipelineOptions myOptions = ...
myOptions.setRunner(DataflowPipelineRunner.class);
Pipeline p = Pipeline.create(myOptions);

// Build up your pipeline and run it.
p.apply(...)
p.run();

// At this point in time, the files which were staged by the 
// DataflowPipelineRunner will have been populated into the
// DataflowPipelineWorkerPoolOptions#getFilesToStage
List<String> stagedFiles = myOptions.as(DataflowPipelineWorkerPoolOptions.class).getFilesToStage();
for (String stagedFile : stagedFiles) {
  System.out.println(stagedFile);
}

The above code should print out something like:

/my/path/to/file/dataflow.jar
/another/path/to/file/myapplication.jar
/a/path/to/file/alibrary.jar

It is likely that the resources that are part of the job you're uploading are out of date in some way and still contain your old code. Look through all the directories and jars in the staging list, find all instances of BatchPipeline, and verify their age. Jar files can be extracted using the jar tool or any zip file reader. Alternatively, use javap or any other class file inspector to validate that the BatchPipeline class file lines up with the changes you have made.
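As an illustration of that check, here is a small standalone sketch that prints the last-modified timestamps of any BatchPipeline entries inside one of the staged jars (the jar path is just a placeholder for one of the files printed above):

import java.util.Date;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class InspectStagedJar {
  public static void main(String[] args) throws Exception {
    // Placeholder path: use one of the jars reported by getFilesToStage().
    try (JarFile jar = new JarFile("/another/path/to/file/myapplication.jar")) {
      Enumeration<JarEntry> entries = jar.entries();
      while (entries.hasMoreElements()) {
        JarEntry entry = entries.nextElement();
        // Only look at the class(es) you expect to have changed.
        if (entry.getName().contains("BatchPipeline")) {
          System.out.println(entry.getName() + " last modified at " + new Date(entry.getTime()));
        }
      }
    }
  }
}

If those timestamps are older than your latest build, the runner is staging a stale jar rather than your freshly compiled code.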

Lukasz Cwik
  • I'm fairly new to Dataflow, so is there a guide for this? I don't really know how to start debugging this problem ... If you have some recommendations, shoot, because I could use some decent tips to be able to debug these kinds of problems on my own in the future. – Matteus Jan 09 '17 at 17:14
  • Are you trying to say that my answer did solve your problem and that you would like general guidance about how to debug future issues or are you trying to say that my answer didn't have enough detail for you to be able to try it? – Lukasz Cwik Jan 09 '17 at 21:27
  • I could indeed use some guidance on how to debug my code since I'm still rather new to this. And your solution might do the trick for this problem but to be honest I don't really know how to begin implementing it ... So if it's not too much trouble for you, I would really appreciate a more detailed version of your answer. Thanks in advance! – Matteus Jan 15 '17 at 08:32
  • Thanks for giving a more in detail answer! Got everything working again (see update above) although I didn't find the exact problem. – Matteus Jan 27 '17 at 14:06