
I have a C++ binary that uses glog. I run that binary within Beam Python on Cloud Dataflow. I want to save the C++ binary's stdout, stderr, and any log files for later inspection. What's the best way to do that?

This guide gives an example for Beam Java. I tried to do something similar.

def sample(target, output_dir):
    import os
    import subprocess

    import tensorflow as tf

    log_path = target + ".log"
    with tf.io.gfile.GFile(log_path, mode="w") as log_file:
        subprocess.run(["/app/.../sample.runfiles/.../sample",
                        "--target", target,
                        "--logtostderr"],
                       stdout=log_file,
                       stderr=subprocess.STDOUT)

I got the following error.

...
  File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/swang/.cache/bazel/_bazel_swang/09eb83215bfa3a8425e4385b45dbf00d/execroot/__main__/bazel-out/k8-opt/bin/garage/sample_launch.runfiles/pip_parsed_deps_apache_beam/site-packages/apache_beam/transforms/core.py", line 1877, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/swang/.cache/bazel/_bazel_swang/09eb83215bfa3a8425e4385b45dbf00d/execroot/__main__/bazel-out/k8-opt/bin/garage/sample_launch.runfiles/__main__/garage/sample_launch.py", line 17, in sample
  File "/usr/local/lib/python3.8/subprocess.py", line 493, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/local/lib/python3.8/subprocess.py", line 808, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/usr/local/lib/python3.8/subprocess.py", line 1489, in _get_handles
    c2pwrite = stdout.fileno()
AttributeError: 'GFile' object has no attribute 'fileno' [while running 'Map(functools.partial(<function sample at 0x7f45e8aa5a60>, output_dir='gs://swang/sample/20220815_test'))-ptransform-28']

The google.cloud.storage API does not seem to expose fileno() either.

>>> import google.cloud.storage
>>> google.cloud.storage.blob.Blob("test", google.cloud.storage.bucket.Bucket(google.cloud.storage.client.Client(), "swang"))
<Blob: swang, test, None>
>>> blob = google.cloud.storage.blob.Blob("test", google.cloud.storage.bucket.Bucket(google.cloud.storage.client.Client(), "swang"))
>>> reader = google.cloud.storage.fileio.BlobReader(blob)
>>> reader.fileno()
Traceback (most recent call last):
  File "/usr/lib/python3.8/code.py", line 90, in runcode
    exec(code, self.locals)
...
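
For reference, one workaround that sidesteps the missing fileno() is to let the subprocess write to a local temporary file (which is backed by a real OS file descriptor) and copy it to GCS once the binary has exited. A minimal, untested sketch using tf.io.gfile.copy (the binary path is elided as in the snippet above):

def sample(target, output_dir):
    import os
    import subprocess
    import tempfile

    import tensorflow as tf

    log_path = target + ".log"
    # A local file has a real fileno(), so subprocess can write to it directly.
    with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as local_log:
        subprocess.run(["/app/.../sample.runfiles/.../sample",
                        "--target", target,
                        "--logtostderr"],
                       stdout=local_log,
                       stderr=subprocess.STDOUT)
    # Copy the finished log to GCS and clean up the local copy.
    tf.io.gfile.copy(local_log.name, log_path, overwrite=True)
    os.remove(local_log.name)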

I also considered writing the logs to GCS from the C++ binary itself rather than piping them through Python. As glog is implemented on top of C's FILE rather than iostream, I would have to redirect stdout etc. to GCS at the FILE level rather than redirecting cout at the iostream level. But the GCS C++ API is only implemented on top of iostream, so this approach does not work. Redirecting with dup2 is another option, but it seems too complicated and expensive (see the sketch below).
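
For concreteness, a pipe-based variant of that idea, driven from Python instead of from inside the binary, needs roughly this much plumbing (an untested sketch; subprocess itself performs the dup2 when handed a raw file descriptor):

def sample_streaming(target, output_dir):
    import os
    import subprocess
    import threading

    import tensorflow as tf

    log_path = target + ".log"
    read_fd, write_fd = os.pipe()

    # Pump thread: copy everything the binary writes into GCS. The writes
    # happen from Python, so the GFile never needs a fileno().
    def pump():
        with os.fdopen(read_fd, "rb") as pipe, \
             tf.io.gfile.GFile(log_path, mode="wb") as log_file:
            for chunk in iter(lambda: pipe.read(64 * 1024), b""):
                log_file.write(chunk)

    pumper = threading.Thread(target=pump)
    pumper.start()
    # subprocess accepts a raw file descriptor for stdout; the child's
    # stdout/stderr end up dup2()ed onto the pipe's write end.
    subprocess.run(["/app/.../sample.runfiles/.../sample",
                    "--target", target,
                    "--logtostderr"],
                   stdout=write_fd,
                   stderr=subprocess.STDOUT)
    os.close(write_fd)  # let the pump thread see EOF
    pumper.join()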

bill

1 Answer


You can use the FileSystems module of Beam to open a writable channel (a file handle with write permissions) on any of the filesystems supported by Beam. If you are running on Dataflow, this will automatically use the credentials of the Dataflow job to access Google Cloud Storage: https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html?apache_beam.io.filesystems.FileSystems.create

If you are writing to GCS, make sure that you don't overwrite an existing object, as that would produce an error.
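
For example, a minimal sketch adapted from the question's sample() function (same binary path assumed). The channel returned by FileSystems.create is file-like but has no fileno() either, so the subprocess output is captured through a pipe and copied over afterwards:

def sample(target, output_dir):
    import subprocess

    from apache_beam.io.filesystems import FileSystems

    log_path = target + ".log"
    result = subprocess.run(["/app/.../sample.runfiles/.../sample",
                             "--target", target,
                             "--logtostderr"],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # FileSystems.create opens a writable channel; for gs:// paths it goes
    # through GCS using the job's credentials.
    with FileSystems.create(log_path) as log_file:
        log_file.write(result.stdout)

Keep log_path unique per element so that no existing object is ever overwritten.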

Israel Herraiz
  • Is Beam's filesystem similar to tf.gfile? It does not have any Beam-specific logic and is used by `beam.io.ReadFromTFRecord()` etc. to work across local, gs://, etc.? (As a gs:// path does not represent an actual file, `fileno()` will likely raise an error?) – bill Nov 01 '22 at 18:42