I have a continuous stream of JSON chunks printed to my Windows console roughly 10 times a second. They follow this pattern:
...
..
{
"acl_chk_fail": 0,
"audits_received": 0,
"blocks_ack": 337,
"connect_events": 2,
"connected_clients": 1,
"disconnect_events": 1,
"dup_ack": 78226,
"dup_ack_fail": 0,
"dup_drop": 0,
"hdr_chk_fail": 1,
"historical_messages_dropped": 0,
"messages_blocked": 34127,
"messages_received": 112353,
"messages_verified": 112353,
"new_ack": 32522,
"new_ack_fail": 1504,
"pub_block": 337,
"pub_msg": 34026,
"sig_chk_fail": 0
}
{
"acl_chk_fail": 0,
"audits_received": 0,
"blocks_ack": 690,
"connect_events": 74,
"connected_clients": 2,
"disconnect_events": 86,
"dup_ack": 2549130,
"dup_ack_fail": 2217,
"dup_drop": 0,
"hdr_chk_fail": 72,
"historical_messages_dropped": 0,
"messages_blocked": 77526,
"messages_received": 2629872,
"messages_verified": 2628902,
"new_ack": 71157,
"new_ack_fail": 6369,
"pub_block": 690,
"pub_msg": 77526,
"sig_chk_fail": 0
}
..
..
I need to INSERT every JSON chunk displayed on the console into a local MySQL database. I created the MySQL database as follows:
CREATE DATABASE r_q_arch;
USE r_q_arch;
CREATE TABLE IF NOT EXISTS json_data(
record_id INTEGER PRIMARY KEY NOT NULL auto_increment,
recorded_at TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), -- fractional seconds need TIMESTAMP(3), e.g. 2019-12-16 16:18:18.910
acl_chk_fail INTEGER,
audits_received INTEGER,
blocks_ack INTEGER,
connect_events INTEGER,
connected_clients INTEGER,
disconnect_events INTEGER,
dup_ack INTEGER,
dup_ack_fail INTEGER,
dup_drop INTEGER,
hdr_chk_fail INTEGER,
historical_messages_dropped INTEGER,
messages_blocked INTEGER,
messages_received INTEGER,
messages_verified INTEGER,
new_ack INTEGER,
new_ack_fail INTEGER,
pub_block INTEGER,
pub_msg INTEGER,
sig_chk_fail INTEGER
);
COMMIT;
I then used a Python script to manually insert a record to check whether my MySQL connection was working, and it does.
from datetime import datetime
currentDT = datetime.now()
# path for module 'MySQLdb': C:\Python37\Lib\site-packages
import sys
sys.path.append(r'C:\Python37\Lib\site-packages')  # raw string so backslashes are not treated as escapes
import MySQLdb
conn = MySQLdb.connect(host='localhost', user='root', passwd='', db='r_q_arch')
c = conn.cursor()
sql_stmt = """INSERT INTO json_data VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) """
values = (1, '2019-12-16 16:14:12.234543',
0,
0,
690,
74,
2,
86,
2549130,
2217,
0,
72,
0,
77526,
2629872,
2628902,
1157,
6369,
690,
77526,
0)
c.execute(sql_stmt,values) #https://stackoverflow.com/questions/41028774/python-mysql-mysql-exceptions-programmingerror-1064-you-have-an-error-in-y
conn.commit()
c.execute("""SELECT * FROM json_data;""")
rows = c.fetchall()
for eachRow in rows:
    print(eachRow)
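For reference, the hard-coded INSERT above can instead be built from the JSON keys themselves, omitting record_id and recorded_at so MySQL fills in their defaults. A minimal sketch (the helper name build_insert is my own, and it assumes every JSON key exactly matches a column name in the table):

```python
import json

def build_insert(table, json_text):
    """Build a parameterized INSERT statement and its value tuple
    from a single JSON chunk. record_id and recorded_at are omitted
    so MySQL supplies their defaults."""
    data = json.loads(json_text)
    cols = sorted(data)                          # deterministic column order
    placeholders = ", ".join(["%s"] * len(cols))
    sql = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(cols), placeholders)
    return sql, tuple(data[c] for c in cols)

chunk = '{"acl_chk_fail": 0, "blocks_ack": 337, "dup_ack": 78226}'
sql, values = build_insert("json_data", chunk)
print(sql)     # INSERT INTO json_data (acl_chk_fail, blocks_ack, dup_ack) VALUES (%s, %s, %s)
print(values)  # (0, 337, 78226)
```

The resulting pair can be passed straight to `c.execute(sql, values)`, keeping the parameterization (no string interpolation of values) from the manual test above.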
The closest resource I have seen involves INSERTing streaming JSON data from the Twitter API (resource link: https://towardsdatascience.com/streaming-twitter-data-into-a-mysql-database-d62a02b050d6). However, my streaming JSON data does not come from an API; it is printed to the console by the Python script below. How do I proceed to insert this streaming JSON data into my local MySQL database? The keys and values of the JSON data are of the form described in the first code snippet.
import json
import zmq
topic = b'transaction-1m'
ctxt = zmq.Context.instance()
stats_sink = ctxt.socket(zmq.SUB)
stats_sink.subscribe(topic)
stats_sink.connect('tcp://my_ip_address:port')
try:
    while True:
        tpc, msg = stats_sink.recv_multipart()
        msg = json.loads(str(msg, encoding='ascii'))
        print(json.dumps(msg, indent=True, sort_keys=True))
finally:
    stats_sink.close()
    ctxt.destroy()
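What I am essentially trying to do is replace the print() in the loop above with an execute() on a MySQL cursor. A minimal sketch of the per-message step (insert_stats is a hypothetical helper of mine; it takes any DB-API cursor, so the parsing logic can be exercised without a live connection):

```python
import json

# Column order matching the json_data table; record_id and recorded_at
# are omitted so MySQL fills in their defaults.
COLUMNS = sorted([
    "acl_chk_fail", "audits_received", "blocks_ack", "connect_events",
    "connected_clients", "disconnect_events", "dup_ack", "dup_ack_fail",
    "dup_drop", "hdr_chk_fail", "historical_messages_dropped",
    "messages_blocked", "messages_received", "messages_verified",
    "new_ack", "new_ack_fail", "pub_block", "pub_msg", "sig_chk_fail",
])

INSERT_SQL = "INSERT INTO json_data ({}) VALUES ({})".format(
    ", ".join(COLUMNS), ", ".join(["%s"] * len(COLUMNS)))

def insert_stats(cursor, raw_msg):
    """Parse one JSON chunk and INSERT it via a DB-API cursor."""
    data = json.loads(raw_msg)
    cursor.execute(INSERT_SQL, tuple(data[c] for c in COLUMNS))

# Inside the ZMQ loop, instead of print():
#   tpc, msg = stats_sink.recv_multipart()
#   insert_stats(c, msg)   # c = cursor from the MySQLdb connection
#   conn.commit()          # commit each row (or batch commits for throughput)
```

At ~10 messages a second a commit per message should be fine, though committing in batches would reduce overhead if the rate grows.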