0

I use pika to communicate with rabbitmq.I consume a queue,when receive a message,i start a pykka's actor to run my logic,sometimes my logic may use the pika connection to publish message.but i found that it will cost 5 seconds to create a channel in actor.Here is the code:

def on_conn_open(connection):
    connection.channel(lambda ch:ch.basic_consume(on_message,"q1"))
def on_message(channel, basic_deliver, properties, body):
    channel.basic_ack(basic_deliver.delivery_tag)
    body=body.decode(encoding = 'utf8')
    print(channel,body)
    if body=="go":
        log.debug("start...")
        actor.tell({})
conn=pika.SelectConnection(pika.URLParameters('amqp://guest:guest@127.0.0.1:5672'),on_open_callback=on_conn_open,stop_ioloop_on_close=False) 

class TTT(pykka.ThreadingActor):
    def on_receive(self, message):
        conn.channel(lambda ch:log.debug("ok"))

actor=TTT.start()
conn.ioloop.start()

Here is the console:

2016-11-11 15:04:52,292 test        : DEBUG    start...
2016-11-11 15:04:52,292 pika.connection: DEBUG    Creating channel 3
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': <Channel number=3 CLOSED conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>, 'calls': 1, 'one_shot': True, 'callback': <bound method Connection._on_channel_cleanup of <SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>, 'arguments': None}
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': None, 'one_shot': False, 'callback': <bound method Channel._on_getempty of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': None, 'one_shot': False, 'callback': <bound method Channel._on_cancel of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': None, 'one_shot': False, 'callback': <bound method Channel._on_flow of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': None, 'calls': 1, 'one_shot': True, 'callback': <bound method Channel._on_close of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:52,292 pika.channel: DEBUG    Entering blocking state on frame <Channel.Open(['out_of_band='])>; acceptable_replies=[<class 'pika.spec.Channel.OpenOk'>]
2016-11-11 15:04:52,292 pika.channel: DEBUG    Adding on_synchronous_complete callback
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': None, 'calls': 1, 'one_shot': True, 'callback': <bound method Channel._on_synchronous_complete of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:52,292 pika.channel: DEBUG    Adding passed-in callback
2016-11-11 15:04:52,292 pika.callback: DEBUG    Added: {'only': None, 'calls': 1, 'one_shot': True, 'callback': <bound method Channel._on_openok of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:57,298 pika.callback: DEBUG    Processing 3:Channel.OpenOk
2016-11-11 15:04:57,298 pika.callback: DEBUG    Processing use of oneshot callback
2016-11-11 15:04:57,298 pika.callback: DEBUG    0 registered uses left
2016-11-11 15:04:57,298 pika.callback: DEBUG    Removing callback #0: {'only': None, 'calls': 0, 'one_shot': True, 'callback': <bound method Channel._on_synchronous_complete of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:57,298 pika.callback: DEBUG    Processing use of oneshot callback
2016-11-11 15:04:57,298 pika.callback: DEBUG    0 registered uses left
2016-11-11 15:04:57,298 pika.callback: DEBUG    Removing callback #0: {'only': None, 'calls': 0, 'one_shot': True, 'callback': <bound method Channel._on_openok of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>>, 'arguments': None}
2016-11-11 15:04:57,298 pika.callback: DEBUG    Calling <bound method Channel._on_synchronous_complete of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>> for "3:Channel.OpenOk"
2016-11-11 15:04:57,298 pika.channel: DEBUG    0 blocked frames
2016-11-11 15:04:57,298 pika.callback: DEBUG    Calling <bound method Channel._on_openok of <Channel number=3 OPENING conn=<SelectConnection OPEN socket=('127.0.0.1', 2314)->('127.0.0.1', 5672) params=<URLParameters host=127.0.0.1 port=5672 virtual_host=/ ssl=False>>>> for "3:Channel.OpenOk"
2016-11-11 15:04:57,298 test        : DEBUG    ok

why pika create a channel will always cost 5 seconds when use in pykka?Please help me to solve this

frh10
  • 1
  • 2

1 Answers1

0

It seems that pika's is not really suited for threads, so there is no problem with pika+pykka but with pika+threads in general, try using a GeventActor or EventletActor instead of ThreadActor and you'll see some improvements.

From pika's github:

Since threads aren't appropriate to every situation, it doesn't require threads. It takes care not to forbid them, either. The same goes for greenlets, callbacks, continuations and generators. It is not necessarily thread-safe however, and your mileage will vary.

Yoav Glazner
  • 7,936
  • 1
  • 19
  • 36