
I am trying to read some BigQuery data (ID: my-project.mydatabase.mytable [original names protected]) from a user-managed Jupyter Notebook instance inside Dataproc Workbench. What I am trying is inspired by this; more specifically, the code is as follows (please also read the additional comments in the code itself):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, ArrayType, StringType
from google.cloud import bigquery

# UPDATE (2022-08-10): BQ connector added
spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2') \
                    .getOrCreate()

# ------------------ IMPORTING DATA FROM BIG QUERY --------------------------

# UPDATE (2022-08-10): This line now runs...
df = spark.read.format('bigquery').option('table', 'my-project.mydatabase.mytable').load()

# But this imports the whole table, which could become expensive and suboptimal
print("DataFrame shape: ", (df.count(), len(df.columns)))  # 109M records & 9 columns; I just need 1M records and one column: "posting"

# I tried the following, BUT with NO success:
# sql = """
# SELECT `posting`
# FROM `mentor-pilot-project.indeed.indeed-data-clean`
# LIMIT 1000000
# """
# df = spark.read.format("bigquery").load(sql)
# print("DataFrame shape: ", (df.count(), len(df.columns)))

# ------- CONTINGENCY PLAN: IMPORTING DATA FROM CLOUD STORAGE ---------------

# This section WORKS (just to enable the following sections)
# HINT: This dataframe contains 1M rows of text, under a single column: "posting"
df = spark.read.csv("gs://hidden_bucket/1M_samples.csv", header=True)

# ---------------------- EXAMPLE CUSTOM PROCESSING --------------------------

# Example Python UDF
def split_text(text: str) -> list:
    return text.split()

# Turning Python UDF into Spark UDF
textsplitUDF = udf(lambda z: split_text(z), ArrayType(StringType()))

# "Applying" a UDF on a Spark Dataframe (THIS WORKS OK)
df = df.withColumn("posting_split", textsplitUDF(col("posting")))

# ------------------ EXPORTING DATA TO BIG QUERY ----------------------------

# UPDATE (2022-08-10): The code causing the error:

# df.write.format('bigquery') \
#   .option('table', 'wordcount_dataset.wordcount_output') \
#   .save()

# has been replaced by code that successfully stores data in BQ:

df.write \
  .format('bigquery') \
  .option("temporaryGcsBucket", "my_temp_bucket_name") \
  .mode("overwrite") \
  .save("my-project.mynewdatabase.mytable")

When reading data from BigQuery using a SQL query, the error triggered is:

Py4JJavaError: An error occurred while calling o195.load.
: com.google.cloud.spark.bigquery.repackaged.com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) Error in custom provider, java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
  at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:65)
  while locating com.google.cloud.spark.bigquery.SparkBigQueryConfig

1 error
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProvisionException.toProvisionException(InternalProvisionException.java:226)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1097)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1131)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelationInternal(BigQueryRelationProvider.scala:75)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:242)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:197)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: 'dataset' not parsed or provided.
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.lambda$parseTableId$2(BigQueryUtil.java:153)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at com.google.cloud.bigquery.connector.common.BigQueryUtil.parseTableId(BigQueryUtil.java:153)
    at com.google.cloud.spark.bigquery.SparkBigQueryConfig.from(SparkBigQueryConfig.java:237)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule.provideSparkBigQueryConfig(SparkBigQueryConnectorModule.java:67)
    at com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule$$FastClassByGuice$$db983008.invoke(<generated>)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod$FastClassProviderMethod.doProvision(ProviderMethod.java:264)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderMethod.doProvision(ProviderMethod.java:173)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.provision(InternalProviderInstanceBindingImpl.java:185)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalProviderInstanceBindingImpl$CyclicFactory.get(InternalProviderInstanceBindingImpl.java:162)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
    at com.google.cloud.spark.bigquery.repackaged.com.google.inject.internal.InjectorImpl$1.get(InjectorImpl.java:1094)
    ... 18 more

When writing data to BigQuery, the error is:

Py4JJavaError: An error occurred while calling o167.save.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html

UPDATE (2022-09-10): The error when writing data to BigQuery has been solved; please refer to the code above, as well as the comment section below.

What am I doing wrong?

  • Did you add BQ connector when creating the cluster? – Dagang Aug 09 '22 at 18:02
  • Did you add the BQ connector jar in the Dataproc cluster? – Subash Aug 09 '22 at 20:42
  • Hello @Dagang. I have updated the question; your suggestion worked, but it opened a couple of derived questions. Thanks. – David Espinosa Aug 10 '22 at 21:35
  • Hello @Subash. I have updated the question; your suggestion worked, but it opened a couple of derived questions. Thanks. – David Espinosa Aug 10 '22 at 21:35
  • Seems there is a typo in `my-project:mydatabase.mytable`, I think it should be `my-project.mydatabase.mytable` – Dagang Aug 10 '22 at 21:56
  • Thanks @Dagang, typo has been corrected. – David Espinosa Aug 10 '22 at 22:02
  • Do you have table `wordcount_dataset.wordcount_output` in your project? – Dagang Aug 10 '22 at 22:05
  • I just did it, @Dagang. I thought, however, that the writing line of code was going to take care of creating both the dataset and the table, which seems not to be the case (should I always create them beforehand, then?). Plus, I have updated the error triggered (it still triggers one). Thanks – David Espinosa Aug 10 '22 at 22:13
  • Try adding `.mode("overwrite")` when saving. https://stackoverflow.com/questions/27033823/how-to-overwrite-the-output-directory-in-spark – Dagang Aug 10 '22 at 22:30
  • Done @Dagang. It triggers the error `'Either temporary or persistent GCS bucket must be set'`. BTW, in the link you provided there were lots of answers; the one I used was `df.write.mode("overwrite").format('bigquery').option('table', 'my-project.test_output_data.test_output_table').save()` (amongst others). It would be nice to have a "better / correct" implementation suggested. Thanks – David Espinosa Aug 10 '22 at 22:53
  • It is documented here: https://github.com/GoogleCloudDataproc/spark-bigquery-connector#writing-data-to-bigquery. You can do either a direct or an indirect write; the latter requires you to set a bucket. – Dagang Aug 10 '22 at 23:00
  • Also in this [doc](https://cloud.google.com/dataproc-serverless/docs/guides/bigquery-connector-spark-example?hl=en#submit_a_pyspark_wordcount_batch_workload), it has `spark.conf.set('temporaryGcsBucket', bucket)`. – Dagang Aug 10 '22 at 23:50
  • Thanks @Dagang. I have succeeded in writing data to BigQuery (I have updated the code). Any ideas about why I cannot run the SQL query? – David Espinosa Aug 11 '22 at 01:16
  • You need to add some Spark properties: `spark.conf.set("viewsEnabled","true")` and `spark.conf.set("materializationDataset","<dataset>")` https://github.com/GoogleCloudDataproc/spark-bigquery-connector#reading-data-from-a-bigquery-query – Dagang Aug 11 '22 at 01:24
  • I added an answer, could you accept it? Thanks! – Dagang Aug 11 '22 at 04:07
  • Hello @Dagang, I have included some modification requests in your answer. Let's improve it there, thanks. – David Espinosa Aug 11 '22 at 19:36

1 Answer


Key points found during the discussion:

  1. Add the BigQuery connector as a dependency through spark.jars=<gcs-uri> or spark.jars.packages=com.google.cloud.spark:spark-bigquery-with-dependencies_<scala-version>:<version>.
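
    A minimal sketch of either approach when building the session (the GCS jar URI and the connector version below are illustrative, not taken from the question):

    # via Maven coordinates:
    spark = SparkSession.builder.appName('SpacyOverPySpark') \
        .config('spark.jars.packages',
                'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2') \
        .getOrCreate()

    # or via a connector jar staged in GCS (illustrative URI; pick the jar matching your Scala/Spark version):
    # spark = SparkSession.builder.appName('SpacyOverPySpark') \
    #     .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar') \
    #     .getOrCreate()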

  2. Specify the correct table name in <project>.<dataset>.<table> format.
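
    For instance (a sketch using the placeholder table ID from the question), either form below should work:

    # placeholder table ID; replace with your own project.dataset.table
    df = spark.read.format("bigquery") \
        .option("table", "my-project.mydatabase.mytable") \
        .load()

    # or, passing the table ID directly to load():
    # df = spark.read.format("bigquery").load("my-project.mydatabase.mytable")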

  3. The default mode for the dataframe writer is errorifexists. When writing to a non-existent table, the dataset must already exist; the table will be created automatically. When writing to an existing table, the mode needs to be set to "append" or "overwrite" in df.write.mode(<mode>)...save(), as sketched below.
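
    A minimal sketch of the difference (the bucket, dataset and table names here are placeholders):

    # default mode (errorifexists): fails if the target table already exists
    df.write.format("bigquery") \
      .option("temporaryGcsBucket", "some-bucket") \
      .save("mydataset.mytable")

    # writing into an existing table requires an explicit mode
    df.write.format("bigquery") \
      .option("temporaryGcsBucket", "some-bucket") \
      .mode("overwrite") \
      .save("mydataset.mytable")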

  4. When writing to a BQ table, do either

    a) direct write (supported since 0.26.0)

    df.write \
      .format("bigquery") \
      .option("writeMethod", "direct") \
      .save("dataset.table")
    

    b) or indirect write

    df.write \
      .format("bigquery") \
      .option("temporaryGcsBucket","some-bucket") \
      .save("dataset.table")
    

    See this doc.

  5. When reading from BigQuery through a SQL query, add mandatory properties viewsEnabled=true and materializationDataset=<dataset>:

    spark.conf.set("viewsEnabled","true")
    spark.conf.set("materializationDataset","<dataset>")
    
    sql = """
      SELECT tag, COUNT(*) c
      FROM (
        SELECT SPLIT(tags, '|') tags
        FROM `bigquery-public-data.stackoverflow.posts_questions` a
        WHERE EXTRACT(YEAR FROM creation_date)>=2014
      ), UNNEST(tags) tag
      GROUP BY 1
      ORDER BY 2 DESC
      LIMIT 10
      """
    df = spark.read.format("bigquery").load(sql)
    df.show()
    

    See this doc.

  • I'd appreciate some additions to your answer before accepting it. For "3", it's important to mention that whatever the destination table ID is, it must exist ==> beforehand <== in BQ, otherwise it won't work (at least that's my experience in this case). For "4a", any explanation as to why "direct write" never worked in the experiments? – David Espinosa Aug 11 '22 at 19:33
  • Sounds good. I updated #3, actually the table doesn't have to exist, the dataset does. What error did you see with direct write? – Dagang Aug 12 '22 at 16:40
  • Regarding direct write, I just learned that it was added in 0.26.0, so the doc needs to be fixed. – Dagang Aug 12 '22 at 17:31
  • Hello @Dagang, thanks for the input. I will accept the answer now. – David Espinosa Aug 15 '22 at 20:57