I am using asyncio on Windows and have a reference to a transport object of a named pipe:

import asyncio

class DataPipeHandler(asyncio.Protocol):
    def connection_made(self, trans):
        # trans is a transport object of type _ProactorDuplexPipeTransport
        self.trans = trans

# start_serving_pipe is only available on the proactor event loop
loop = asyncio.get_event_loop()
server = loop.start_serving_pipe(lambda: DataPipeHandler(), r'\\.\pipe\test-pipe')

Now I would like to use self.trans to synchronously write and then read data from the named pipe. How can I do this?

It's important for me to do this synchronously because this is a kind of RPC call over the pipe (writing something and getting a response back quickly), and I do want to block all other activities of the event loop until this "pipe RPC call" returns. If I don't block them, I will get unwanted side effects, because the loop will continue to process other events that I don't want it to process yet.

What I want to do (write to the pipe and then read) is very similar to calling urllib2.urlopen(urllib2.Request('http://www.google.com')).read() from the event loop thread: there, too, all event loop activity is blocked until we get a response from the remote HTTP server.

I know that I can call self.trans.write(data), but this does not write the data synchronously (as I understand it, it does not block).

Thanks.

EDIT: Following the first comment let me add:

I understand that I should never block the event loop and that I can use synchronization primitives to accomplish what I want. But let's say you have an event loop that is doing 10 different activities concurrently, one of them is doing some kind of RPC (as described above), and the other 9 activities should be blocked until this RPC is done. So I have 2 options:

(1) Add synchronization primitives (lock/semaphore/condition), as you suggested, to all 10 activities to synchronize them.

(2) Implement this RPC as a blocking write to the pipe followed by a blocking read from it (assuming I trust the other side of the pipe).

I know this is not the usual way of using event loops, but in my specific case I think (2) is better because the logic is simpler.

user3402399
  • I think you are doing something wrong. You should never block the event loop. If you need to prevent some operations from executing, please use synchronization primitives like `asyncio.Lock`, `asyncio.Semaphore`, `asyncio.Condition` etc. – Andrew Svetlov Jul 02 '15 at 10:03
  • Let's say you have an 11th activity which should never be blocked. With synchronization primitives you can still block your 9 activities and keep the 10th and 11th running. – Andrew Svetlov Jul 07 '15 at 10:55
  • I understand using synchronization primitives gives me more flexibility in terms of controlling which activity will be blocked and which will not be blocked. But in my case I have the fixed situation I described above. (need to block all other event loop activities when doing the RPC) – user3402399 Jul 07 '15 at 11:01
  • Ok. Extract socket from transport instance (`sock = transport.get_extra_info('socket')`) and do what you want via conventional synchronous calls. – Andrew Svetlov Jul 07 '15 at 13:04
  • For pipe it's actually `pipe = transport.get_extra_info('pipe')` call. – Andrew Svetlov Jul 07 '15 at 13:06
  • Thanks ! I'll give it a try – user3402399 Jul 07 '15 at 14:37
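
The `asyncio.Lock` approach suggested in the comments can be sketched as follows. This is a minimal illustration rather than the asker's code: `rpc_call` and `activity` are hypothetical names, and `asyncio.sleep` stands in for the pipe round trip. Every activity that must pause during the RPC takes the same lock, while an activity that never touches the lock would keep running.

```python
import asyncio

async def rpc_call(lock, log):
    # Hold the lock for the whole write-then-read exchange.
    async with lock:
        log.append("rpc start")
        await asyncio.sleep(0.05)  # stand-in for the pipe round trip
        log.append("rpc done")

async def activity(name, lock, log):
    # Each activity that must pause during the RPC takes the same lock.
    await asyncio.sleep(0.01)  # let the RPC acquire the lock first
    async with lock:
        log.append(name)

async def main():
    lock = asyncio.Lock()
    log = []
    await asyncio.gather(
        rpc_call(lock, log),
        activity("a", lock, log),
        activity("b", lock, log),
    )
    return log

log = asyncio.run(main())
print(log)  # the two RPC entries come before any activity entry
```
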

2 Answers

I think you have to use threading synchronization primitives to make sure the whole loop (the current thread) is blocked. I think the best option is to use a threading queue and its join features.
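
One possible reading of this suggestion, sketched under assumptions: the blocking exchange runs in a worker thread, and the event loop thread waits on a `queue.Queue` for the reply, so no other task can run in the meantime. `blocking_rpc`, `call_and_block` and `ticker` are hypothetical names, and the string manipulation stands in for the real pipe I/O. Calling the blocking function directly from the loop thread would stall the loop in the same way; the thread only separates the I/O from the waiting.

```python
import asyncio
import queue
import threading

def blocking_rpc(request):
    # Hypothetical stand-in for a synchronous write-then-read on the pipe.
    return request.upper()

def call_and_block(request):
    """Run blocking_rpc in a worker thread; block the caller until it replies."""
    results = queue.Queue()
    worker = threading.Thread(
        target=lambda: results.put(blocking_rpc(request)), daemon=True
    )
    worker.start()
    reply = results.get()  # blocks this thread, and with it the event loop
    worker.join()
    return reply

async def ticker(ticks):
    # Another activity sharing the loop; it should not advance during the RPC.
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0)

async def main():
    ticks = []
    task = asyncio.ensure_future(ticker(ticks))
    await asyncio.sleep(0)          # give ticker a chance to start
    before = list(ticks)
    reply = call_and_block("ping")  # no await: the loop cannot switch tasks here
    after = list(ticks)
    await task
    return reply, before, after

reply, before, after = asyncio.run(main())
```

Note that if `blocking_rpc` raised an exception in this sketch, `results.get()` would wait forever, so real code would need a timeout or error propagation.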

Kamyar

@Andrew Svetlov

As you said, asyncio synchronization primitives are the normal choice.

But my problem is how to wrap the event loop coroutines and expose them as synchronous function APIs. I don't want the API consumer to have to deal with asyncio concepts.


Update: I found a good answer: "how can I package a coroutine as normal function in event loop?"
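
A common way to hide a coroutine behind a plain function is to drive it with `run_until_complete` on a private event loop. The sketch below shows that general pattern and is not necessarily what the linked answer does; `rpc` and `rpc_sync` are hypothetical names.

```python
import asyncio

async def rpc(request):
    # Hypothetical coroutine standing in for the asyncio-based implementation.
    await asyncio.sleep(0.01)
    return "reply to " + request

def rpc_sync(request):
    """Synchronous facade: callers never see a coroutine or an event loop."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(rpc(request))
    finally:
        loop.close()

result = rpc_sync("ping")
print(result)
```

This facade is meant for code that is not already running inside an event loop; calling it from within a running loop's thread raises a `RuntimeError`.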

micfan