
I have a simple Google Dataflow job. It reads from a BigQuery table and writes into another one, like this:

(p
 | beam.io.Read(beam.io.BigQuerySource(
       query='select dia, import from DS1.t_27k where true',
       use_standard_sql=True))
 | beam.io.Write(beam.io.BigQuerySink(
       output_table,
       dataset='DS1',
       project=project,
       schema='dia:DATE, import:FLOAT',
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

I guess the issue is that this pipeline needs a temporary BigQuery dataset to do its work, and I am not able to force a location for that temporary dataset. Because my DS1 dataset is in the EU (europe-west1) and the temporary dataset is (I guess) in the US, the task fails:

WARNING:root:Dataset m-h-0000:temp_dataset_e433a0ef19e64100000000000001a does not exist so we will create it as temporary with location=None
WARNING:root:A task failed with exception.
 HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/m-h-000000/queries/b8b2f00000000000000002bed336369d?alt=json&maxResults=10000>: response: <{'status': '400', 'content-length': '292', 'x-xss-protection': '1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'expires': 'Sat, 14 Oct 2017 20:29:15 GMT', 'vary': 'Origin, X-Origin', 'server': 'GSE', '-content-encoding': 'gzip', 'cache-control': 'private, max-age=0', 'date': 'Sat, 14 Oct 2017 20:29:15 GMT', 'x-frame-options': 'SAMEORIGIN', 'alt-svc': 'quic=":443"; ma=2592000; v="39,38,37,35"', 'content-type': 'application/json; charset=UTF-8'}>, content <{
 "error": {
  "errors": [
   {
    "domain": "global",
    "reason": "invalid",
    "message": "Cannot read and write in different locations: source: EU, destination: US"
   }
  ],
  "code": 400,
  "message": "Cannot read and write in different locations: source: EU, destination: US"
 }
}

Pipeline options:

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)

options = PipelineOptions()

google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'm-h'
google_cloud_options.job_name = 'myjob3'
google_cloud_options.staging_location = r'gs://p_df/staging'  # EUROPE-WEST1
google_cloud_options.region = r'europe-west1'
google_cloud_options.temp_location = r'gs://p_df/temp'  # EUROPE-WEST1
options.view_as(StandardOptions).runner = 'DirectRunner'  # 'DataflowRunner'

p = beam.Pipeline(options=options)

What can I do to avoid this error?

Note that the error only appears when I run the pipeline with the DirectRunner.

dani herrera

2 Answers


The error Cannot read and write in different locations is fairly self-explanatory, and it can happen because:

  • your BigQuery dataset is in the EU and you're running Dataflow in the US, or
  • your GCS buckets are in the EU and you're running Dataflow in the US.

As you specified in the question, your temporary GCS locations are in the EU and your BigQuery dataset is also located in the EU, so you must run the Dataflow job in the EU as well.
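
If you want to double-check which of the two causes applies, here is a minimal sketch that prints both locations. It is not part of the original setup; it assumes the google-cloud-bigquery and google-cloud-storage client libraries are installed, and 'm-h', 'DS1' and 'p_df' are the project, dataset and bucket names taken from the question.

from google.cloud import bigquery, storage

# Print where the BigQuery dataset and the GCS bucket actually live.
bq_client = bigquery.Client(project='m-h')
gcs_client = storage.Client(project='m-h')

print(bq_client.get_dataset('DS1').location)   # e.g. 'EU'
print(gcs_client.get_bucket('p_df').location)  # e.g. 'EUROPE-WEST1'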

To run the job in europe-west1, you need to specify the zone parameter (via WorkerOptions), like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions, WorkerOptions)

options = PipelineOptions()

wo = options.view_as(WorkerOptions)  # type: WorkerOptions
wo.zone = "europe-west1-b"


# rest of your options:
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'm-h'
google_cloud_options.job_name = 'myjob3'
google_cloud_options.staging_location = r'gs://p_df/staging'  # EUROPE-WEST1
google_cloud_options.region = r'europe-west1'
google_cloud_options.temp_location = r'gs://p_df/temp'  # EUROPE-WEST1
options.view_as(StandardOptions).runner = 'DataflowRunner'

p = beam.Pipeline(options=options)
Marcin Zablocki
  • Hi, thanks a lot for your post. By default the tasks run in the EU, and when I execute the job with the DataflowRunner I have no problem with it. The issue appears only when I run it with the DirectRunner. Sorry for not emphasizing that in my question. – dani herrera Oct 16 '17 at 07:54
  • This is weird. Are you located in the EU? – Marcin Zablocki Oct 16 '17 at 07:57
  • Yep. But take a look at the message: `we will create it as temporary with location=None`. I am missing some kind of `temporary location=EU` parameter. – dani herrera Oct 16 '17 at 07:58
  • Could you try to explicitly specify `dataset`, `project` and `table` in `BigQuerySource`? – Marcin Zablocki Oct 16 '17 at 08:13
  • Hi Marcin. You can see all the code in the question. Is there a way to specify the dataset for temporary operations? Something like `temp_location`, but for BigQuery? – dani herrera Oct 16 '17 at 08:18
  • My previous comment was a suggestion for you to try specifying additional parameters in `BigQuerySource`... When you look at the code inside BigQuerySink or BigQuerySource, the location is determined automatically. – Marcin Zablocki Oct 16 '17 at 08:36

The BigQuerySource transform used in the Python DirectRunner doesn't automatically determine the locations for temp tables. See BEAM-1909 for the issue.

When using the DataflowRunner this should work.
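
Until that is fixed, one possible workaround for local testing with the DirectRunner is to run the query yourself with the BigQuery client and feed the rows into the pipeline with beam.Create instead of beam.io.BigQuerySource. This is a sketch rather than part of the original answer; it assumes the google-cloud-bigquery client library and reuses the query, project name, and options object from the question. The client-side query job runs in the location of the referenced dataset, so no US temp dataset gets created.

import apache_beam as beam
from google.cloud import bigquery

# Run the query with the BigQuery client; the job executes in the
# location of the referenced dataset (EU in this case).
client = bigquery.Client(project='m-h')
rows = [dict(row) for row in client.query(
    'select dia, import from DS1.t_27k where true').result()]

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(rows)   # one Python dict per BigQuery row
     | beam.Map(print))    # replace with the real write step from the question

For production runs, switching back to the DataflowRunner with the region/zone set as in the other answer avoids the issue entirely.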

Ben Chambers