I hope everyone's Friday is going well. I'm desperately looking for some help with the Apache Beam Go SDK (https://github.com/apache/beam/tree/master/sdks). I've written a pipeline with it to process Pub/Sub events and got to the point where my workers start up nicely, but no messages are consumed from Pub/Sub. I've also tried running the example provided in the SDK (streaming_wordcap), which uses the same pubsubio, and the result is the same: no messages from the newly created topics are consumed. I wonder if there is an extra option I should be enabling, or some deployment-specific flag? I'm a little bit lost now.
There are messages in the subscription (a few million). As an experiment I changed the subscription name to something that doesn't exist, and in that case I did see errors in the Dataflow logs. Otherwise there are no errors and nothing informative beyond the generic Dataflow debug output:
2022-07-08T11:21:31.793474125Z Starting 3 workers in europe-west4-a...
2022-07-08T11:21:31.820662575Z Starting worker pool setup.
2022-07-08T11:22:00.789962383Z Autoscaling: Raised the number of workers to 3 so that the pipeline can catch up with its backlog and keep up with its input rate.
2022-07-08T11:22:50.806937837Z Workers have started successfully.
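In case it helps to reproduce the Pub/Sub side: a direct pull with the plain Pub/Sub Go client, along the lines of the minimal sketch below (the project and subscription names are placeholders), can be used to confirm that the subscription delivers messages outside of Beam.

// Standalone sanity check, separate from the Beam pipeline: pull a few messages
// straight from the subscription with the plain Pub/Sub client.
// The project and subscription names are placeholders.
package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    client, err := pubsub.NewClient(ctx, "mb-gcp-project")
    if err != nil {
        log.Fatalf("pubsub.NewClient: %v", err)
    }
    defer client.Close()

    sub := client.Subscription("my-sub-name")
    // Receive blocks until the context times out; Nack each message so the backlog stays intact.
    err = sub.Receive(ctx, func(_ context.Context, m *pubsub.Message) {
        log.Printf("pulled message of %d bytes", len(m.Data))
        m.Nack()
    })
    if err != nil {
        log.Fatalf("Receive: %v", err)
    }
}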
Here is a part of my pipeline code:
var (
    inputTopic        = flag.String("topic", "", "PubSub input topic (required).")
    inputSubscription = flag.String("inputSubscription", "", "PubSub input subscription (required).")
    outputTableSpec   = flag.String("outputTableSpec", "", "Output BQ table (required).")
)

func init() {
    beam.RegisterType(reflect.TypeOf((*event.Envelope)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*decodeEnvelopeJSONFunc)(nil)).Elem())
    [...]
}
func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    if err := validateFlags(); err != nil {
        log.Exit(ctx, err.Error())
    }

    project := gcpopts.GetProject(ctx)
    p, s := beam.NewPipelineWithRoot()

    // Read from Pub/Sub, decode and map the events, then write them to BigQuery.
    pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
        Subscription:       *inputSubscription,
        WithAttributes:     false,
        IDAttribute:        "",
        TimestampAttribute: "",
    })
    eventMapper := DecodeAndMap(s, pubSubMessages)
    bigqueryio.Write(s, project, *outputTableSpec, eventMapper)

    if err := beamx.Run(ctx, p); err != nil {
        log.Fatalf(ctx, "failed to execute job: %v", err)
    }
}
func DecodeAndMap(s beam.Scope, messages beam.PCollection) beam.PCollection {
    s = s.Scope("DecodeAndMap")
    events := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, messages)
    return beam.ParDo(s, &mapPayloadFunc{}, events)
}

type decodeEnvelopeJSONFunc struct{}

func (f *decodeEnvelopeJSONFunc) ProcessElement(ctx context.Context, msg []byte, emit func(*event.Envelope)) error {
    var e event.Envelope
    log.Infoln(ctx, "decoding envelope")
    if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&e); err != nil {
        return fmt.Errorf("failed to decode envelope: %w", err)
    }
    log.Infoln(ctx, "emitting envelope")
    emit(&e)
    return nil
}
[...]
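For completeness, the decode step itself can be exercised locally on the direct runner with something like the hypothetical test below (this is not part of my actual code; the sample payloads and field names are made up):

package main

import (
    "testing"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestMain(m *testing.M) {
    // Initialises Beam and selects the direct runner for the tests.
    ptest.Main(m)
}

func TestDecodeEnvelopeJSON(t *testing.T) {
    p, s := beam.NewPipelineWithRoot()
    // Made-up payloads; whatever event.Envelope actually contains, a valid JSON
    // object should decode without error and be emitted downstream.
    msgs := beam.CreateList(s, [][]byte{
        []byte(`{"type":"created"}`),
        []byte(`{"type":"updated"}`),
    })
    decoded := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, msgs)
    passert.Count(s, decoded, "decoded envelopes", 2)
    if err := ptest.Run(p); err != nil {
        t.Fatal(err)
    }
}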
Here is how I am deploying my pipeline:
go run ./pkg/my-mapper/. \
--runner dataflow \
--job_name my-mapper \
--project mb-gcp-project \
--region europe-west4 --zone europe-west4-a \
--temp_location gs://my-beam-tmp-data-bucket/tmp/ \
--staging_location gs://my-beam-tmp-data-bucket/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest \
--subnetwork regions/europe-west4/subnetworks/my-subnetwork \
--num_workers 3 \
--max_num_workers 10 \
--async --update \
--topic my-topic-name --inputSubscription my-sub-name --outputTableSpec my-gcp-project:my_dataset.mapped_data
2022/07/08 12:16:33 Cross-compiling ... as /tmp/worker-1-1657278993706049280
[...]
"type": "JOB_TYPE_STREAMING"
}
2022/07/08 12:20:11 Submitted job: 2022-07-08_04_20_11-11918574995509384496