2

Work on the previous thread showed that the assumptions behind my original question were wrong (subprocess was not actually causing the problem), so I'm making a more focused post.

My error message:

No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

My intent:

Pass on Google PubSub message attributes as Python variables for re-use in later code.

My code:

import time
import logging

from google.cloud import pubsub_v1

project_id = "redacted"
subscription_name = "redacted"

def receive_messages_with_custom_attributes(project_id, subscription_name):
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_sync_pull_custom_attributes]

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        if message.attributes:
            #print('Attributes:')
            for key in message.attributes:
                value = message.attributes.get(key);
                #commented out to not print to terminal
                #which should not be necessary
                #print('{}: {}'.format(key, value))
        message.ack()

        print("this is before variables")
        dirpath = "~/subfolder1/"
        print(dirpath)
        namepath = message.data["name"]
        print(namepath)
        fullpath = dirpath + namepath
        print(fullpath)
        print("this is after variables")


    subscriber.subscribe(subscription_path, callback=callback)
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_sync_pull_custom_attributes]

receive_messages_with_custom_attributes(project_id, subscription_name)

My full console output from running the above code:

Listening for messages on projects/[redacted]
Received message: {
  "kind": "storage#object",
  "id": "[redacted]/0.testing/1548033442364022",
  "selfLink": "https://www.googleapis.com/storage/v1/b/[redacted]/o/BSD%2F0.testing",
  "name": "BSD/0.testing",
  "bucket": "[redacted]",
  "generation": "1548033442364022",
  "metageneration": "1",
  "contentType": "application/octet-stream",
  "timeCreated": "2019-01-21T01:17:22.363Z",
  "updated": "2019-01-21T01:17:22.363Z",
  "storageClass": "MULTI_REGIONAL",
  "timeStorageClassUpdated": "2019-01-21T01:17:22.363Z",
  "size": "0",
  "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
  "mediaLink": "https://www.googleapis.com/download/storage/v1/b/[redacted]/o/BSD%2F0.testing?generation=1548033442364022&alt=media",
  "crc32c": "AAAAAA==",
  "etag": "CPb0uvvZ/d8CEAE="
}

this is before variables
/home/[redacted]
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

As you can see, the first string and string-defined-as-variable were printed, but the code breaks on attempting to define variables from the just-generated dictionary, and no further print()s were executed.

Potentially related thread: that user was publishing from cron jobs and found a fix by setting environment paths in crontab. My situation is receiving rather than publishing and does not involve any cron jobs, but it might hint at another layer behind or within Python.

Can anyone please help me with adding a handler to make this code run as intended?

Larry Cai
  • So do you have `GOOGLE_APPLICATION_CREDENTIALS` set up when running this? – tripleee Jan 21 '19 at 04:35
  • It was running on a Google Cloud Engine, so I only went as far as to `gcloud auth login`. Is that enough, or do I need to download and `export` the .json key? – Larry Cai Jan 21 '19 at 23:50

2 Answers

1

First, if I understand correctly by what you are showing in your output, you are using a Pub/Sub notification to send a message whenever you make changes to a Cloud Storage object. This information could be helpful.

Now, message.data["name"] is not going to work because message.data is a bytes object, so it can't be indexed like a dict.

To treat it as a dict, you first have to decode it as base64 (import base64). What you are left with is a string in JSON format. You then use json.loads() (don't forget to import json) to transform this string into a dict. Now you can index the message.

The code for this will be:

print("This is before variables")
dirpath = "/subfolder1/"
print(dirpath)

#Transform the bytes object into a string by decoding it
namepath = base64.b64decode(message.data).decode('utf-8')

#Transform the JSON-formatted string into a dict
namepath = json.loads(namepath)

print(namepath["name"])
fullpath = dirpath + namepath["name"]
print(fullpath)
print("this is after variables")

Now, if your intent is to read the attributes only, they are properly defined at the top like:

    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))

So, you could use:

    print("this is before variables")
    dirpath = "~/subfolder1/"
    print(dirpath)
    namepath = message.attributes["objectId"]
    print(namepath)
    fullpath = dirpath + namepath
    print(fullpath)
    print("this is after variables")

Keep in mind that for this particular case, "objectId" is the name of the file because it's the attribute that Cloud Storage notifications to Pub/Sub use. If you intend to send custom messages, change "objectId" to your desired attribute name.
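On the handler message itself: on Python 2, "No handlers could be found for logger ..." is printed when a logger emits a record and no logging handler has been configured. The script imports logging but never sets it up, so whatever the library tried to report is lost. The sketch below attaches a handler to the root logger and then imitates the failure with a stand-in logger of the same name. That the library reports callback exceptions through this logger is an assumption based on the error message; `configure_logging` and `simulate_failing_callback` are hypothetical helper names, and no Pub/Sub connection is involved.

```python
import io
import logging

# Logger name copied from the error message in the question.
LOGGER_NAME = "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"


def configure_logging(stream=None):
    """Attach a handler to the root logger so library log records become visible."""
    handler = logging.StreamHandler(stream)  # writes to sys.stderr when stream is None
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
    return handler


def simulate_failing_callback():
    """Imitate the failing line: indexing a bytes payload with a string key."""
    data = b'{"name": "BSD/0.testing"}'  # stand-in for message.data
    try:
        data["name"]
    except TypeError:
        # Assumption: the library logs callback errors in a similar way.
        logging.getLogger(LOGGER_NAME).exception("callback failed")


buffer = io.StringIO()  # captured here for illustration; omit to log to stderr
configure_logging(buffer)
simulate_failing_callback()
log_output = buffer.getvalue()
print(log_output)
```

In the real script, calling something like `configure_logging()` (or simply `logging.basicConfig(level=logging.INFO)`) before `subscriber.subscribe(...)` should replace the handler notice with the actual log record, which is usually the quickest way to see which line in the callback failed.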

Nahuel Varela
  • Using the `ascii` codec presupposes that the `bytes` object contains no non-ASCII values. In the general case, you have to know (or correctly guess) the encoding, though blindly assuming something like `cp-1252` is a quick and dirty workaround. (Too many bugs and questions arise from doing this completely blindly, though, i.e. not understanding the implications. Google "mojibake".) – tripleee Jan 21 '19 at 10:41
  • Yes! You are right. The bytes object is encoded in base64 [https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage]. I updated my answer. – Nahuel Varela Jan 21 '19 at 11:13
  • Thank you for the help! You're definitely on to something here, but it's not fully working yet. I added `namepath = base64.b64decode(message.data).decode('utf-8')` and `namepath = json.loads(namepath)` with a `print("string")` between every line, and the code still breaks at the `base64.b64decode`, giving the same error about handlers. Is my use of indentation correct? Or are they too "un-indented" and have lost the `message.data` variables? – Larry Cai Jan 22 '19 at 00:19
  • I have tried deeper indents, putting it in line with either the `if` or the `for` sections. Both instances, code still breaks at `base64.b64decode`, but the output console goes crazy since it doesn't `message.ack()` and keeps looping. – Larry Cai Jan 22 '19 at 01:13
  • I have made some progress by adapting the code sample for the pubsub appengine website (it displays all messages in a topic on an HTML page). For my case, the 1st step after `message.ack()` was to do the `json.loads`, i.e. `payload = json.loads(message.data.decode('utf-8'))`. This `print`s the ENTIRE message as a string in the console. The `handler` error is still present, now I have to figure out how to trim it down to the `objectId`... – Larry Cai Jan 22 '19 at 02:19
0

As Nahuel and tripleee explained, the problem is that the messages are bytes instead of strings. However, their code didn't quite work and still threw errors, and I have no idea why. By cross-referencing Google's sample code for the pubsub appengine website, and after a few more hours of trial and error, I found the following code to work. It might be inelegant or follow bad practices; if so, please edit it to make it more robust.

#Continues from after message.ack(), above code remains unchanged
#except needing to <import json>

    #this turns message.data into a true Python dict with string keys and values
    payload = json.loads(message.data.decode('utf-8'))

    #this finds the value of the dict with key "name"
    namepath = payload["name"]

    #this is just a static string to pre-pend to the file path
    dirpath = "/home/[redacted]/"

    #combine them into a single functioning path
    fullpath = dirpath + namepath

    #currently type 'unicode' (Python 2), so convert it to type 'str'
    fullpath = fullpath.encode("utf-8")

And at the end we will have a fullpath that is purely type 'str' to be used by later functions/commands.
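The same steps can be packaged as a small function and tried without a live subscription. This is a minimal sketch rather than a definitive version: `object_path_from_payload` is a hypothetical helper name, `/home/user/` is a placeholder directory, and the sample bytes are trimmed from the console output in the question. It assumes `message.data` arrives as UTF-8 encoded JSON bytes, which is what worked above.

```python
import json
import os


def object_path_from_payload(data, base_dir):
    """Join base_dir with the "name" field of a JSON message body.

    `data` is the raw bytes of the message body (message.data in the callback).
    """
    payload = json.loads(data.decode("utf-8"))  # bytes -> text -> dict
    # expanduser resolves a leading "~"; join adds the separator if it is missing
    return os.path.join(os.path.expanduser(base_dir), payload["name"])


# Stand-in for message.data, trimmed from the notification shown in the question.
sample_data = b'{"kind": "storage#object", "name": "BSD/0.testing", "size": "0"}'

full_path = object_path_from_payload(sample_data, "/home/user/")  # placeholder directory
print(full_path)
```

Inside the callback this would be called as `object_path_from_payload(message.data, dirpath)`. On Python 3 the result is already a `str`, so the final `encode` step is only relevant on Python 2.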

Larry Cai