
I am trying to copy specific files from Bucket A to Bucket B. Bucket A is structured (directories), whereas Bucket B will have no directories. The challenge is that I need to name my files based on their original filename. Normally, I would create a custom filename policy and modify it as necessary. However, the only way I know to access the original filename is by passing through each element and pulling its metadata. How can I gain access to each element within TextIO.write?

I've considered creating a transform before TextIO.write that takes in a PCollection of elements and outputs a PCollection of KVs where the key is the original filename and the value is the element (similar to this example). However, if I do that, how does my writer know how to write a KV?
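
To make that concrete, here is a rough sketch of the pairing transform I have in mind (the bucket path and the line splitting are just illustrative, not my actual pipeline):

PCollection<KV<String, String>> keyed = p
    .apply(FileIO.match().filepattern("gs://bucket-a/**"))
    .apply(FileIO.readMatches())
    .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        FileIO.ReadableFile file = c.element();
        // Key every record by the filename of the file it was read from.
        String filename = file.getMetadata().resourceId().getFilename();
        for (String line : file.readFullyAsUTF8String().split("\n")) {
          c.output(KV.of(filename, line));
        }
      }
    }));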

I was able to get a hacky version of this working by using writeDynamic and partitioning by each element's filename in a SerializableFunction. I could then pass the partition type through to my filename policy and, in turn, achieve my desired result. That being said, this seems far from efficient and writeDynamic wasn't designed for this, since I don't actually need to partition anything.

Scicrazed
  • Please take a look at the suggestion here, and see if this fits your use case. If not, would you please describe in a bit more detail? https://stackoverflow.com/questions/47022853/apache-beam-textio-glob-get-original-filename – Alex Amato Jul 23 '19 at 18:30
  • Hi Alex. So I linked that post in my question above (2nd paragraph), but to reiterate, I'm not sure how to use Text/FileIO.write to write a KV object. Let's say I implement that logic where the key is the original filename and the value is the element. How will my writer know how to write the KV correctly? Is there somewhere I can specify that the writer needs to only write values from the KV object? – Scicrazed Jul 23 '19 at 18:51
  • Apologies, I missed your link and misunderstood. – Alex Amato Jul 23 '19 at 21:01
  • 1
    @Scicrazed you can write only the values with `writeDynamic` using `.via(Contextful.fn(KV::getValue), TextIO.sink())`, note the `KV::getValue` expression. See code [here](https://stackoverflow.com/a/55890045/6121516), for example – Guillem Xercavins Jul 23 '19 at 21:57
  • That is exactly what I was looking for. I saw it in the docs a while ago and completely forgot about it. I never truly understood what Contextful.fn() does - do you know? – Scicrazed Jul 24 '19 at 20:51

2 Answers


When using writeDynamic, the by method specifies the criterion used to partition the incoming data into its corresponding destination. For example, if the destination is decided based on the key of the KV pair, we can use .by(KV::getKey), and the destination file name can be tuned thanks to .withNaming.

In addition, with the via method we can provide a function to be applied to each partition as explained here. In this case we want to use the keys to select the destination but we do not want to write them in the output files. Therefore, we can write the value and omit the key with .via(Contextful.fn(KV::getValue), TextIO.sink()).

Whereas by accepts a SerializableFunction as a parameter, the via method requires a Contextful<Contextful.Fn<UserT,OutputT>> outputFn. That's why I wrap KV::getValue in Contextful.fn(). In some examples, like this template, it can be useful to provide context such as a required side input, but here I just want to pass the function (there is a small sketch of the side-input case after the snippet below).

Code snippet (more details here)

p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to(output)
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
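
Regarding what Contextful buys you over a plain SerializableFunction: the wrapped function receives a Context from which it can read side inputs that you declare through Requirements. A minimal sketch of that variant (prefixView and its contents are made up purely for illustration):

PCollectionView<String> prefixView =
    p.apply("Prefix", Create.of("prefix: ")).apply(View.asSingleton());

p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    // The Contextful.Fn gets a Context, so it can read prefixView;
    // Requirements declares which side inputs the function needs.
    .via(
        Contextful.fn(
            (KV<String, String> element, Contextful.Fn.Context c) ->
                c.sideInput(prefixView) + element.getValue(),
            Requirements.requiresSideInputs(prefixView)),
        TextIO.sink())
    .to(output)
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
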
Guillem Xercavins
  • That's exactly what I was looking for. However, even after looking at the docs, I don't understand the purpose of a Contextful function. Could you elaborate on that a little bit? – Scicrazed Jul 29 '19 at 14:59

Here are a few approaches you may wish to consider, depending on whether you are trying to do a one-off copy or build some sort of ongoing system for this:

If you are just trying to copy files around, you may not need Dataflow at all; you can use gsutil to copy the files.

If you just need to copy files without modification and still want to use Dataflow, you could invoke gsutil from within Dataflow yourself.

If you need to transform each file, you may want to make transforms which operate on a whole file: read it in entirely, modify it, and write it out in a custom ParDo (a rough sketch follows after this list). Example

As an alternative to Dataflow, you can use Google Cloud Functions to trigger whenever a GCS file is created.
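
For the whole-file approach, a rough sketch of what that ParDo could look like, using Beam's FileSystems API and Guava's ByteStreams (bucket names are placeholders; this version just copies the bytes through unchanged, but you could transform the contents in between):

p.apply(FileIO.match().filepattern("gs://bucket-a/**"))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new DoFn<FileIO.ReadableFile, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      FileIO.ReadableFile file = c.element();
      // Flatten the destination: keep only the original base filename.
      String filename = file.getMetadata().resourceId().getFilename();
      ResourceId dest =
          FileSystems.matchNewResource("gs://bucket-b/" + filename, false /* isDirectory */);
      try (ReadableByteChannel in = file.open();
           WritableByteChannel out = FileSystems.create(dest, MimeTypes.BINARY)) {
        ByteStreams.copy(in, out);  // stream the whole file to the new location
      }
    }
  }));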

Note: TextIO and FileIO are record-based transforms, not file-based transforms. They pull a file apart into records to achieve parallelism. The original filenames and the order of records are not really maintained. I see you have tried to maintain the filename with a KV, but as you mentioned, FileIO does not allow you to pass in a filename with each record.

Alex Amato
  • This was super helpful, although the answer I was looking for was Guillem's comment! I understand that TextIO and FileIO are record-based transforms; in my case, the records were all placed into a file and metadata about them was stored in the filename. Pulling the filename would allow me to more easily classify them, as opposed to parsing all the records once again. As far as I understand, each record has access to the filename of the file it was originally in. – Scicrazed Jul 24 '19 at 20:55