
I am building a distributed spider in Python and have a problem with my code. I create a PackageManager object outside and pass it to GensisManager (in GensisManager.py), then I create 4 threads based on 4 methods of GensisManager. But when each of them accesses self.packageManager, they see different objects: each one has its own copy. Why? How can I share a single copy between them? Thanks!

in GensisManager.py

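import sys
import time
from multiprocessing.managers import BaseManager

# project-local imports (LogManager, PackageFactory, DataWrapper, ...) not shown


class GensisManager(object):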
    host_ip = '192.168.1.118'
    host_port = 10001
    authkey = 'pathea'

    def __init__(self, service, packageManager):
        self.logManager = LogManager(service)
        self.packageFactory = PackageFactory()
        self.service = service
        self.dataWrapper = DataWrapper()
        self.cnames = DataTypeNames()
        self.analyzeManager = AnalyzeManager(self.service)
        self.packageManager = packageManager
        print('init pack ', self.packageManager)

    def start_gensisManager_master(self, task_queue, result_queue):
        def _get_task_queue():
            return task_queue
        def _get_result_queue():
            return result_queue

        BaseManager.register('get_task_queue', callable = _get_task_queue)
        BaseManager.register('get_result_queue', callable = _get_result_queue)
        # BaseManager.register('_get_ownership_queue', callable = _get_ownership_result_queue)

        manager = BaseManager(address = (self.host_ip, self.host_port), authkey = self.authkey.encode('utf-8'))
        print('[GS] Usernet service created.')
        return manager

    def setStatTool(self, statTool):
        self.statTool = statTool

    def package_manager_process_distribute(self, datas, task_queue, wait_queue):
        # check logger for breakpoint(new packages)
        if datas is None:
            sys.exit('[GS] Error: no packages to start.')

        initData = datas[self.cnames.data]
        initStat = datas[self.cnames.stat]
        # init packages with found packages or new packages
        self.packageManager.add_new_packages(initData)
        print('aaaaa', self.packageManager)
        if initStat is not None:
            self.analyzeManager.initStatFromOldStat(initStat)

        print('[GS] Package distributor thread running.......')
        while True:
            time.sleep(1)

            while self.packageManager.has_new_package():
                # get new package from manager and send it to task queue awaiting for slaves
                # new_package = packageManager.get_new_package()
                # NEED PACKAGE GENERATOR
                # PUT IN A PACKAGE
                # task_queue.put(new_package)
                # add work end conditions. end code not completed
                break

            while not wait_queue.empty():
                print('[GS] Added a failed package to task queue.')
                package = wait_queue.get()
                self.packageManager.remove_visited_package(package)

    def package_manager_process_collect(self, conn_queue):
        print('[GS] Package collector thread running.......')
        print('ccc', self.packageManager)
        while True:
            while not conn_queue.empty():
                try:
                    datas = conn_queue.get()
                    self.packageManager.add_new_packages(datas)
                except BaseException as e:
                    time.sleep(0.5)

in Gensis.py

packageManager = PackageManager(service)
print('ssss', packageManager)
manager = GensisManager(service, packageManager)



datas = manager.loadInitData()
package_manager_process_distribute = Process(target = manager.package_manager_process_distribute, args = (datas, task_queue, wait_queue,))
package_manager_process_collect = Process(target = manager.package_manager_process_collect, args = (conn_queue,))
result_analysis_process = Process(target = manager.result_analysis_process, args = (result_queue, conn_queue, store_queue, wait_queue,))
store_process = Process(target = manager.store_process, args = (store_queue,))

package_manager_process_distribute.start()
package_manager_process_collect.start()
result_analysis_process.start()
store_process.start()

package_manager_process_distribute.join()
package_manager_process_collect.join()
result_analysis_process.join()
store_process.join()
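
One thing worth checking in the setup above: if `Process` is `multiprocessing.Process` (an assumption, since the import is not shown), each of the four workers is a separate process rather than a thread, and a child process works on its own copy of `manager` and of the `packageManager` inside it. A minimal sketch of the difference, using a hypothetical `PackageStore` class in place of `PackageManager`:

```python
import multiprocessing
import threading


class PackageStore:
    """Hypothetical stand-in for PackageManager: it only holds a list."""

    def __init__(self):
        self.packages = []

    def add_new_packages(self, items):
        self.packages.extend(items)


def worker(store, items):
    # Mutates whatever object this worker was handed.
    store.add_new_packages(items)


def run_in_thread(store, items):
    # A thread lives in the parent's memory, so it mutates the parent's object.
    t = threading.Thread(target=worker, args=(store, items))
    t.start()
    t.join()
    return list(store.packages)


def run_in_process(store, items):
    # A child process mutates its own copy (forked or pickled);
    # the parent's object is left untouched.
    p = multiprocessing.Process(target=worker, args=(store, items))
    p.start()
    p.join()
    return list(store.packages)


if __name__ == "__main__":
    print(run_in_process(PackageStore(), ["a", "b"]))  # parent still sees []
    print(run_in_thread(PackageStore(), ["a", "b"]))   # parent sees ['a', 'b']
```

With threads, `self.packageManager` would be the same object in every worker, although concurrent updates may then need a `threading.Lock`. If the workers really have to be separate processes, the state has to be passed between them explicitly, for example through the queues already used here.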
Tony Wang
  • Please can you also share the code that sets up the threads/tasks? – Tom Dalton Jul 08 '19 at 10:52
  • I have posted the setup code. Thanks! – Tony Wang Jul 09 '19 at 02:11
  • Can you show us how/why you think they are using different package manager objects? – Tom Dalton Jul 09 '19 at 11:26
  • When I print hex(id(self.packageManager)) in GensisManager.py, the threads print different addresses. Also, when I access self.packageManager from threads other than the distributing thread (the first one), they see its contents as empty, which does not reflect the changes made in the first thread. – Tony Wang Jul 10 '19 at 01:51
  • Hmm, that does seem odd. I'd probably add something like https://stackoverflow.com/questions/1156023/print-current-call-stack-from-a-method-in-python-code into the package manager class init, so that you can see when and where the other package managers are being created. – Tom Dalton Jul 11 '19 at 22:11
  • Thanks, I'll try that. This is one of the weirdest situations I have ever met. So weird. – Tony Wang Jul 12 '19 at 06:20
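
The call-stack suggestion from the comments could look roughly like the sketch below. `TracedPackageManager` and its `describe` method are hypothetical stand-ins, not part of the original code. Printing `os.getpid()` next to the object id is an extra step added here: an object copied into a child process by fork or by default pickling does not run `__init__` again, so a single creation trace combined with several different pids would point to per-process copies rather than to extra constructor calls.

```python
import os
import traceback


class TracedPackageManager:
    """Hypothetical stand-in for PackageManager with creation tracing."""

    def __init__(self, service):
        self.service = service
        # Report which process built this instance, and from where.
        print(f"[trace] PackageManager created in pid {os.getpid()}")
        traceback.print_stack()

    def describe(self):
        # Pair the object's id with the pid so that prints coming from
        # different processes can be told apart.
        return f"pid={os.getpid()} id={hex(id(self))}"


if __name__ == "__main__":
    pm = TracedPackageManager(service="demo")
    print(pm.describe())
```

Calling `describe()` in place of the bare `print('aaaaa', self.packageManager)` style prints would show whether the differing addresses come from one process or from several.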
