
I'm trying to execute a PySpark statement that writes to BigTable within a Python for loop, which leads to the following error (job submitted using Dataproc). Is some client not being closed properly (as suggested here), and if so, is there a way to close it from PySpark?

Note that manually re-executing the script each time with a new Dataproc job works fine, so the job itself is correct.

Thanks for your support!

PySpark script


from pyspark import SparkContext
from pyspark.sql import SQLContext
import json

sc = SparkContext()
sqlc = SQLContext(sc)  # creating the SQLContext also enables rdd.toDF()

def create_df(n_start, n_stop):
    """Build a two-row test dataframe and the matching catalog for columns n_start..n_stop-1."""

    # Data
    row_1 = ['a'] + ['{}'.format(i) for i in range(n_start, n_stop)]
    row_2 = ['b'] + ['{}'.format(i) for i in range(n_start, n_stop)]

    # Spark schema
    ls = [row_1, row_2]
    schema = ['col0'] + ['col{}'.format(i) for i in range(n_start, n_stop)]

    # Catalog: col0 is the row key, the other columns go to column family "cf"
    first_col = {"col0": {"cf": "rowkey", "col": "key", "type": "string"}}
    other_cols = {"col{}".format(i): {"cf": "cf", "col": "col{}".format(i), "type": "string"}
                  for i in range(n_start, n_stop)}

    first_col.update(other_cols)
    columns = first_col

    d_catalogue = {}
    d_catalogue["table"] = {"namespace": "default", "name": "testtable"}
    d_catalogue["rowkey"] = "key"
    d_catalogue["columns"] = columns

    catalog = json.dumps(d_catalogue)

    # Dataframe
    df = sc.parallelize(ls, numSlices=1000).toDF(schema=schema)

    return df, catalog

# Initialise the column range once, outside the loop, so each
# iteration writes a fresh block of columns
N_step = 100
N_start = 1
N_stop = N_start + N_step

data_source_format = "org.apache.spark.sql.execution.datasources.hbase"

for i in range(0, 2):

    df, catalog = create_df(N_start, N_stop)

    df.write \
        .options(catalog=catalog, newTable="5") \
        .format(data_source_format) \
        .save()

    N_start += N_step
    N_stop += N_step

Dataproc job

gcloud dataproc jobs submit pyspark <my_script>.py \
    --cluster $SPARK_CLUSTER \
    --jars <path_to_jar>/bigtable-dataproc-spark-shc-assembly-0.1.jar \
    --region=us-east1
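
For reference, the working manual approach mentioned above, expressed as one Dataproc job per block of columns, might look like the sketch below. Passing the bounds as driver arguments after `--` and having the script read them from `sys.argv` is an assumption, not part of the original script:

for N_START in 1 101; do   # hypothetical: one job per block of 100 columns
    gcloud dataproc jobs submit pyspark <my_script>.py \
        --cluster $SPARK_CLUSTER \
        --jars <path_to_jar>/bigtable-dataproc-spark-shc-assembly-0.1.jar \
        --region=us-east1 \
        -- $N_START $((N_START + 100))
done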

Error

...
ERROR com.google.bigtable.repackaged.io.grpc.internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=41, target=bigtable.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
...
py-r
  • Did you try other languages? like the sample here https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/blob/master/scala/bigtable-shc/README.md – Dagang Jan 07 '21 at 18:11
  • Could you provide the full PySpark code? I can help reproduce. – Dagang Jan 07 '21 at 18:15
  • @Dagang. Thanks for proposing to help. Yes I've run the entire tutorial in Scala, though not tested the loop: https://stackoverflow.com/questions/65483442/spark-hbase-gcp-template-3-3-missing-libraries – py-r Jan 07 '21 at 19:45
  • @Dagang: I've added full Pyspark code. Thanks ! – py-r Jan 07 '21 at 19:45
  • Are you using the latest version of gcloud? If not, try updating to the latest version. It looks similar to this issue that was fixed recently. https://github.com/googleapis/java-bigtable-hbase/issues/2504 https://cloud.google.com/sdk/gcloud/reference/components/update – Oliver Aragon Jan 11 '21 at 18:12
  • @Oliver Aragon: Well done, thanks! I've just updated gcloud: SDK 322.0.0 instead of 321.0.0. I still see the error message in the stack (`Make sure to call shutdown()/shutdownNow()`) but the job completes. Is it still a concern? Glad if you have an idea. Feel free to post the answer anyway if you want the +1. – py-r Jan 11 '21 at 19:54
  • I'll post it as answer to help anyone that might have this issue find this faster. – Oliver Aragon Jan 11 '21 at 20:10

1 Answer


If you are not using the latest version of gcloud, try updating to it. This looks similar to an issue that was fixed recently (https://github.com/googleapis/java-bigtable-hbase/issues/2504). I would imagine the error message will still show up, but since the job now completes, the support team is presumably still working on it and will hopefully fix it in the next release.
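
If the SDK was installed through the standard installer (so the component manager is enabled), updating is a single command; see https://cloud.google.com/sdk/gcloud/reference/components/update:

gcloud components update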

Oliver Aragon
  • Why would this be related to gcloud? The error seems to come from the job itself. – Dagang Jan 12 '21 at 03:31
  • He is using the command 'gcloud dataproc', but the error message is from Bigtable, so it occurred to me that the dataproc command calls Bigtable in the background and would therefore be responsible for calling shutdown(). – Oliver Aragon Jan 12 '21 at 14:37