I have file names and file contents (a filestream column) stored in a SQL Server database. I want to upload each filestream as a file to an AWS S3 bucket using Apache NiFi.

Currently I use the following processors, in this order:

  1. ExecuteSQL (with the SQL query: select filename, filestream from table)
  2. ConvertAvroToJson (because ExecuteSQL returns Avro format data)
  3. EvaluateJsonPath (To read filestream column)
  4. Base64EncodeContent
  5. PutS3Object

The problem is that this approach does not turn the file stream into a real file on the S3 bucket. It only uploads a file containing the filestream column data as text. What I need is this: if the filestream holds a "png" image, a png image should be uploaded to the S3 bucket, and if it holds an "xlsx" file, an xlsx file should be uploaded.
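To make the symptom concrete, here is a minimal Python sketch (outside NiFi, so only an illustration of the general effect, not of NiFi's internals). It assumes a hypothetical 8-byte payload, the fixed PNG signature, and shows that neither a JSON rendering of the bytes nor a Base64-encoded copy equals the original binary content. The `latin-1` decoding is an assumption chosen only so the demo never raises.

```python
import base64
import json

# Hypothetical payload: the fixed 8-byte signature that starts every PNG file.
png_bytes = b"\x89PNG\r\n\x1a\n"

# Putting binary data into a JSON document forces it through a text form.
# latin-1 is an assumption used here only so the decoding cannot fail.
as_json = json.dumps({"filestream": png_bytes.decode("latin-1")})

# If the JSON text itself is uploaded, the object is JSON, not a PNG.
uploaded = as_json.encode("utf-8")
print(uploaded == png_bytes)

# Base64 *encoding* moves further away from the raw bytes;
# only decoding an encoded value brings the original back.
encoded = base64.b64encode(png_bytes)
print(encoded)
print(base64.b64decode(encoded) == png_bytes)
```

The takeaway is that the object written to S3 has to be the raw column bytes, with no text conversion or encoding step in between.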

This is the sample database: (screenshot not included)

code_hr
  • the filename must be `*.png` for png and `*.xls` for excel - then s3 client should determine "content type" automatically. – daggett May 02 '23 at 18:55
  • @dagget when I read file stream from database then it comes in an avro-binary form. After converting it to json it becomes like this{"filestream": "PK\u0003\u0004\u0014\u0000\u0006\u0000\b\u0000\u0000\u0000!\u0000t6Z¦z\u0001\u0000\u0000\u0084\u0005\u0000\u0000\u0013\u0000\b\u0002[Content_Types].xml ¢\u0004\u0002( \u0000\u0002\u0000\}. Now same thing is shown in s3 bucket. I need to convert this binary data to a file automatically in s3. – code_hr May 03 '23 at 04:54
  • how many records you are reading from db? – daggett May 03 '23 at 13:15
  • @daggett I have 10-20 records in db for testing purpose. I have tried this flow with only one record also. I just added screenshot of my database in the main question. – code_hr May 03 '23 at 13:33
  • i don't see in your flow splitting of records to separate files... i would suggest you to use script to read binary data - is it ok for you? – daggett May 03 '23 at 13:35
  • @daggett I will try it by using script, Thanks – code_hr May 03 '23 at 13:38
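The first comment above suggests that the content type can follow from the file name extension. As a rough illustration of that idea only (this uses Python's standard `mimetypes` module, not the S3 client, and the file names are hypothetical):

```python
import mimetypes

# Hypothetical file names; only the extension matters for the lookup.
png_type, _ = mimetypes.guess_type("logo.png")
xls_type, _ = mimetypes.guess_type("report.xls")

print(png_type)
print(xls_type)
```

Whether a given S3 client performs the same lookup is not confirmed here; the sketch only shows why a correct extension in the file name matters.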

2 Answers

There may be a NiFi-native way to read a blob column, but you could use the ExecuteGroovyScript processor instead.

Add a SQL.mydb property at the processor level and link it to the required DBCP connection pool.

Use the following script body (I had no chance to test it):

def ff=session.get()
if(!ff)return

//just assumption - i don't know your table structure...
def query = '''
  select file_name, bin_content from myTable where update_time > :p_timestamp
'''
def params = [
  p_timestamp: ff.last_timestamp
]

def outFiles = [] //list of files for output

//SQL.mydb is a reference to groovy.sql.Sql instance
SQL.mydb.eachRow(query, params){ row->
  def binFile = ff.clone(false) //clone incoming file but without content
  binFile.filename = row.file_name
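  //getBinaryStream() assumes the column is returned as java.sql.Blob;
  //if the driver returns byte[] instead, write it directly: stream << row.bin_content
  binFile.write{ stream-> stream << row.bin_content.getBinaryStream() }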
  outFiles << binFile
}

REL_SUCCESS << outFiles   //transfer list of new files to success
session.remove(ff)  //drop incoming file

The script above executes the SQL select and, for each returned record, produces a new flow file whose name and content come from the database.

Details about the ExecuteGroovyScript processor's features:
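The one-output-per-row pattern described above can be sketched outside NiFi as follows. This is a minimal Python illustration, not the Groovy script itself: it assumes an in-memory SQLite database, and the table name `my_table`, the helper `rows_to_files`, and the sample rows are all hypothetical.

```python
import sqlite3


def rows_to_files(conn, query):
    """Return one (filename, raw_bytes) pair per result row."""
    files = []
    for name, blob in conn.execute(query):
        # The blob is kept as raw bytes; no text conversion or encoding.
        files.append((name, bytes(blob)))
    return files


# Hypothetical sample data standing in for the real table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (file_name TEXT, bin_content BLOB)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [("logo.png", b"\x89PNG\r\n\x1a\n"), ("report.xlsx", b"PK\x03\x04")],
)

files = rows_to_files(
    conn, "SELECT file_name, bin_content FROM my_table ORDER BY file_name"
)
for name, content in files:
    print(name, len(content))
conn.close()
```

Each pair corresponds to one flow file in the NiFi script: the name becomes the `filename` attribute and the bytes become the content.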

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-groovyx-nar/1.20.0/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html

daggett
  • Thanks, I tried it ExecuteScript -> PutS3Object Processor but it didn't work. ExecuteScript returned nothing to PutS3Object Processor. – code_hr May 03 '23 at 15:44
  • You should be using ExecuteGroovyScript. – daggett May 03 '23 at 15:47
  • I tried by using ExecuteGroovyScript also (ExecuteGroovyScript -> PutS3Object) but it also didn't work. ExecuteGroovyScript is processing the script but its not passing anything to PutS3Object – code_hr May 03 '23 at 15:53
  • means sql query returns no data. try to put `println("--- FILE COUNT: "+outFiles.size())` to see file count. – daggett May 03 '23 at 18:44
  • Finally it worked, Thanks. I have mentioned updated script in answer – code_hr May 04 '23 at 05:37

This script worked!

import groovy.sql.Sql

def ff=session.create()

def sqlIns = Sql.newInstance('jdbc:sqlserver://servername:port;databaseName=dbname;encrypt=true;trustServerCertificate=true', 'username', 'password', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')

// Query the database to fetch the data
def query = 'SELECT FileName, FileStream FROM table'

def outFiles = [] //list of files for output

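try {
  sqlIns.eachRow(query){ row->
    log.info "${row.FileName}"
    def binFile = ff.clone(false)  //new flow file without content
    binFile.filename = row.FileName  //set the filename attribute from the db
    binFile.write{ stream-> stream << row.FileStream }  //write the raw column bytes
    outFiles << binFile
  }
} finally {
  sqlIns.close()  //release the JDBC connection
}

REL_SUCCESS << outFiles  //transfer all new files to success
session.remove(ff)  //drop the placeholder flow file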

code_hr