
Background:

I have a very large amount of data (1500 GB) in Google Cloud BigQuery.

I'm trying to build an ML model using this data as the training dataset, so I wrote the following code in a Jupyter notebook to fetch it.

import pandas as pd
from google.cloud import bigquery

import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = './my_credential.json'

client = bigquery.Client()

sql = """
    SELECT
        Feature1,
        Feature2,
        Feature3,
        target
    FROM dataset
"""

sql_result = client.query(sql)
df = sql_result.to_dataframe()  # tries to materialize the entire 1500 GB result in memory

The problem:

The code throws a MemoryError after 30 minutes of execution. I understand this is because the code tries to pull all 1500 GB of data into my Jupyter notebook, but I don't know how to fix it.

How do I train on this large amount of data from a Jupyter notebook?

– Eumaa
  • Instead of querying the whole dataset, have you considered sending multiple queries to retrieve only N samples, where N is your batch size during training? You can also do random queries (https://www.javatpoint.com/sql-select-random), but they must be non-overlapping. – JacoSolari Aug 14 '19 at 09:40
  • @JacoSolari Thanks for the advice, it's a good idea. But how do I know when I have covered the whole 1500 GB dataset? And is there a risk of training on the same data twice (since the samples are randomized)? – Eumaa Aug 14 '19 at 10:09
  • 1
    yes, that is what I meant by non-overlapping. I am not an SQL expert but I guess that each of your samples has a unique ID. You could store the unique ID at each random query (say you fill up a list) and exclude those 'already-queried' IDs from the next query. – JacoSolari Aug 14 '19 at 10:15
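A minimal sketch of that suggestion, assuming the table has a unique integer id column (the id column name and `project.dataset.table` are placeholders, not from the question):

from google.cloud import bigquery

client = bigquery.Client()
BATCH_SIZE = 8000  # rows fetched per training batch

def fetch_next_batch(last_id):
    # Keyset pagination: only return rows whose id is greater than the largest
    # id seen so far, so consecutive batches never overlap.
    sql = f"""
        SELECT id, Feature1, Feature2, Feature3, target
        FROM `project.dataset.table`
        WHERE id > @last_id
        ORDER BY id
        LIMIT {BATCH_SIZE}
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter('last_id', 'INT64', last_id),
        ]
    )
    return client.query(sql, job_config=job_config).to_dataframe()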

3 Answers


An out-of-core computation engine could be what you're looking for. Since you're using Python, you should take a look at Dask and at Apache Spark via PySpark.

Dask is a lightweight library implemented in Python that sits on top of NumPy and pandas to allow for parallel and/or out-of-core computation. It integrates machine learning through dask-ml and exposes APIs that are very similar to those of NumPy, pandas, and scikit-learn.
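For example, a minimal dask / dask-ml sketch, assuming the BigQuery table has first been exported to Parquet files on Cloud Storage (the bucket path is a placeholder, and gcsfs must be installed for gs:// paths):

import dask.dataframe as dd
from dask_ml.linear_model import LinearRegression

# Lazily reads the exported Parquet files; nothing is pulled into memory yet
df = dd.read_parquet('gs://my-bucket/exported_table/*.parquet')

X = df[['Feature1', 'Feature2', 'Feature3']].to_dask_array(lengths=True)
y = df['target'].to_dask_array(lengths=True)

# dask-ml mirrors the scikit-learn API but trains on out-of-core dask arrays
model = LinearRegression()
model.fit(X, y)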

Spark is an all-inclusive framework implemented in Scala that runs on the JVM and exposes a Python API. It is more mature and widely used in industry for big data processing, and it also provides a machine learning library, MLlib.
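And a rough PySpark equivalent, assuming the spark-bigquery connector is available on the cluster (the table name is a placeholder):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName('bq-training').getOrCreate()

# The spark-bigquery connector reads the table in parallel, partition by partition
df = spark.read.format('bigquery').option('table', 'project.dataset.table').load()

# MLlib expects the features packed into a single vector column
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2', 'Feature3'], outputCol='features')
train = assembler.transform(df).select('features', 'target')

model = LinearRegression(featuresCol='features', labelCol='target').fit(train)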

Both can run on your local machine or a dedicated multi-node cluster for faster computations.


How much memory did you plan to use for this data? 1500 GB is already a lot, and keep in mind that this is not the final size in memory: Python's in-memory representation is typically larger than the data on disk. Here is a starting point regarding memory usage in Python: Why do ints require three times as much memory in Python?
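For a quick sanity check, a small sketch (the synthetic DataFrame stands in for a sampled query result):

import sys
import pandas as pd

# a plain Python int is a full object: ~28 bytes on 64-bit CPython, not 8
print(sys.getsizeof(1))

# measuring a small DataFrame gives a per-row estimate you can scale up
sample = pd.DataFrame({'Feature1': range(1000), 'Feature2': range(1000)})
print(sample.memory_usage(deep=True).sum(), 'bytes for 1000 rows')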

I would proceed in the following steps:

  1. Do you really need the entire dataset as your training set, or can you reduce it?
  2. Look into incremental learning, a concept where the model is trained step by step on chunks of the data (see the sketch below).

Here are some further explanations: https://datascience.stackexchange.com/questions/27767/opening-a-20gb-file-for-analysis-with-pandas
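A minimal sketch of point 2 using scikit-learn's partial_fit (SGDRegressor is just one example of an estimator that supports it; the random chunks stand in for data loaded from BigQuery):

import numpy as np
from sklearn.linear_model import SGDRegressor

def chunk_iterator(n_chunks=3, chunk_size=8000, n_features=3):
    # Placeholder: yields synthetic chunks; in practice each chunk would come
    # from a BigQuery query or a chunked table reader.
    rng = np.random.default_rng(0)
    for _ in range(n_chunks):
        X = rng.normal(size=(chunk_size, n_features))
        y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.1, size=chunk_size)
        yield X, y

model = SGDRegressor()
for X_chunk, y_chunk in chunk_iterator():
    # each call updates the model with one chunk, so the full dataset
    # never has to fit in memory at once
    model.partial_fit(X_chunk, y_chunk)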

– PV8

Thanks to JacoSolari, I found a fix: read the 1500 GB of data in chunks.

See the following code for reference. The program reads 8000 rows at a time: rows 0–7999 the first time, rows 8000–15999 the second time, and so on.

# read data by chunk
class DataChunkReader:
    def __init__(self):
        # reuses the bigquery client created earlier in the notebook
        dataset = client.dataset('dataset_name', project='project_name')
        table_ref = dataset.table('table_name')
        self.table = client.get_table(table_ref)
        self.start_index = 0
        self.max_results = 8000  # read 8000 rows as a chunk every time

    def read_a_chunk(self):
        # list_rows pages through the table without running a query, so only
        # max_results rows are pulled into memory at a time
        rows = client.list_rows(
            self.table,
            start_index=self.start_index,
            max_results=self.max_results,
        ).to_dataframe()
        self.start_index += self.max_results
        return rows

    def reset(self):
        self.start_index = 0

data_reader = DataChunkReader()

# demo: read and print the first two chunks
for i in range(2):
    df = data_reader.read_a_chunk()
    print(df)
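Not part of the original answer, but one possible way to wire this reader into incremental training (assuming the columns from the question and a scikit-learn estimator that supports partial_fit):

from sklearn.linear_model import SGDRegressor

model = SGDRegressor()
data_reader = DataChunkReader()

while True:
    chunk = data_reader.read_a_chunk()
    if chunk.empty:  # list_rows returns no rows once the table is exhausted
        break
    X = chunk[['Feature1', 'Feature2', 'Feature3']]
    y = chunk['target']
    model.partial_fit(X, y)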
– Eumaa