
If anyone can tell me what's wrong, I would greatly appreciate it.

I'm doing multiprocessing with Python and using SQLAlchemy for the model.

This is the error:

Error in worker: (pymysql.err.OperationalError) (2006, "MySQL server has gone away (BrokenPipeError(32, 'Broken pipe'))") [SQL: "SELECT websites.id AS websites_id, websites.websiteurl AS websites_websiteurl, websites.blogurl AS websites_blogurl, websites.fbid AS websites_fbid, websites.keywordusedtofind AS websites_keywordusedtofind, websites.scrapedon AS websites_scrapedon \nFROM websites \nWHERE (websites.websiteurl LIKE concat(concat('%%', %(websiteurl_1)s), '%%'))"] [parameters: {'websiteurl_1': 'designerblogs.com'}] Traceback (most recent call last): File "/usr/local/lib/python3.5/site-packages/pymysql/connections.py", line 1039, in _write_bytes self._sock.sendall(data) BrokenPipeError: [Errno 32] Broken pipe

Here is my script:

engine = create_engine('mysql+pymysql://{0}:{1}@{2}/{3}'.format(dbUser, dbPass, hostName, db), echo=False, pool_recycle=3600)
Base.metadata.create_all(bind=engine)
Session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))

def worker(i, keywordArray):
    for idx,keyword in enumerate(keywordArray):
        try:
            session = Session()
            print("Working on {0} out of {1}".format(idx + 1, len(keywordArray)))
            keyword = keyword.keyword

            print('Worker ', i, ' working on getting google results')
            searchLinks = []
            for offset in range(0, 1000, 100):
                print('Worker ', i, ' working range: ', offset)
                soup = getSoup(url)
                if 'did not match any documents.' in soup.get_text():
                    break
                else:
                    x = getArray(soup,keyword)
                    if x:
                        searchLinks += x

            print('Worker ', i, ' working on searchlinks adding to db')
            for result in searchLinks:
                url = result[0]
                blogUrl = result[1]
                plainUrl = url.replace('https://','').replace('http://','').replace('www.','').replace('/','')
                existing = session.query(Website).filter(Website.websiteurl.contains(plainUrl)).all()
                if not existing:
                    print('adding url: ', url, ' blog: ', blogUrl)
                    session = Session()
                    session.add(Website(websiteurl=url,blogurl=blogUrl,keywordusedtofind=keyword))
                    session.commit()
                    #session.remove()
            # UPDATE KEYWORD IN DB WHEN DONE
            row = session.query(Keyword).filter(Keyword.keyword == keyword).first()
            row.lastscraped = datetime.utcnow()
            session.commit()
            #session.remove()
        except Exception as err:
            print('Error in worker: ', err)
            print(traceback.format_exc())
    return

if __name__ == "__main__":
    try:
        # INITIAL VARIABLES
        jobs = []
        session = Session()
        keywords = session.query(Keyword).filter(Keyword.lastscraped == None).all()
        #session.remove()
        numProcesses = 20
        keywordArrays = numpy.array_split(numpy.array(keywords),numProcesses)

        # MULTIPROCESSING
        for i in range(numProcesses):
            p = multiprocessing.Process(target=worker,args=(i, keywordArrays[i]))
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()
    except Exception as err:
        logging.error(err)
        print(err)
        print(traceback.format_exc())
    finally:
        print('FINALLY: COMPLETED')
  • The problem is with MySQL, not with SQLAlchemy. Check [this](https://stackoverflow.com/questions/7942154/mysql-error-2006-mysql-server-has-gone-away) question; it discusses a similar issue. – Sohaib Farooqi Nov 18 '17 at 02:59
  • Note that mixing multiprocessing and SQLAlchemy is a bad idea. In general your processes should each contain a private connection pool, i.e. engine. Other techniques to avoid sharing connections may also be used. If your processes happen to share connections, as your code would seem to do, bad things will happen. I hope you've read http://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing with care. – Ilja Everilä Nov 18 '17 at 08:01
  • Also, using `scoped_session` makes no difference, because you're using processes, not threads. These warnings might not apply if you're on Windows (no forking), or if you're using a start method other than forking to spawn processes. – Ilja Everilä Nov 18 '17 at 08:10
  • Thanks for the responses. Would you recommend using multithreading instead of multiprocessing then? – Daniel Rusu Nov 19 '17 at 00:05
  • If you need multiprocessing, use Python 3's `multiprocessing.set_start_method('forkserver')`. Then each new process will not start with the main process's state at the fork point. They will have their own connections (if created outside of main). Basically, the newly forked process will execute the script with `__name__ != "__main__"`. – Yongwei Wu Feb 01 '18 at 04:28
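
Putting the two comments above together, here is a minimal sketch, with a hypothetical connection URL: the (POSIX-only) forkserver start method is set before any workers are spawned, and each worker creates its own engine, so no connection pool ever crosses a process boundary.

import multiprocessing

from sqlalchemy import create_engine, text

def worker(n):
    # Created inside the child process, so this engine (and its
    # connection pool) is private to this process.
    engine = create_engine('mysql+pymysql://user:pass@host/db')  # hypothetical URL
    with engine.connect() as conn:
        print('Worker', n, 'got', conn.execute(text('SELECT 1')).scalar())
    engine.dispose()

if __name__ == '__main__':
    # Freshly spawned forkserver workers do not inherit the parent's open sockets.
    multiprocessing.set_start_method('forkserver')
    procs = [multiprocessing.Process(target=worker, args=(n,)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()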

1 Answer


As the SQLAlchemy pooling documentation linked in the comments explains, it is critical when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.

Depending on specifics of the driver and OS, the issues that arise here range from non-working connections to socket connections that are used by multiple processes concurrently, leading to broken messaging (the latter case is typically the most common).
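
To see why, here is a minimal sketch of the failure mode (hypothetical URL; it assumes a reachable MySQL server, the Unix fork start method, and nondeterministic timing): a connection checked out and returned before the fork is inherited by the child, so parent and child end up issuing queries over the same socket, which typically surfaces as "MySQL server has gone away" or a broken pipe, exactly as in the question.

    import multiprocessing

    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+pymysql://user:pass@host/db")  # hypothetical URL

    def child():
        # Checks out the connection record forked from the parent's pool,
        # i.e. the same underlying socket the parent is still using.
        with engine.connect() as conn:
            conn.execute(text("SELECT SLEEP(1)"))

    if __name__ == "__main__":
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))   # a live connection now sits in the pool
        p = multiprocessing.Process(target=child)   # fork copies the socket's fd
        p.start()
        with engine.connect() as conn:
            conn.execute(text("SELECT SLEEP(1)"))   # concurrent use of one socket
        p.join()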

The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over. There are three general approaches to this:

  • Disable pooling using NullPool. This is the most simplistic, one-shot system that prevents the Engine from using any connection more than once (each checkout opens a fresh DBAPI connection and each release closes it, so there is never a pooled connection to inherit):

    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool

    engine = create_engine("mysql://user:pass@host/dbname", poolclass=NullPool)
    
  • Call Engine.dispose() on any given Engine as soon as one is within the new process. In Python multiprocessing, constructs such as multiprocessing.Pool include “initializer” hooks, which are a place where this can be performed; otherwise, at the point where os.fork() is called or where the Process object begins the child fork, a single call to Engine.dispose() will ensure any remaining connections are flushed. This is the recommended approach (a Pool-based sketch appears at the end of this answer):

    engine = create_engine("mysql://user:pass@host/dbname")
    
    def run_in_process():
        # process starts.  ensure engine.dispose() is called just once
        # at the beginning
        engine.dispose()
    
        with engine.connect() as conn:
            conn.execute(text("..."))
    
    p = Process(target=run_in_process)
    p.start()
    
  • An event handler can be applied to the connection pool that tests for connections being shared across process boundaries, and invalidates them. This approach, when combined with an explicit call to dispose() as mentioned above, should cover all cases:

    import os

    from sqlalchemy import create_engine
    from sqlalchemy import event
    from sqlalchemy import exc
    
    engine = create_engine("...")
    
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        connection_record.info['pid'] = os.getpid()
    
    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info['pid'] != pid:
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                    "Connection record belongs to pid %s, "
                    "attempting to check out in pid %s" %
                    (connection_record.info['pid'], pid)
            )
    

Above, we use an approach similar to that described in Disconnect Handling - Pessimistic to treat a DBAPI connection that originated in a different parent process as an “invalid” connection, coercing the pool to recycle the connection record to make a new connection.

When using the above recipe, ensure the dispose approach from #2 is also used: if the connection pool is exhausted in the parent process when the fork occurs, an empty pool will be copied into the child process, which will then hang because it has no connections.
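
Since approach #2 mentions multiprocessing.Pool's "initializer" hook, here is a minimal sketch of that variant, again with a hypothetical connection URL; init_worker and fetch are illustrative names, not part of any API:

    from multiprocessing import Pool

    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+pymysql://user:pass@host/dbname")  # hypothetical URL

    def init_worker():
        # Runs once in each child process before any tasks: discard the
        # connections this process inherited from the parent's pool.
        engine.dispose()

    def fetch(n):
        # The child now opens fresh connections of its own on demand.
        with engine.connect() as conn:
            return conn.execute(text("SELECT :n"), {"n": n}).scalar()

    if __name__ == "__main__":
        with Pool(processes=4, initializer=init_worker) as pool:
            print(pool.map(fetch, range(10)))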