I need to implement a server that receives a request specifying a set of topics over a control channel (REQ-REP) and responds either with a URL pointing to a publisher socket opened specifically for that client, or with a rejection message (because of insufficient privileges).

I managed to implement a version that can handle only one client at a time (with two endless loops), but I do not know which pattern to use to handle multiple clients concurrently.

It's important for me that the sockets for different clients stay separate.

Here's simplified code:

import zmq

context = zmq.Context()

upstream_port = 10000
upstream_host = 'localhost'
control_channel_port = 11000
upstream_addr = 'tcp://{}:{}'.format(upstream_host, upstream_port)

def should_grant(request):
    '''
    Permission checking - irrelevant to the question
    '''
    return True


def bind_downstream():
    downstream = context.socket(zmq.PUB)
    addr = 'tcp://*'
    port = downstream.bind_to_random_port(addr)
    return downstream, port


def bind_control_channel():
    control_channel_sock = context.socket(zmq.REP)
    control_channel_sock.bind('tcp://*:{}'.format(control_channel_port))
    return control_channel_sock


def connect_upstream(topics):
    raw_data = context.socket(zmq.SUB)
    for t in topics:
        raw_data.setsockopt_string(zmq.SUBSCRIBE, t)  # topics decoded by recv_json() are already text
    raw_data.connect(upstream_addr)
    return raw_data


if __name__ == '__main__':
    print("Binding control channel socket on {}".format('tcp://*:{}'.format(control_channel_port)))
    control_channel = bind_control_channel()
    while True:
        request = control_channel.recv_json()
        print("Received request {}".format(request))
        if should_grant(request):
            (downstream_sock, downstream_port) = bind_downstream()
            print("Downstream socket open on {}".format('tcp://*:{}'.format(downstream_port)))

            print("Connecting to upstream on {}".format(upstream_addr))
            upstream_sock = connect_upstream(request['topics'])

            control_channel.send_json({'status': 'ok', 'port': downstream_port})
            while True:
                parts = upstream_sock.recv_multipart() # Simple forwarding
                downstream_sock.send_multipart(parts)
        else:
            control_channel.send_json({'status': 'rejected'})
Patryk Koryzna

1 Answer

The correct way to do this would be to use threads.

Your main program or thread would handle the control channel loop. As soon as a request is granted, you would create the upstream and downstream sockets but handle the actual forwarding in a thread. I have not tested the code below, as I do not have a client that would work with it, but give it a go and see what happens. You will get the idea nevertheless.

from threading import Thread
....
....
class ClientManager(Thread):
    def __init__(self, ups, downs):
        # Do not pass self here: Thread's first parameter is `group`, which must be None
        super(ClientManager, self).__init__()
        self.upstream_socket = ups
        self.downstream_socket = downs

    def run(self):
        while True:
            _parts = self.upstream_socket.recv_multipart()
            self.downstream_socket.send_multipart(_parts)

if __name__ == '__main__':
    print("Binding control channel socket on {}".format('tcp://*:{}'.format(control_channel_port)))
    control_channel = bind_control_channel()
    while True:
        request = control_channel.recv_json()
        print("Received request {}".format(request))
        if should_grant(request):
            (downstream_sock, downstream_port) = bind_downstream()
            print("Downstream socket open on {}".format('tcp://*:{}'.format(downstream_port)))

            print("Connecting to upstream on {}".format(upstream_addr))
            upstream_sock = connect_upstream(request['topics'])

            control_channel.send_json({'status': 'ok', 'port': downstream_port})
            _nct = ClientManager(upstream_sock, downstream_sock)
            _nct.daemon = True
            _nct.start()
        else:
            control_channel.send_json({'status': 'rejected'})
Hannu
  • This didn't work as-is - failed on assert saying that "group must be None"; had to use https://stackoverflow.com/questions/660961/overriding-python-threading-thread-run but otherwise this seems to work all right! – Patryk Koryzna Oct 24 '17 at 16:09
  • It does the same thing. I prefer subclassing Thread - but it does not make any difference in your case. At least the problem is solved now. – Hannu Oct 25 '17 at 09:50
  • With all due respect, the sketched approach is poor / wrong in two respects. **First:** ZeroMQ explicitly states that its sockets are, by design, not thread-safe, and a **`Socket()`** instance ought never be passed to another thread. **Second:** Python's GIL-interleaved thread execution does not deliver any better performance than monolithic, pure-`[SERIAL]` code execution (which would also avoid the need for "***ill-shared***" `Socket()` instances). You might want to revise the approach so that the design does not violate known best practices. – user3666197 Apr 11 '18 at 19:03
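
The thread-safety concern raised in the last comment can be addressed while keeping one thread per client: share only the `zmq.Context` (which is thread-safe) and create both sockets inside the thread that uses them. The following is a minimal sketch under those assumptions, not a tested drop-in replacement. The names `ForwarderThread`, `start_forwarder` and `port_queue` are hypothetical and do not appear in the original code; the port is handed back to the control loop through a standard `queue.Queue`.

```python
import queue
import threading

import zmq


class ForwarderThread(threading.Thread):
    """Forwards upstream messages to a per-client PUB socket.

    Both sockets are created inside run(), so they are only ever
    touched by this thread. Only the Context is shared.
    """

    def __init__(self, context, upstream_addr, topics):
        super().__init__(daemon=True)
        self.context = context
        self.upstream_addr = upstream_addr
        self.topics = topics
        # Hypothetical hand-off channel: carries the bound port back to the caller
        self.port_queue = queue.Queue(maxsize=1)

    def run(self):
        upstream = self.context.socket(zmq.SUB)
        for topic in self.topics:
            upstream.setsockopt_string(zmq.SUBSCRIBE, topic)
        upstream.connect(self.upstream_addr)

        downstream = self.context.socket(zmq.PUB)
        port = downstream.bind_to_random_port('tcp://*')
        self.port_queue.put(port)

        while True:
            # Simple forwarding, as in the question
            downstream.send_multipart(upstream.recv_multipart())


def start_forwarder(context, upstream_addr, topics):
    """Start a forwarder thread and return the port its PUB socket is bound to."""
    worker = ForwarderThread(context, upstream_addr, topics)
    worker.start()
    return worker.port_queue.get(timeout=5)
```

In the control loop, `downstream_port = start_forwarder(context, upstream_addr, request['topics'])` would then stand in for the separate `bind_downstream()` and `connect_upstream()` calls, and the REP socket stays in the main thread. This sketch has no shutdown path; the daemon threads simply end with the process.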