
I have recently run into a problem running something on multiple threads/cores. My setup:

  • Python 3.6.3 (Anaconda; CPython, so with the GIL)
  • OS: Windows 10
  • CPU: i7 8700 (6 cores / 12 threads)
  • GPU: 1080, 1070
  • tensorflow==1.8.0, tensorflow-gpu==1.8.0, keras==2.1.5

There is definitely no memory or disk bottleneck: RAM usage is 6/24 GB and disk usage is 0%.

The problem is that the threading module seems to use only half of my cores/threads: Task Manager shows a CPU load of only 50% instead of 100%.

Here is my code:

import time
from threading import Thread

import numpy as np

# Constants (EPS_START, EPS_STOP, EPS_STEPS, THREAD_DELAY, THREADS, OPTIMIZERS,
# RUN_TIME) and the Market, Agent and Brain classes plus train_data are defined
# elsewhere in the full script.

class Environment(Thread):
    stop_signal = False

    def __init__(self, testing=False, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        Thread.__init__(self)
        self.testing = testing
        self.env = Market(1000, train_data, testing=testing)

        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        s = self.env.reset()

        R = 0
        done = False

        while not done:         
            time.sleep(THREAD_DELAY) # yield 

            a = self.agent.act(s)
            s_, r, done, info = self.env.step(a)

            if done: # terminal state
                s_ = None

            self.agent.train(s, a, r, s_)

            s = s_
            R += r

        print("Total reward:", R)

    def run(self):
        while not self.stop_signal:
            self.runEpisode()
            if self.testing: break

    def stop(self):
        self.stop_signal = True

class Optimizer(Thread):
    stop_signal = False

    def __init__(self):
        Thread.__init__(self)

    def run(self):
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        self.stop_signal = True


if __name__ == '__main__':

    #-- main
    env_test = Environment(testing=True, eps_start=0., eps_end=0.)
    NUM_STATE = env_test.env.observation_space.shape[0]
    NUM_ACTIONS = env_test.env.action_space.n
    NONE_STATE = np.zeros(NUM_STATE)

    brain = Brain() # brain is global in A3C

    envs = [Environment() for i in range(THREADS)]
    opts = [Optimizer() for i in range(OPTIMIZERS)]

    start_time = time.time()

    for o in opts:
        o.start()

    for e in envs:
        e.start()

    time.sleep(RUN_TIME)

    for e in envs:
        e.stop()

    for e in envs:
        e.join()

    for o in opts:
        o.stop()

    for o in opts:
        o.join()

    print("Training finished in ", time.time() - start_time)

    brain.model.save('dense.h5')
user8075709
  • Does [this thread](https://stackoverflow.com/questions/4496680/python-threads-all-executing-on-a-single-core) help? – IMCoins May 30 '18 at 12:56
  • No. I have tried using it. – user8075709 May 30 '18 at 12:58
  • It's surprising to me that it's even using half of your cores. I was under the impression that multithreaded programs execute inside a single process, and so would only operate on a single core. If you're getting 50% CPU load, that's in defiance of my expectations. Very curious. – Kevin May 30 '18 at 13:00
  • @Kevin Indeed threads within a single process *can* run in parallel, hence in multiple CPUs/cores. What are assigned to cores are threads, not processes. One simplistic way to see it is that processes are "containers" for threads sharing an address space, but what really run code and get CPU time are threads. – Alejandro May 30 '18 at 13:09
  • @Kevin I'm not really sure if it is actually running on multiple cores or just loading batches into the GPU (see the sketch after this comment thread). – user8075709 May 30 '18 at 13:09
  • If there is no way this can be solved at a high level, can I just change my interpreter to IronPython without rewriting anything? – user8075709 May 30 '18 at 13:12
  • [, , , , , , , ] These are my workers, and the thread numbering starts at 7; can this be a problem? How do I reset it? – user8075709 May 31 '18 at 09:27
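
To illustrate the point discussed in the comments: in CPython the GIL lets only one thread execute Python bytecode at a time, so CPU-bound Python code does not scale across cores with threading, whereas separate processes do. A minimal, self-contained timing sketch (burn and timed are made-up names, not part of the code above):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def burn(n):
    # pure-Python CPU-bound loop; a thread must hold the GIL for every iteration
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, workers, n):
    # run `workers` copies of burn(n) concurrently and report the wall-clock time
    start = time.time()
    with executor_cls(max_workers=workers) as ex:
        list(ex.map(burn, [n] * workers))
    return time.time() - start

if __name__ == '__main__':
    N = 10_000_000
    print('threads:  ', timed(ThreadPoolExecutor, 4, N))    # roughly 4x a single call: the GIL serializes them
    print('processes:', timed(ProcessPoolExecutor, 4, N))   # close to 1x: real parallelism across cores

The partial load seen in Task Manager is consistent with the suspicion above that the extra CPU time comes from TensorFlow's native ops (which release the GIL) rather than from the Environment threads running Python code in parallel.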

1 Answer


It turns out the solution is to use the multiprocessing module, but I'm having problems integrating it. I am getting BrokenPipeError: [Errno 32] Broken pipe. I think it is in some way related to the line brain = Brain(), since brain is a global variable.

import time
from multiprocessing import Process

import numpy as np

# Constants (EPS_START, EPS_STOP, EPS_STEPS, THREAD_DELAY, THREADS, OPTIMIZERS,
# RUN_TIME) and the Market, Agent and Brain classes plus train_data/test_data
# are defined elsewhere in the full script.

class Environment():
    stop_signal = False

    def __init__(self, testing=False, data=train_data, eps_start=EPS_START, eps_end=EPS_STOP, eps_steps=EPS_STEPS):
        self.testing = testing
        self.env = Market(1000, data, testing=testing)
        self.agent = Agent(eps_start, eps_end, eps_steps)

    def runEpisode(self):
        s = self.env.reset()
        done = False

        while True:         
            time.sleep(THREAD_DELAY) # yield 

            a = self.agent.act(s)
            s_, r, done, info = self.env.step(a)

            if done: # terminal state
                s_ = None

            self.agent.train(s, a, r, s_)

            s = s_

            if done or self.stop_signal:
                break

        print("Total reward:", self.env.total)



    def run(self):
        while not self.stop_signal:
            self.runEpisode()
            if self.testing: break

    def stop(self):
        self.stop_signal = True

class Optimizer():
    stop_signal = False

    def run(self):
        while not self.stop_signal:
            brain.optimize()

    def stop(self):
        self.stop_signal = True

np.seterr(all='raise')

env_test = Environment(testing=True, data=test_data, eps_start=0., eps_end=0.)
NUM_STATE = env_test.env.observation_space.shape[0]
NUM_ACTIONS = env_test.env.action_space.n
NONE_STATE = np.zeros(NUM_STATE)

brain = Brain(True) # brain is global in A3C

envs = [Environment() for i in range(THREADS)]
opts = [Optimizer() for i in range(OPTIMIZERS)]

if __name__ == '__main__':
    start = time.time()
    env_threads, opt_threads = [], []

    for env in envs:
        p = Process(target=env.run)
        p.start()
        env_threads.append(p)

    for opt in opts:
        p = Process(target=opt.run)
        p.start()
        opt_threads.append(p)

    time.sleep(RUN_TIME)

    for p in env_threads:
        # multiprocessing.Process has no stop() method; terminate() ends the worker process
        p.terminate()

    for p in env_threads:
        p.join()

    for p in opt_threads:
        # multiprocessing.Process has no stop() method; terminate() ends the worker process
        p.terminate()

    for p in opt_threads:
        p.join()

    print('finished in ', time.time() - start)

    brain.model.save('dense.h5')
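
For what it's worth, the BrokenPipeError may well come from the module-level setup: on Windows, multiprocessing spawns fresh interpreters that re-import this file, so brain = Brain(True), env_test and the worker lists are rebuilt in every child, and Keras/TensorFlow objects generally cannot be pickled across the process boundary anyway. A minimal sketch of the usual Windows-safe pattern, with process creation under the __main__ guard, heavy objects built inside each worker, and shutdown signalled through an Event (env_worker is a hypothetical helper; the optimizer/brain side would need the same treatment, with results or weights passed back through a Queue or shared memory instead of a global brain):

import time
from multiprocessing import Process, Event

def env_worker(stop_event):
    # built inside the child, so no Keras/TF objects cross the process boundary
    env = Environment()
    while not stop_event.is_set():
        env.runEpisode()        # runEpisode already prints the episode reward

if __name__ == '__main__':      # required on Windows: children re-import this module
    stop_event = Event()

    workers = [Process(target=env_worker, args=(stop_event,))
               for _ in range(THREADS)]
    for p in workers:
        p.start()

    time.sleep(RUN_TIME)
    stop_event.set()            # cooperative shutdown instead of terminate()

    for p in workers:
        p.join()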
user8075709