
I want to download certain files to a temp location before the pipeline starts. The files are .mmdb files which are to be read inside a ParDo function. The files are stored on Google Cloud Storage, but the method consuming the .mmdb files requires them to be java.io.File objects.

If I include them in --filesToStage, they are only available as an InputStream inside the staged zip. I want to access them as files, not as an InputStream. What is the best way to achieve this?

I am currently downloading the files to a temporary folder on the worker inside the @Setup method of the ParDo.
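
Roughly, that @Setup looks like the sketch below; the GCS path, class names, and the library consuming the File are placeholders rather than the exact code:

    import java.io.File;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.DoFn;

    public class LookupFn extends DoFn<String, String> {
        // placeholder GCS location of the .mmdb file
        private static final String DB_PATH = "gs://my-bucket/my-database.mmdb";
        private transient File localDb;

        @Setup
        public void setup() throws Exception {
            // copy the database from GCS to a local temp file, once per DoFn instance
            ResourceId source = FileSystems.matchNewResource(DB_PATH, false /* isDirectory */);
            Path target = Files.createTempFile("my-database", ".mmdb");
            try (ReadableByteChannel in = FileSystems.open(source)) {
                Files.copy(Channels.newInputStream(in), target, StandardCopyOption.REPLACE_EXISTING);
            }
            localDb = target.toFile();
            // localDb is now a real java.io.File for the library that requires one
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // build/use the reader backed by localDb here
            c.output(c.element());
        }
    }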

user3777228

2 Answers


This is a very broad, high-level question. The answer depends on the logic that consumes the files. File represents a file on a local filesystem, so if you have a component that requires the input to be an instance of File, then writing it to a temp folder locally is the right thing to do. Beam doesn't provide a better abstraction for this case.

However, I would recommend looking into updating the logic that currently handles Files so that it accepts other kinds of input as well. You have likely hit an issue caused by a lack of separation of concerns and by tight coupling. That is, you have a component that takes in a File, opens it, deals with errors while opening it, reads it, parses data from it, and maybe even validates and processes the data. All of these are separate concerns and should probably be handled by separate components that you can combine and replace as needed, for example:

  • a class that knows how to deal with a filesystem and turn a path into a byte stream;
  • a similar class that knows how to fetch a file over HTTP (e.g. the GCS use case) and turn it into a byte stream;
  • a component that knows how to parse the byte stream into data;
  • a component that processes the parsed data;
  • other concerns can live in their own components.

This way you can easily plug in other sources for your component, and compose and test the pieces independently; a rough sketch follows below.
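
For illustration, the split could look something like this; the interface names are made up for the example, nothing here is a Beam API:

    import java.io.IOException;
    import java.io.InputStream;

    // turns some location (local path, GCS object, HTTP URL, ...) into a byte stream
    interface ByteSource {
        InputStream open() throws IOException;
    }

    // parses a byte stream into your data model, unaware of where the bytes came from
    interface DatabaseParser<T> {
        T parse(InputStream in) throws IOException;
    }

    // business logic on the parsed data
    interface DatabaseConsumer<T> {
        void consume(T database);
    }

Each piece can then be swapped or tested in isolation, e.g. a local-file ByteSource in unit tests and a GCS-backed one in production.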

For example, you could implement your logic as two joined PCollections, one of which would read from the GCS location directly, parse the lines, and apply the actual business logic before being joined with the other PCollection.
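
A rough batch-mode sketch of that idea is below; the file layout, keys and sources are hypothetical placeholders, and a streaming pipeline would additionally need windowing before the join:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class JoinSketch {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create();

            // reference data read directly from GCS; assume one "key,value" pair per line
            PCollection<KV<String, String>> reference =
                p.apply(TextIO.read().from("gs://my-bucket/reference-data.txt"))
                 .apply(MapElements
                     .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                     .via(line -> KV.of(line.split(",", 2)[0], line.split(",", 2)[1])));

            // stand-in for the real (e.g. Pub/Sub) input, keyed the same way
            PCollection<KV<String, String>> events =
                p.apply(Create.of(KV.of("some-key", "some-event"))
                    .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

            TupleTag<String> refTag = new TupleTag<>();
            TupleTag<String> eventTag = new TupleTag<>();

            // downstream logic sees the matching reference rows and events together per key
            PCollection<KV<String, CoGbkResult>> joined =
                KeyedPCollectionTuple.of(refTag, reference)
                    .and(eventTag, events)
                    .apply(CoGroupByKey.create());

            p.run();
        }
    }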

Anton
  • Ideally this File has to be read once per worker. The input to the pipeline is Pub/Sub. – user3777228 Jan 04 '19 at 12:59
  • I'm a little confused by your question, but have you seen [this](https://stackoverflow.com/questions/53404579/dataflow-apache-beam-how-to-access-current-filename-when-passing-in-pattern)? – WIT Jan 12 '19 at 23:01

I think I understand what you are/were trying to do and I was looking to do the same.

This worked for me with the Python SDK (in the setup() method of the DoFn):

    # imports needed (typically at module level)
    import shutil
    from apache_beam.io.filesystems import FileSystems

    if not FileSystems.exists(local_db_location):
        # stream the database from GCS and copy it to the local worker filesystem
        with FileSystems.open(self._cloud_database_loc) as remote_db:
            with FileSystems.create(local_db_location) as local_file:
                shutil.copyfileobj(remote_db, local_file, length=131072)
    else:
        # the DB already exists locally, nothing to do
        pass

Rob Knights