If anyone can tell me what's wrong, I would greatly appreciate it.
I'm doing multiprocessing with Python and using SQLAlchemy for the model.
This is the error:
Error in worker: (pymysql.err.OperationalError) (2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))") [SQL: "SELECT websites.id AS websites_id, websites.websiteurl AS websites_websiteurl, websites.blogurl AS websites_blogurl, websites.fbid AS websites_fbid, websites.keywordusedtofind AS websites_keywordusedtofind, websites.scrapedon AS websites_scrapedon \nFROM websites \nWHERE (websites.websiteurl LIKE concat(concat('%%', %(websiteurl_1)s), '%%'))"] [parameters: {'websiteurl_1': 'designerblogs.com'}] Traceback (most recent call last): File "/usr/local/lib/python3.5/site-packages/pymysql/connections.py", line 1039, in _write_bytes self._sock.sendall(data) BrokenPipeError: [Errno 32] Broken pipe
Here is my script:
import logging
import multiprocessing
import traceback
from datetime import datetime

import numpy
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine(
    'mysql+pymysql://{0}:{1}@{2}/{3}'.format(dbUser, dbPass, hostName, db),
    echo=False,
    pool_recycle=3600,
)
Base.metadata.create_all(bind=engine)
Session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))

def worker(i, keywordArray):
    for idx, keyword in enumerate(keywordArray):
        try:
            session = Session()
            print("Working on {0} out of {1}".format(idx + 1, len(keywordArray)))
            keyword = keyword.keyword
            print('Worker ', i, ' working on getting google results')
            searchLinks = []
            for offset in range(0, 1000, 100):  # renamed from i so it doesn't shadow the worker index
                print('Worker ', i, ' working range: ', offset)
                soup = getSoup(url)  # getSoup/getArray are helpers defined elsewhere
                if 'did not match any documents.' in soup.get_text():
                    break
                else:
                    x = getArray(soup, keyword)
                    if x:
                        searchLinks += x
            print('Worker ', i, ' working on searchlinks adding to db')
            for result in searchLinks:
                url = result[0]
                blogUrl = result[1]
                plainUrl = url.replace('https://', '').replace('http://', '').replace('www.', '').replace('/', '')
                id = session.query(Website).filter(Website.websiteurl.contains(plainUrl)).all()
                if not id:
                    print('adding url: ', url, ' blog: ', blogUrl)
                    session = Session()
                    session.add(Website(websiteurl=url, blogurl=blogUrl, keywordusedtofind=keyword))
                    session.commit()
                    #session.remove()
            # UPDATE KEYWORD IN DB WHEN DONE
            row = session.query(Keyword).filter(Keyword.keyword == keyword).first()
            row.lastscraped = datetime.utcnow()
            session.commit()
            #session.remove()
        except Exception as err:
            print('Error in worker: ', err)
            print(traceback.format_exc())
            pass
    return
if __name__ == "__main__":
    try:
        # INITIAL VARIABLES
        jobs = []
        session = Session()
        keywords = session.query(Keyword).filter(Keyword.lastscraped == None).all()
        #session.remove()
        numProcesses = 20
        keywordArrays = numpy.array_split(numpy.array(keywords), numProcesses)
        # MULTIPROCESSING
        for i in range(numProcesses):
            p = multiprocessing.Process(target=worker, args=(i, keywordArrays[i]))
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()
    except Exception as err:
        logging.error(err)
        print(err)
        print(traceback.format_exc())
    finally:
        print('FINALLY: COMPLETED')
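In case it helps to reproduce the problem, here is a stripped-down sketch of the same pattern: one engine and scoped_session created at module level and then used from forked worker processes. The Website model, table columns, and credentials below are placeholders, not my real schema.

# Minimal sketch of the pattern in the full script above.
# Credentials and the Website model are placeholders for illustration only.
import multiprocessing

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()

class Website(Base):
    __tablename__ = 'websites'
    id = Column(Integer, primary_key=True)
    websiteurl = Column(String(255))

engine = create_engine('mysql+pymysql://user:pass@localhost/testdb',
                       echo=False, pool_recycle=3600)
Base.metadata.create_all(bind=engine)
Session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))

def worker(i):
    session = Session()
    # Same kind of query the full script runs; with many forked processes
    # this is where the "MySQL server has gone away / Broken pipe" error appears.
    rows = session.query(Website).filter(Website.websiteurl.contains('example.com')).all()
    print('Worker', i, 'got', len(rows), 'rows')

if __name__ == '__main__':
    jobs = [multiprocessing.Process(target=worker, args=(n,)) for n in range(20)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()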