
I am trying to implement a Reshuffle transform to prevent excessive fusion, but I don't know how to adapt the version for PCollection<KV<String,String>> so that it works with plain (non-KV) PCollections. (How to reshuffle a PCollection<KV<String,String>> is described here.)

How would I extend the official Avro I/O example code to reshuffle before adding more steps to my pipeline?

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
        .from("gs://my_bucket/path/records-*.avro")
        .withSchema(schema));
Tobi

1 Answer


Thanks to the code snippet provided by the Google support team, I figured it out:

To get a reshuffled PCollection:

PCollection<T> reshuffled = data.apply(Repartition.of());

The Repartition class used:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;

// Reshuffles any PCollection<T>: pair each element with a random key, group by
// that key (the service never fuses ParDo steps across a GroupByKey), then drop the key.
public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> {

    private Repartition() {}

    public static <T> Repartition<T> of() {
        return new Repartition<T>();
    }

    // Dataflow SDK 1.x: in Apache Beam 2.x this method is called expand() instead.
    @Override
    public PCollection<T> apply(PCollection<T> input) {
        return input
                // 1. Pair every element with a random Integer key.
                .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>()))
                // 2. Group by that key; this shuffle is the fusion break.
                .apply(GroupByKey.<Integer, T>create())
                // 3. Drop the keys and emit the original elements again.
                .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>()));
    }

    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
    }

    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            // Emit every element of the group; the random key itself is discarded.
            for (T element : c.element().getValue()) {
                c.output(element);
            }
        }
    }
}
Tobi
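To illustrate why the add-key / group / remove-key round trip changes only where and in what order elements are processed, never which elements exist, here is a plain-Java, in-memory simulation of the three steps of `Repartition`. It is a sketch under stated assumptions: no Dataflow or Beam classes are used, a `HashMap` stands in for `GroupByKey`, and the class and method names (`RepartitionSimulation`, `addKeysAndGroup`, `removeKeys`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical in-memory simulation of Repartition (not Dataflow/Beam code).
class RepartitionSimulation {

    // Steps 1 + 2: attach a random Integer key to each element and group by it.
    // The HashMap plays the role of the shuffle performed by GroupByKey.
    static <T> Map<Integer, List<T>> addKeysAndGroup(List<T> input) {
        Map<Integer, List<T>> groups = new HashMap<>();
        for (T element : input) {
            int key = ThreadLocalRandom.current().nextInt();
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // Step 3: discard the keys and emit every element from every group.
    static <T> List<T> removeKeys(Map<Integer, List<T>> groups) {
        List<T> output = new ArrayList<>();
        for (List<T> group : groups.values()) {
            output.addAll(group);
        }
        return output;
    }

    static <T> List<T> repartition(List<T> input) {
        return removeKeys(addKeysAndGroup(input));
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "a");
        List<String> reshuffled = repartition(records);
        // Same elements, duplicates included; only the order may differ.
        System.out.println(reshuffled.size()); // prints 4
    }
}
```

Even if two elements happen to draw the same random key, they simply end up in the same group and are both emitted in step 3, so nothing is lost or duplicated.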
  • Can you elaborate on `AddArbitraryKey`? Why is it necessary, and is the particular implementation of `AddArbitraryKey` important, i.e. will it affect the way that the key space is distributed across workers? – harveyxia Dec 03 '16 at 22:19
  • It should lead to redistribution in an arbitrary fashion, as does the `Redistribution` transform (see: https://github.com/apache/incubator-beam/pull/1036). Randomly chosen integer keys should lead to a random distribution. – Tobi Dec 05 '16 at 09:21
  • Thanks, and what is your use case for `Redistribution`? – harveyxia Dec 05 '16 at 16:12
  • I was building a large pipeline via a loop and wanted to make sure that [no fusion](https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion) happens. – Tobi Dec 06 '16 at 16:25
  • java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. @Tobi - did you run into this issue? – abhish_gl Oct 09 '20 at 19:49
  • @abhish_gl No, I did not run into this issue - but it looks like you are missing a Windowing function as you have an unlimited/streaming input (non-bounded PCollection). Check the Beam documentation on how to apply a Windowing function in your pipeline: https://beam.apache.org/documentation/programming-guide/#windowing – Tobi Oct 19 '20 at 09:47
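On the question of whether the key choice matters: the keys only decide which group an element lands in, and groups are then spread across workers. The sketch below is a hypothetical, in-memory illustration in plain Java, not how Dataflow actually assigns keys to workers (that is internal to the service). It hashes each key onto an assumed number of workers with `Math.floorMod` and counts how many elements each one receives. With `nextInt()` nearly every element gets its own key, so the load spreads across all buckets; a constant key would send everything to a single bucket, which would defeat the purpose of the reshuffle.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntSupplier;

// Hypothetical illustration only: real key-to-worker assignment in Dataflow/Beam is internal.
class KeyDistributionDemo {

    // Counts how many of `elements` land in each of `workers` buckets when every element
    // receives a key from `keySource` and the bucket is chosen by hashing that key.
    static int[] bucketCounts(int elements, int workers, IntSupplier keySource) {
        int[] counts = new int[workers];
        for (int i = 0; i < elements; i++) {
            int key = keySource.getAsInt();
            // floorMod keeps the bucket index non-negative for negative keys.
            int bucket = Math.floorMod(Integer.hashCode(key), workers);
            counts[bucket]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] random = bucketCounts(100_000, 8, () -> ThreadLocalRandom.current().nextInt());
        int[] constant = bucketCounts(100_000, 8, () -> 42);
        System.out.println(Arrays.toString(random));
        // All 100000 elements share bucket 2, since 42 mod 8 == 2.
        System.out.println(Arrays.toString(constant));
    }
}
```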