2

We have a Cloud Dataflow Job that takes in a BigQuery table, transforms it and then writes each record out to a different table depending on the month/year in the timestamp for that record. So when we run our job over a table with 12 months of data there should be 12 output tables. The first month will be the main output and the other 11 months will be the side outputs.

We have found that a job will fail when we run it over 10 or more months(9 side outputs).

Is this a limit on Cloud Dataflow or is it a bug?

I noticed in the execution graph when it was running with more than 8 side outputs that some of the outputs said "running" but they didn't seem to be writing any records.

Here are some of our job ids:

2015-06-14_23_58_06-14457541029573485807 (8 side outputs - passed)

2015-06-14_23_48_43-15277609445992188388 (9 side outputs - failed)

2015-06-14_23_11_46-10500077558949649888 (7 side outputs - passed)

2015-06-14_22_38_48-1428211312699949403 (3 side outputs - passed)

2015-06-14_21_44_27-16273252623089185131 (11 side outputs - failed)

This is the code that processes the data. There is no caching involved. (TressOutputManager only holds a cache of TupleTag<TableRow>)

public class TressDenormalizationDoFn extends DoFn<TableRow, TableRow> {
    @Inject
    @Named("tress.mappers")
    private Set<CPTMapper> mappers;
    @Inject
    private TressOutputManager tuples;

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = c.element().clone();
        for (CPTMapper mapper : mappers) {
            String mapped = mapper.map((String) row.get("event"));
            if (mapped != null) {
                row.set(mapper.getId(), mapped);
            }
        }
        // places the record in the correct month based on the time stamp
        String timeStamp = (String) row.get("time_local");
        if(timeStamp != null){
            timeStamp = timeStamp.substring(0, 7).replaceAll("-", "_");

            if (tuples.isMainOutput(timeStamp)) {
                c.output(row);
            } else {
                c.sideOutput(tuples.getTuple(timeStamp), row);
            }
        }
    }
}
DarrenCibis
  • 865
  • 10
  • 25
  • 1
    Sorry for the trouble. We are investigating. – Jeremy Lewi Jun 15 '15 at 13:02
  • Darren, I looked at the failed job with 9 side outputs and I don't see anything obviously wrong with its configuration or with the amount of output it writes to BigQuery. It seems to differ from the succeeding one with 8 outputs only in the amount of data it reads. Could it be that you're caching a large amount of data in worker memory, e.g. in your DoFn's? Any chance you could contact dataflow-feedback@ and show us as much of the code of this pipeline as you're comfortable showing? – jkff Jun 16 '15 at 21:34
  • Eugene, Thanks for taking a look at this. I have added a code snippet to my question. We are not caching anything in memory so I don't think that this is the issue. – DarrenCibis Jun 17 '15 at 04:29
  • Indeed I see nothing wrong with the code (unless there's something weird going on in subsequent DoFn's). I'm afraid I don't know how to help you, because debugging an OOM is next to impossible without a heap dump, and this doesn't match any known issue. We are considering enabling heap dumps on OOM so that at least you would be able to download and analyze them using a memory profiler. Can the pipeline run locally? It will of course be too slow to process all the data, but at least you will be able to watch memory usage and attach a memory profiler (e.g. YourKit or MemoryAnalyzer). – jkff Jun 17 '15 at 18:25
  • Maybe it has to do with [`.clone`](http://stackoverflow.com/questions/1106102/clone-vs-copy-constructor-vs-factory-method)? What happens if you manually copy the TableRow (serialize/deserialize)? – absolutelyNoWarranty Jan 26 '16 at 02:56

0 Answers0