I am querying an Elasticsearch cluster from PySpark (using the ES-Hadoop connector). I have the queries working as expected (I think). Basically, I am trying to query our syslog indexes for, say, the last 14 days, get the messages, and write them out as text in HDFS. I am then going to use those files to train a model on my machine-learning side.
All of our code is in Python - for ad-hoc testing, right now I am using PySpark (Spark 2.1.1) and Python 2.7.5.
I have the query to Elastic working so that it only gives me back the messages, which is what I want, but the result comes back looking like:
[(u'AV8Tfpi-37tcXZuJd_88', {u'message': u'NET: Unregistered protocol family 36'}), (u'AV8TfrTZ37tcXZuJeALS', {u'message': u'NET: Unregistered protocol family 36'}), (u'AV8Tfrdm37tcXZuJeANA', {u'message': u'DHCPACK to 10.42.44.223 (10:0b:a9:b1:c0:94) via eth2'}), (u'AV8TfrdlHGE25JoGXLyh', {u'message': u'DHCPINFORM from 10.7.35.184 via 10.7.35.254'}), (u'AV8TfrtH37tcXZuJeAOB', {u'message': u'DHCPINFORM from 10.7.23.219 via 10.7.23.254'}), (u'AV8Tfr-T37tcXZuJeAQK', {u'message': u'DHCPINFORM from 10.42.62.156 via 10.42.63.252'}), (u'AV8TfsDkHGE25JoGXL09', {u'message': u'publish.go:104: Events sent: 157'}), (u'AV8Tfsoi37tcXZuJeAXB', {u'message': u'NET: Registered protocol family 36'}), (u'AV8TftjdHGE25JoGXMES', {u'message': u'DHCPDISCOVER from 00:24:7e:03:f1:23 via 10.7.8.254 : peer holds all free leases'}), (u'AV8Tfttu37tcXZuJeAhK', {u'message': u'NET: Unregistered protocol family 36'})]
This is just a 10-record sample; my whole index in ES is huge. My goal is to take those messages - for example, "NET: Unregistered protocol family 36" in the first record - and write them out as plain text, one message per line, so at the end of the day I have 14 text files in HDFS containing only the messages for those 14 days of syslog.
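Each record seems to come back as a (doc_id, fields) tuple, so I'm guessing a plain mapper function could pull out the message before writing. A minimal sketch of what I have in mind - the `es_rdd` variable and the HDFS path are placeholders, and I'm assuming the field is always named 'message':

```python
def extract_message(record):
    # Each record is a (doc_id, fields) tuple; keep just the message text
    doc_id, fields = record
    return fields.get(u'message', u'')

# On the pair RDD returned by the ES-Hadoop connector this would be:
# es_rdd.map(extract_message).saveAsTextFile('hdfs:///tmp/syslog-messages')

sample = (u'AV8Tfpi-37tcXZuJd_88',
          {u'message': u'NET: Unregistered protocol family 36'})
print(extract_message(sample))
```

Does that look like the right approach for getting one message per line?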
After that, I want to do the same thing, but with the message, timestamp and hostname as a CSV file.
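For the CSV version, I imagine the same kind of mapper could join the three fields into one quoted line per record. I'm assuming the timestamp and hostname fields are called '@timestamp' and 'hostname' - that may not match my index mapping:

```python
def to_csv_line(record):
    # Build one CSV line from message, timestamp, and hostname.
    # The '@timestamp' and 'hostname' field names are assumptions.
    fields = record[1]
    cols = [fields.get(name, u'')
            for name in (u'message', u'@timestamp', u'hostname')]
    # Double-quote each column so commas inside messages don't break the CSV
    return u','.join(u'"%s"' % c.replace(u'"', u'""') for c in cols)

# es_rdd.map(to_csv_line).saveAsTextFile('hdfs:///tmp/syslog-csv')

sample = (u'AV8Tfrdm37tcXZuJeANA',
          {u'message': u'DHCPACK to 10.42.44.223 (10:0b:a9:b1:c0:94) via eth2',
           u'@timestamp': u'2017-10-16T12:00:00Z',
           u'hostname': u'dhcp01'})
print(to_csv_line(sample))
```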
Now, my issue - how the heck do I use Python/PySpark to take what Elastic returns and create a text file that contains just those fields? It's coming back as JSON, I get that. I have no idea what that first ID value is, but it looks like it's coming from ES. I am new to Python and not sure how to parse this return. It looks like a list of JSON objects, but being new to Python, I'm not sure.
This won't need to be a job that runs very often; I really only need it to pull the historical data out of ES, which holds 30 days' worth. For the future state I'll just grab stuff out of our Kafka cluster and feed it in as it gets ingested and queued. For now, I just need to figure out how to parse and format what ES is returning to me. Any help would be great. Thanks!