0

I am querying an Elasticsearch cluster from PySpark (using the ES-Hadoop connector. I have the queries working as expected (I think). Basically what I am trying to do is query our syslog indexes for, say, the last 14 days, get the message and write them to text in HDFS. I am going to be using the file to train a model on my machine learning side.

All of our code is in Python - for ad-hoc testing, right now I am using PySpark (Spark 2.1.1) and Pythong 2.7.5.

I have the query to Elastic working to only give me back the messages, which is what I want, but the query comes back looking like:

[(u'AV8Tfpi-37tcXZuJd_88', {u'message': u'NET: Unregistered protocol family 36'}), (u'AV8TfrTZ37tcXZuJeALS', {u'message': u'NET: Unregistered protocol family 36'}), (u'AV8Tfrdm37tcXZuJeANA', {u'message': u'DHCPACK to 10.42.44.223 (10:0b:a9:b1:c0:94) via eth2'}), (u'AV8TfrdlHGE25JoGXLyh', {u'message': u'DHCPINFORM from 10.7.35.184 via 10.7.35.254'}), (u'AV8TfrtH37tcXZuJeAOB', {u'message': u'DHCPINFORM from 10.7.23.219 via 10.7.23.254'}), (u'AV8Tfr-T37tcXZuJeAQK', {u'message': u'DHCPINFORM from 10.42.62.156 via 10.42.63.252'}), (u'AV8TfsDkHGE25JoGXL09', {u'message': u'publish.go:104: Events sent: 157'}), (u'AV8Tfsoi37tcXZuJeAXB', {u'message': u'NET: Registered protocol family 36'}), (u'AV8TftjdHGE25JoGXMES', {u'message': u'DHCPDISCOVER from 00:24:7e:03:f1:23 via 10.7.8.254 : peer holds all free leases'}), (u'AV8Tfttu37tcXZuJeAhK', {u'message': u'NET: Unregistered protocol family 36'})]

This is just a 10-record sample. My whole index from ES is huge. My goal is to take those messages, for example in the first record, "NET: Unregistered protocol family 36" and write them out just as plain text (each message being on its own line) so at the end of the day I have 14 text files in HDFS of only the messages representing those 14 days of message from syslog.

After that, I want to do the same thing, but have the message, timestamp and hostname as a CSV file.

Now, my issue - how the heck do I use Python/PySpark to take what Elastic returns to me and just create a text file that contains just those field. It's coming back in JSON, I get that. I have no idea what that first ID number is, but that looks like its coming from ES. I am new to Python and not sure how to parse this return. It looks like its a list of JSON object, but being new to Python, I'm not sure.

This won't need to be a job that is run that often, I really, am only looking to do this to get the historical data from ES as it holds 30 days worth of data. For future state I'll just grab stuff out of our Kafka cluster and feed it in as it gets ingested and queued. For now, I just need to figure out how to parse and format what ES is returning to me. Any help would be great. Thanks!

desertnaut
  • 57,590
  • 26
  • 140
  • 166
azdatasci
  • 801
  • 1
  • 13
  • 32

1 Answers1

1

You mean something like this?

spark.version
# u'2.1.1'
jsonRDD = sc.parallelize([(u'AV8Tfpi-37tcXZuJd_88', {u'message': u'NET: Unregistered protocol family 36'}), (u'AV8TfrTZ37tcXZuJeALS', {u'message': u'NET: Unregistered protocol family 36'}), (u'AV8Tfrdm37tcXZuJeANA', {u'message': u'DHCPACK to 10.42.44.223 (10:0b:a9:b1:c0:94) via eth2'}), (u'AV8TfrdlHGE25JoGXLyh', {u'message': u'DHCPINFORM from 10.7.35.184 via 10.7.35.254'}), (u'AV8TfrtH37tcXZuJeAOB', {u'message': u'DHCPINFORM from 10.7.23.219 via 10.7.23.254'}), (u'AV8Tfr-T37tcXZuJeAQK', {u'message': u'DHCPINFORM from 10.42.62.156 via 10.42.63.252'}), (u'AV8TfsDkHGE25JoGXL09', {u'message': u'publish.go:104: Events sent: 157'}), (u'AV8Tfsoi37tcXZuJeAXB', {u'message': u'NET: Registered protocol family 36'}), (u'AV8TftjdHGE25JoGXMES', {u'message': u'DHCPDISCOVER from 00:24:7e:03:f1:23 via 10.7.8.254 : peer holds all free leases'}), (u'AV8Tfttu37tcXZuJeAhK', {u'message': u'NET: Unregistered protocol family 36'})])

jsonRDD2 = jsonRDD.map(lambda x: x[1]['message'])

# for inspection purposes only:
jsonRDD2.collect()
# [u'NET: Unregistered protocol family 36', 
#  u'NET: Unregistered protocol family 36',
#  u'DHCPACK to 10.42.44.223 (10:0b:a9:b1:c0:94) via eth2',
#  u'DHCPINFORM from 10.7.35.184 via 10.7.35.254',
#  u'DHCPINFORM from 10.7.23.219 via 10.7.23.254',
#  u'DHCPINFORM from 10.42.62.156 via 10.42.63.252',
#  u'publish.go:104: Events sent: 157', 
#  u'NET: Registered protocol family 36',
#  u'DHCPDISCOVER from 00:24:7e:03:f1:23 via 10.7.8.254 : peer holds all free leases', 
#  u'NET: Unregistered protocol family 36']

jsonRDD2.saveAsTextFile("path/to/file")
desertnaut
  • 57,590
  • 26
  • 140
  • 166
  • 1
    Perfect! Sorry, I know this was probably beginners stuff, but I'm new to Python. Been working on R for years have have done tons of C and Java in the past. Still getting my bearings with Python and Spark... :) – azdatasci Nov 08 '17 at 16:39
  • @azdatasci no worries! thanx for accepting & upvoting – desertnaut Nov 08 '17 at 16:42
  • Quick question on the save above - well first off, I am logged in as the "spark" user - when I save that file using the above, it saves it to HFDS in the /usr/spark/ folder - but it's not a text file, it in my File View of HDFS, it appears to be a folder called "stuff.txt" and in that folder are a bunch of files like "part-0001" and so forth... I assume this is how Hadoop splits the file up into its blocks, how do I dump this to a text file locally on the OS file system? – azdatasci Nov 08 '17 at 16:50
  • 1
    @azdatasci `coalesce(1)` or `repartition(1)` might do the job, but I am not quite sure - have a look at [how to make saveAsTextFile NOT split output into multiple file?](https://stackoverflow.com/questions/24371259/how-to-make-saveastextfile-not-split-output-into-multiple-file) (it's for the Scala API, but these methods are available in PySpark, too) – desertnaut Nov 08 '17 at 16:57
  • 1
    @azdatasci I mean `jsonRDD2.repartition(1).saveAsTextFile("path/to/file")` or `jsonRDD2.coalesce(1).saveAsTextFile("path/to/file")`... – desertnaut Nov 08 '17 at 17:05
  • Awesome. I'll check it out. – azdatasci Nov 08 '17 at 17:06
  • Ok, so this is pretty much what I need, but what I'd like to do is write this out to a plain text file to the local system to examine for something else. The repartition() and coalesce() functions work great to get it into one place. Now I just want to write a file, say "messages.txt" out to my local drive on the Linux box, say under /home/spark/messages.txt". The only reason I am doing this is to check some things on the formatting and I'll need to provide this file outside of HDFS for something else anyway. How would I do this? – azdatasci Nov 08 '17 at 20:52
  • Ok, to clarify something, when I do the above, I believe the last thing I end up with is a "list", not an RDD. Just wanted to clarify on that point. So now, I just need to save that list to a local text file. – azdatasci Nov 08 '17 at 21:52
  • @azdatasci `jsonRDD2` is an RDD. You end up with a list only if you `collect()`, which I did myself only for demonstration purposes (as I note). Thing is, `collect` will bring all the data locally to your driver program, and it is not recommended unless you are sure that they will fit in your main memory. – desertnaut Nov 08 '17 at 21:57
  • @desertnaugt - Gotcha, I did do a collect() back to the original RDD, which is why I have a list (I think), I'll go back and clear stuff out and do it over again without the collect() step. That might be what is giving me problems. – azdatasci Nov 08 '17 at 22:03
  • Ok, got it working, I think I did something weird with the RDD that turned it into a list for some reason. After quitting PySpark and starting over and not using the collect() to turn it into a list, it wrote out as expect. It still make the "message.txt" as a folder that contains the part-00000 file, but that contains the data. Last question - how do I make it just write the output to message.txt as the file, rather than the whole folder with the _SUCCESS file and part-00000 file? – azdatasci Nov 08 '17 at 22:25
  • 1
    @azdatasci haven't we discuss this already above (`coalesce(1)`, `repartition(1)`, and a relevant link)?? – desertnaut Nov 09 '17 at 06:13
  • What we discussed above does get everything into a single file, but when I write that out to disk, instead of a single file "messages.txt", I get a folder called message.txt and inside I have a file called part-0000 and a _SUCCESS file. Now that "part-0000" file does contain the data, I was hoping I could somehow just have it write an actual file called "messages.txt" without the folder structure. Just looking to automate this, since i have something else that looks for that "messages.txt" file. Sorry if I wasn't clear on that... – azdatasci Nov 09 '17 at 06:32
  • @azdatasci sorry, no further ideas on this; maybe try saving directly to a local path instead of HDFS? – desertnaut Nov 09 '17 at 06:34
  • 1
    That's what I am doing, when I save it, I use "file:///home/spark/messages.txt" as the path. I think what I need to do is what I've seen on some other people do is create a file using a file writer, then print each line of the RDD out to the file. It's a few lines of code, but I was hoping there was a one-liner. No worries - you have been a major help. I think I am good for now. Thanks so much! – azdatasci Nov 09 '17 at 07:29