0

Below is my flow:

GetFile > ExecuteSparkInteractive > PutFile

I want to read files from GetFile processor in ExecuteSparkInteractive processor, apply some transformations and put it in some location. Below is my flow enter image description here

I wrote spark scala code under code section of spark processor:

val sc1=sc.textFile("local_path")
sc1.foreach(println)

There is nothing happening in the flow. So how can I read files in spark processor using GetFile processor.

2nd Part:
I tried below flow just for practice:

ExecuteScript > PutFile > LogMessage

and I have mentioned below code in executescript processor:

readFile = open("/home/cloudera/Desktop/sample/data","r")
for line in readFile:
    lines = line.strip()
    finalline = re.sub(pattern='((?<=[0-9])[0-9]|(?<=\.)[0-9])',repl='X',string=lines)
readFile = open("/home/cloudera/Desktop/sample/data","w")
readFile.write(finalline)  

Code works fine but it doesn't write the formatted data into the destination folder. So where am I going wrong over here. Also, I installed pandas in local machine and ran pandas code from the executescript processor but nifi doesn't read pandas module. Why is it so ? I tried my best. Also, I couldn't find any relevant links for this where I can get basic flow

whatsinthename
  • 1,828
  • 20
  • 59

1 Answers1

1

This is not really how it works... GetFile is picking up files local to the NiFi node and bringing them into the NiFi flow for processing. ExecuteSparkInteractive kicks off a spark job on a remote Spark cluster, it does not transfer data to Spark. So you would likely want to put the data somewhere Spark can access it, maybe GetFile -> PutHDFS -> ExecuteSparkInteractive.

Bryan Bende
  • 18,320
  • 1
  • 28
  • 39
  • But how to save transformed data into new location after spark executes it's logic – whatsinthename Apr 10 '19 at 16:01
  • The spark job must write the results out somewhere right? Are you saying you want to move the results after spark writes them somewhere? If so you can monitor a location using ListHDFS – Bryan Bende Apr 10 '19 at 16:19
  • I didn't get you . How will be the flow then . – whatsinthename Apr 10 '19 at 16:21
  • Two different flows 1) GetFile -> PutHDFS -> ExecuteSpark 2) ListHDFS (location of spark output) -> FetchHDFS -> PutHDFS (new location) – Bryan Bende Apr 10 '19 at 16:29
  • So I will have to create two process groups . This is what you mean ?? – whatsinthename Apr 10 '19 at 16:31
  • It could all be on the root canvas, or each in process groups, that part doesn't really matter, that is just a preference of how to organize your flows – Bryan Bende Apr 10 '19 at 16:52
  • Okay this makes sense. Actually my original problem was to read events from consumeAzureEventHub processor and Transfer it to putGCSobject and then according to you I should do the spark transformation at Google cloud side. Is it possible to do it between these processors or i ll have to follow your way ? – whatsinthename Apr 10 '19 at 17:17
  • Correct, you can't run spark against data that is NiFi... I don't know what your spark job is doing, but you may be able to achieve the same functionality in NiFi without needing spark, but it depends what the spark job is doing – Bryan Bende Apr 10 '19 at 17:21
  • In spark job, i am reading the data then I am applying one regex pattern to mask the data and writing back the transformed data into file. So how can I do it in NiFi without spark ? – whatsinthename Apr 10 '19 at 17:24
  • There are probably several options... there is a ReplaceText processor in NiFi that perform text replacements based on regexes, or since you already wrote the code for your spark job, you could probably write the code for an ExecuteScript processor in either Groovy or Jython – Bryan Bende Apr 10 '19 at 18:00
  • So ConsumeAzureEventHub -> ExecuteScript -> PutGcsObject or something similar – Bryan Bende Apr 10 '19 at 18:01
  • Cool. So I ll try first then ll let u know. Thank you so much for your help :) – whatsinthename Apr 10 '19 at 18:19
  • I decided to create a python script for executescriptprocessor . So is pandas supported in this processor?? I couldn't find any details regarding this. Do you have any idea regarding this ? – whatsinthename Apr 11 '19 at 05:38
  • I could do it with pandas library of python but the thing is this package doesn't come by default. We have to install this library unless and until we are working with the Anaconda distribution. So how can I do it ?? Any idea ?? – whatsinthename Apr 11 '19 at 05:48
  • I have modified my question as 2nd part. Could you please guide me in this ?? @BryanBende – whatsinthename Apr 12 '19 at 11:16
  • This answer explains how to use Pandas from NiFi - https://stackoverflow.com/questions/55635834/python-error-in-apache-nifi-import-error-no-module-named-pandas – Bryan Bende Apr 12 '19 at 13:11
  • Your script is not operating on flow files, it is accessing a file outside of Nifi and writing back outside of NiFi, you'll want to learn more about how to use ExecuteScript processor - https://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html – Bryan Bende Apr 12 '19 at 14:30
  • Okay ll go through this – whatsinthename Apr 12 '19 at 14:33