I wrote an asyncio program like the one below. A loop that runs forever starts four events at the same time, and each event calls an RPC service. In the nameko service, I implemented the method with time.sleep(10).

I am confused about why the calls finish one after another, 10 seconds apart. I expected them all to finish at the same time. How can I make the jobs finish at the same time?

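import asyncio
import time
from queue import Queue
from threading import Thread

from nameko.standalone.rpc import ClusterRpcProxy


def start_loop(loop):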
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = rpc.helloworldService.helloworld(x)
            print(res)
    except Exception as e:
        print(f"{e}")


async def do_sleep(x, queue):
    try:
        await job(x)
        queue.put("ok")
    except Exception as e:
        print(f"{e}")


def consumer():
    asyncio.run_coroutine_threadsafe(do_sleep('10', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('11', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('12', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('13', queue), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()

    loop_thread = Thread(target=start_loop, args=(new_loop,), daemon=True)
    loop_thread.start()

    CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
    queue = Queue()
    sema = asyncio.Semaphore(2)

    consumer_thread = Thread(target=consumer, daemon=True)
    consumer_thread.start()

    while True:
        msg = queue.get()
        print("current:", time.ctime())

The nameko RPC service is:

import time

from nameko.rpc import rpc


class HelloWorld:
    name = 'helloworldService'

    @rpc
    def helloworld(self,str):
        time.sleep(10)
        return 'hello_'+str

And the output is as follows:

hello_10
current: Sat Jan 26 13:04:57 2019
hello_11
current: Sat Jan 26 13:05:07 2019
hello_12
current: Sat Jan 26 13:05:17 2019
hello_13
current: Sat Jan 26 13:05:28 2019
Benyamin Jafari
  • Your `job` function does not contain an `await`, so it cannot run in parallel with anything else. You need to switch to an RPC library that supports async calls. – user4815162342 Jan 26 '19 at 07:46
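
The point made in the comment above can be illustrated without nameko. The following is a minimal sketch in which `blocking_call` is a hypothetical stand-in for a synchronous RPC call (it only sleeps, and the 0.2 second delay is an arbitrary choice); it is not nameko's API. Calling it directly inside a coroutine blocks the event loop, so the jobs run one after another. Handing it to a worker thread with `loop.run_in_executor` is one possible way to let the jobs overlap while keeping a blocking client.

```python
import asyncio
import time


def blocking_call(x, delay=0.2):
    # Hypothetical stand-in for a synchronous RPC call; it only sleeps.
    time.sleep(delay)
    return f"hello_{x}"


async def job_blocking(x):
    # No await here: the event loop is blocked until the call returns.
    return blocking_call(x)


async def job_in_thread(x):
    # The blocking call runs in the default thread pool, so the loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_call, x)


async def timed(job):
    # Run four jobs "at once" and measure how long the whole batch takes.
    start = time.monotonic()
    results = await asyncio.gather(*(job(x) for x in ("10", "11", "12", "13")))
    return results, time.monotonic() - start


def main():
    serial_results, serial_time = asyncio.run(timed(job_blocking))
    threaded_results, threaded_time = asyncio.run(timed(job_in_thread))
    print(serial_results, f"{serial_time:.2f}s")
    print(threaded_results, f"{threaded_time:.2f}s")
    return serial_results, serial_time, threaded_results, threaded_time


if __name__ == "__main__":
    main()
```

In this sketch the first batch should take roughly four times the delay and the second roughly one delay. If you stay with nameko's standalone proxy, its documentation also describes a `call_async` method on the remote method (returning an object with a `result()` method), which may be another option worth checking for your version.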

1 Answer

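You have to use an awaitable sleep instead of the blocking time.sleep(), and the RPC call itself has to be awaitable; otherwise each call stalls the event loop until it returns. With an RPC library that supports coroutine methods, your RPC service would look like the following: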

import asyncio

class HelloWorld:
    name = 'helloworldService'

    @rpc
    async def helloworld(self,str):  # Note
        await asyncio.sleep(10)  # Note
        return 'hello_'+str

And the corresponding piece of your calling code:

async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = await rpc.helloworldService.helloworld(x)  # Note
            print(res)
    except Exception as e:
        print(f"{e}")

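[NOTE]

  • This only works if your RPC library is itself implemented with asyncio. Nameko is built on eventlet rather than asyncio, and the ClusterRpcProxy call is synchronous.
  • aiorpc is one example of an asyncio-based RPC library.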
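
To show why an awaitable call makes the difference, here is a minimal sketch in which `fake_async_rpc` is a hypothetical stand-in for an awaitable RPC call (it only awaits `asyncio.sleep`, with an arbitrary 0.2 second delay); it does not use aiorpc or any real RPC library. Because every job yields to the event loop while it waits, the four jobs overlap and finish at about the same time.

```python
import asyncio
import time


async def fake_async_rpc(x, delay=0.2):
    # Hypothetical stand-in for an awaitable RPC call; it only sleeps.
    await asyncio.sleep(delay)
    return f"hello_{x}"


async def run_jobs():
    start = time.monotonic()
    # gather() schedules all four coroutines on the loop before waiting on them.
    results = await asyncio.gather(
        *(fake_async_rpc(x) for x in ("10", "11", "12", "13"))
    )
    return results, time.monotonic() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_jobs())
    print(results, f"{elapsed:.2f}s")
```

The elapsed time here should be close to a single delay rather than four, which is the behaviour the question was expecting.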
Benyamin Jafari