
I have a GAE datastore kind with several hundred thousand objects in it. I want to run several involved queries against it (including counting queries). BigQuery seems a good fit for doing this.

Is there currently an easy way to query a live App Engine Datastore using BigQuery?

Allan D.

6 Answers


You can't run BigQuery directly on Datastore entities, but you can write a Mapper Pipeline that reads entities out of the Datastore, writes them to CSV in Google Cloud Storage, and then ingests that CSV into BigQuery; you can even automate the process. Here's an example that uses the Mapper API classes for just the Datastore-to-CSV step:

import logging
import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle

from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp

from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template

from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op

from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials


#Number of shards to use in the Mapper pipeline
SHARDS = 20

# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'

# DataStore Model
class YourEntity(db.Expando):
  field1 = db.StringProperty() # etc, etc

ENTITY_KIND = 'main.YourEntity'


class MapReduceStart(webapp.RequestHandler):
  """Handler that provides link for user to start MapReduce pipeline.
  """
  def get(self):
    pipeline = IteratorPipeline(ENTITY_KIND)
    pipeline.start()
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
    logging.info('Redirecting to: %s' % path)
    self.redirect(path)


class IteratorPipeline(base_handler.PipelineBase):
  """ A pipeline that iterates through datastore
  """
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
      "DataStore_to_Google_Storage_Pipeline",
      "main.datastore_map",
      "mapreduce.input_readers.DatastoreInputReader",
      output_writer_spec="mapreduce.output_writers.FileOutputWriter",
      params={
          "input_reader":{
              "entity_kind": entity_type,
              },
          "output_writer":{
              "filesystem": "gs",
              "gs_bucket_name": GS_BUCKET,
              "output_sharding":"none",
              }
          },
          shards=SHARDS)


def datastore_map(entity):
  """Mapper function: called once per entity, emits one CSV row."""
  props = GetPropsFor(entity)
  data = db.to_dict(entity)
  result = ','.join(['"%s"' % str(data.get(k)) for k in props])
  yield('%s\n' % result)


def GetPropsFor(entity_or_kind):
  if (isinstance(entity_or_kind, basestring)):
    kind = entity_or_kind
  else:
    kind = entity_or_kind.kind()
  cls = globals().get(kind)
  return cls.properties()


application = webapp.WSGIApplication(
                                     [('/start', MapReduceStart)],
                                     debug=True)

def main():
  run_wsgi_app(application)

if __name__ == "__main__":
  main()

If you append yield CloudStorageToBigQuery(output) to the end of your IteratorPipeline's run method, you can pipe the resulting list of CSV file handles into a BigQuery ingestion pipeline, like this:

# BigQuery API settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'


class CloudStorageToBigQuery(base_handler.PipelineBase):
  """A Pipeline that kicks off a BigQuery ingestion job.
  """
  def run(self, output):
    # Create a new API service for interacting with BigQuery
    credentials = AppAssertionCredentials(scope=SCOPE)
    http = credentials.authorize(httplib2.Http())
    bigquery_service = build("bigquery", "v2", http=http)

    jobs = bigquery_service.jobs()
    table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
        '%m%d%Y_%H%M%S')
    files = [str(f.replace('/gs/', 'gs://')) for f in output]
    result = jobs.insert(projectId=PROJECT_ID,
                         body=build_job_data(table_name, files)).execute()
    logging.info(result)

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      # put your schema here
                      "fields": fields
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": DATASET_ID,
                      "tableId": table_name,
                      },
                  }
              }
          }
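
Putting the two pieces together, the run method of IteratorPipeline with the extra yield appended would look roughly like this (same placeholder bucket and kind names as above):

class IteratorPipeline(base_handler.PipelineBase):
  """Iterates through the datastore, then hands the CSV output to BigQuery."""
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
        "DataStore_to_Google_Storage_Pipeline",
        "main.datastore_map",
        "mapreduce.input_readers.DatastoreInputReader",
        output_writer_spec="mapreduce.output_writers.FileOutputWriter",
        params={
            "input_reader": {"entity_kind": entity_type},
            "output_writer": {
                "filesystem": "gs",
                "gs_bucket_name": GS_BUCKET,
                "output_sharding": "none",
            },
        },
        shards=SHARDS)
    # Chain the ingestion step onto the mapper's output file list.
    yield CloudStorageToBigQuery(output)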
Michael Manoochehri

With the new (September 2013) streaming inserts API you can import records from your app into BigQuery.

The data is available in BigQuery immediately, so this should satisfy your "live" requirement.

Whilst this question is now a bit old, this may be an easier solution for anyone stumbling across it.

At the moment, though, getting this to work from the local dev server is patchy at best.
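
For illustration, here is a minimal sketch of streaming a single row from an App Engine handler, using the same apiclient / AppAssertionCredentials setup as the accepted answer (the project, dataset and table IDs are placeholders you'd replace with your own):

import httplib2

from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

SCOPE = 'https://www.googleapis.com/auth/bigquery'


def stream_row_to_bigquery(row_dict, insert_id):
  """Streams one record into an existing BigQuery table."""
  credentials = AppAssertionCredentials(scope=SCOPE)
  bigquery = build('bigquery', 'v2',
                   http=credentials.authorize(httplib2.Http()))
  body = {
      'rows': [{
          'json': row_dict,       # keys must match the destination table schema
          'insertId': insert_id,  # lets BigQuery de-duplicate retried inserts
      }]
  }
  # Placeholder project/dataset/table IDs -- substitute your own.
  return bigquery.tabledata().insertAll(
      projectId='your-project-id',
      datasetId='your_dataset',
      tableId='your_table',
      body=body).execute()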

danmux

We're running a Trusted Tester program for moving from Datastore to BigQuery in two simple operations:

  1. Back up the datastore using Datastore Admin's backup functionality
  2. Import backup directly into BigQuery

It automatically takes care of the schema for you.

More info (to apply): https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ
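
For reference, the import step boils down to a load job with sourceFormat set to DATASTORE_BACKUP, in which case BigQuery reads the schema from the backup itself. A minimal sketch of the job body, in the style of the accepted answer (the .backup_info URI, project and dataset IDs are placeholders):

def build_backup_load_job(project_id, dataset_id, table_name, backup_info_uri):
  """Load-job body for ingesting a Datastore Admin backup into BigQuery."""
  return {
      'projectId': project_id,
      'configuration': {
          'load': {
              # Points at the kind's .backup_info file written by Datastore Admin.
              'sourceUris': [backup_info_uri],
              # BigQuery derives the schema from the backup itself.
              'sourceFormat': 'DATASTORE_BACKUP',
              'destinationTable': {
                  'projectId': project_id,
                  'datasetId': dataset_id,
                  'tableId': table_name,
              },
          }
      }
  }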

Ryan Boyd

For BigQuery you have to export those kinds into a CSV or delimited record structure, load that into BigQuery, and then you can query it. There is no facility that I know of which allows querying the live GAE Datastore.

BigQuery is an analytical query engine, which means you can't change a record once it's loaded. No updates or deletes are allowed; you can only append.
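
Once the export is loaded, the kind of counting query the question mentions can be run through the same BigQuery API service shown in the accepted answer; a minimal sketch (the dataset and table names are placeholders):

def count_by_field(bigquery_service, project_id):
  """Runs a simple counting query against an ingested table."""
  # The table name is a placeholder for whatever your load job created.
  query = ('SELECT field1, COUNT(*) AS n '
           'FROM [your_dataset.datastore_dump_20121231_000000] '
           'GROUP BY field1')
  response = bigquery_service.jobs().query(
      projectId=project_id, body={'query': query}).execute()
  return response.get('rows', [])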

Sathish Senathi
  • Now you can do it by enabling standard SQL; BigQuery now supports DML. https://cloud.google.com/bigquery/docs/updating-data – Bala.Raj Feb 20 '19 at 08:31
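
For example, with standard SQL and the newer google-cloud-bigquery client, an update looks roughly like this (the dataset, table and column names are placeholders):

from google.cloud import bigquery


def mark_processed(project_id):
  """Runs a standard-SQL DML UPDATE against a BigQuery table."""
  client = bigquery.Client(project=project_id)
  # Placeholder dataset/table/column names.
  sql = """
      UPDATE `your_dataset.your_table`
      SET processed = TRUE
      WHERE processed IS NULL
  """
  client.query(sql).result()  # wait for the DML job to finish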

No, BigQuery is a different product that needs the data to be uploaded to it; it cannot work over the Datastore. You can use GQL to query the Datastore.
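
For example, a GQL query against the placeholder kind from the accepted answer (YourEntity and field1 are just the illustrative names used above) looks like this:

from google.appengine.ext import db

# Fetch up to 100 matching entities with GQL. Counting or aggregating very
# large result sets this way is exactly what gets awkward in the Datastore.
matches = db.GqlQuery(
    "SELECT * FROM YourEntity WHERE field1 = :1", "some value").fetch(100)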

Shay Erlichmen

As of 2016, this is very possible! You must do the following:

  1. Make a new bucket in Google Cloud Storage
  2. Back up your entities using the Datastore Admin at console.developers.google.com (I have a complete tutorial)
  3. Head to the BigQuery Web UI, and import the files generated in step 2.

See this post for a complete example of this workflow!

Tim D