
I am reading data from Kafka in NiFi and want to use a Python script to upload the data to GCS. We have been using the PutGCSObject processor in NiFi until now, but want to avoid using the GCP controller services because we have to disable and re-enable them whenever the GCP service account key changes (we have automated this via Python).

So we are thinking of using a Python script to get the data from the NiFi flowfile and write it to GCS. The problem is that we do not want to write the data to a local file first and then push it to GCS. Is there a way to write data held in a Python variable directly to a file in GCS?

We are looking for something similar to what is available for Node.js, as in these posts:

  • How to upload an in memory file data to google cloud storage using nodejs?
  • How to upload the multer file buffer in memory to google cloud storage bucket?

– djgcp

1 Answer


I would agree with the comment posted by John Hanley and would use the upload_from_string() method. Notice that you'd need to transform the contents of your NiFi flowfile into a string variable and then use that method to upload the blob to Cloud Storage, in a similar fashion to:

from google.cloud import storage

def upload_blob(bucket_name, destination_blob_name, data_from_flowfile_as_string):
    """Uploads a string to the bucket as a new object."""
    # bucket_name = "your-bucket-name"
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    # The string is sent directly from memory; no local file is written.
    blob.upload_from_string(data_from_flowfile_as_string)
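
Called, for example, with placeholder names (the bucket and object names here are just illustrative):

payload = "line one\nline two"  # stands in for the flowfile contents as a string
upload_blob("my-bucket", "kafka/records-0001.txt", payload)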

I don't have any relevant knowledge about the intricacies of reading the NiFi flowfile, or whether there are any challenges in saving its contents as a string variable, but I believe you could find this other post within the community, as well as this other GitHub code, useful for getting the desired contents from the flowfile.
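
For reference, a minimal sketch of one common approach to the reading part, assuming an ExecuteScript processor with the Jython engine (the `session` and `REL_SUCCESS` names are provided by NiFi; the callback class name is just illustrative):

from org.apache.nifi.processor.io import InputStreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

class ReadFlowFile(InputStreamCallback):
    """Collects the flowfile content into a string."""
    def __init__(self):
        self.content = None

    def process(self, inputStream):
        self.content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

flowFile = session.get()
if flowFile is not None:
    callback = ReadFlowFile()
    session.read(flowFile, callback)
    data_from_flowfile_as_string = callback.content
    # hand the string to the upload step from here
    session.transfer(flowFile, REL_SUCCESS)

This only covers getting the content into a string; the upload itself would run wherever a Python environment with the google-cloud-storage library is available.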

On the GCP Cloud Storage side, notice that using the Cloud Storage client library will require you to set up authentication before your script can interact with your bucket, as explained in the relevant section of the documentation. This can be achieved with a service account key and the GOOGLE_APPLICATION_CREDENTIALS environment variable.
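
Since you mention rotating the service account key, note (as a sketch, with a placeholder path) that the client can also be built directly from the key file on each run, so a replaced key is picked up without restarting anything:

import os

from google.cloud import storage

# Option 1: point the environment variable at the current key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/current-key.json"
client = storage.Client()

# Option 2: build the client straight from the key file.
client = storage.Client.from_service_account_json("/path/to/current-key.json")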

– Daniel Ocando
  • Thanks Daniel. It worked, although in addition to using `GOOGLE_APPLICATION_CREDENTIALS` I also had to set the `HTTP_PROXY` variable. However, what remains is that I would also like to have the md5 hash returned for the upload. I could not get any info out of the return value of upload_from_string. I also tried setting the checksum as md5, but got nothing: `>>> blob = Blob("data", bucket) >>> output=blob.upload_from_string("\n".join(data),"md5") >>> print(output) None >>>` – djgcp Oct 26 '20 at 13:11
  • I think you might have failed to add the relevant parameter field within the function. Have you tested by using `output=blob.upload_from_string("\n".join(data),checksum="md5")` ? – Daniel Ocando Oct 26 '20 at 13:16
  • If I provide `checksum="md5"` instead of just `"md5"`, it gives me a TypeError: `>>> output=blob.upload_from_string("\n".join(data),checksum="md5") Traceback (most recent call last): File "", line 1, in TypeError: upload_from_string() got an unexpected keyword argument 'checksum' >>>`. Even if I don't specify any argument, the output is `None`. – djgcp Oct 26 '20 at 14:17
  • I haven't been able to reproduce the `TypeError: upload_from_string() got an unexpected keyword argument 'checksum'` error message. Please make sure that you are creating a client, referencing a bucket, creating an object within that bucket, and finally using `output=blob.upload_from_string("\n".join(data),checksum="md5")`. Maybe if you share the specific code being used I could check anything that might be wrong! – Daniel Ocando Oct 26 '20 at 16:04
  • Hi Daniel, I think it does not give the computed md5 as output. As a workaround I am retrieving the md5 by using the JSON API for the uploaded object. – djgcp Oct 26 '20 at 17:33
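
Regarding the md5 discussion in the comments above, a sketch of retrieving the hash through the client library rather than the raw JSON API (bucket and object names are placeholders): the upload methods return None by design, but the uploaded blob's metadata carries the server-computed MD5.

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your-bucket-name")
blob = bucket.blob("storage-object-name")

blob.upload_from_string("line one\nline two")  # returns None by design

blob.reload()          # refresh the object metadata from the API
print(blob.md5_hash)   # base64-encoded MD5 computed server-side

In library versions that accept it, the checksum="md5" argument only validates the upload; it does not return the hash, which is why the output stays None.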