0

I am writing a Python based application for which I am using socket programming.

I am following below approach.

I created 1 TCP/IP server, 1 controller TCP/IP client thread and 3 TCP/IP client threads.

I want the application to work like this: whenever the controller sends a message, it is broadcast to all 3 TCP/IP clients. Upon receiving a message from the controller, the client threads perform some task and send data to the server.

Now server has to send this data to controller thread.

The communication between the clients and the controller is working fine.

The only problem I am facing is that the server puts all the data received from the clients onto the controller socket together.

I want the server to put one client thread's data on the controller socket, wait for that data to be picked up, and only then place the next thread's data.

So far I am using SOCK_STREAM for sockets.

Library:-

#!/usr/bin/python
import select, socket, sys, Queue, errno
# Base port for the free-port search: getTCPports probes base + 2, base + 4, ...
usable_port_start = 40000
# Default address (loopback) that the sockets in this module bind and connect to.
Internal_ip = "127.0.0.1"

class getTCPports(object):
    """Find a TCP port that can currently be bound on ``address``.

    Candidate ports ``starting_port + 2``, ``starting_port + 4``, ... are
    probed by binding a throw-away socket to each one.  The first candidate
    that binds is stored in ``self.free_port``.

    Args:
        starting_port: base of the search; the base itself is never probed.
        address: local IP address to probe on (stored as ``self.IP_address``).

    Raises:
        socket.error: when no candidate up to port 65535 could be bound.

    NOTE: the probe socket is closed before the constructor returns, so
    another process may still take ``free_port`` before the caller binds it.
    """

    def __init__(self,starting_port=usable_port_start,address=Internal_ip):
        super(getTCPports, self).__init__()
        self.IP_address = address
        max_port = 65535  # highest valid TCP port number
        try_port = starting_port
        while True:
            try_port += 2
            if try_port > max_port:
                # Fail with a clear error instead of letting bind() raise an
                # unrelated OverflowError for an out-of-range port.
                raise socket.error(
                    errno.EADDRNOTAVAIL,
                    "no free TCP port found above %d on %s"
                    % (starting_port, address))
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.bind((self.IP_address, try_port))
            except socket.error as e:
                if e.errno == errno.EADDRINUSE:
                    print("Port %d is already in use" % try_port)
                else:
                    # something else raised the socket.error exception;
                    # report it and move on to the next candidate
                    print(e)
            else:
                self.free_port = try_port
                return
            finally:
                # Always release the probe socket, whatever happened above.
                s.close()

class IPCLib(getTCPports):
    """TCP endpoint that acts either as the relay server or as one client.

    Every instance picks a free local port (via ``getTCPports``) and binds a
    ``SOCK_STREAM`` socket to it.  ``start_TCP_server`` turns the instance
    into a relay: data received from the controller connection is forwarded
    to every other connection, and data from any other connection is
    forwarded to the controller.  ``start_TCP_client`` connects the instance
    to the relay instead.

    Peers are recognised by the source port of their connection, which is
    why clients register their bound port in the class-level attributes.

    NOTE: TCP is a byte stream and does not preserve message boundaries.
    One ``receive_data`` call may return several messages concatenated, or
    only part of one.  Callers that need discrete messages must add their
    own framing (a delimiter or a length prefix).
    """

    # Class-level registry, written through the classmethods below and
    # therefore shared by every IPCLib instance in the process.
    server_port = 0        # port the relay server listens on
    controller_port = 0    # source port of the controller's connection
    client_map = {}        # identity string -> source port of that client

    def __init__(self):
        super(IPCLib,self).__init__()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind((self.IP_address, self.free_port))
        self.inputs = [self.socket]   # sockets watched by select() in server mode
        self.is_alive = True          # loop flag checked by the server and by callers
        self.broadcast_list = []      # accepted peer sockets (server mode only)

    @classmethod
    def modify_server_port(cls,port):
        """Record the port the relay server listens on."""
        cls.server_port = port

    @classmethod
    def modify_client_port(cls,identity,port):
        """Register ``port`` for the peer named ``identity``.

        An identity containing "CONTROLLER" sets the controller port; one
        containing "CLIENT" is stored in ``client_map``.  Any other identity
        is ignored.
        """
        if "CONTROLLER" in identity:
            cls.controller_port = port
        elif "CLIENT" in identity:
            cls.client_map[identity] = port

    @staticmethod
    def _peer_port(peer):
        """Return the remote port of ``peer``, or None if it is unusable."""
        try:
            return peer.getpeername()[1]
        except socket.error:
            return None

    def _drop_peer(self, peer):
        """Close ``peer`` and remove it from every list that tracks it."""
        for tracked in (self.inputs, self.broadcast_list):
            if peer in tracked:
                tracked.remove(peer)
        try:
            peer.close()
        except socket.error:
            pass  # the socket is being discarded; a close error changes nothing

    def _send_or_drop(self, peer, message):
        """Send all of ``message`` to ``peer``; drop the peer if that fails."""
        try:
            # sendall() keeps writing until every byte is sent; send() may
            # stop after a partial write.
            peer.sendall(message)
        except socket.error:
            # if the link is broken, we remove the client
            self._drop_peer(peer)

    def start_TCP_server(self):
        """Run the relay loop until ``is_alive`` becomes false.

        Publishes the listening port through ``modify_server_port``, then
        accepts connections and relays data between the controller and the
        other peers.  ``select`` is called without a timeout, so a change to
        ``is_alive`` is only noticed after the next socket event.  The
        listening socket is closed when the loop ends, normally or not.
        """
        self.socket.setblocking(0)
        self.socket.listen(10)
        self.modify_server_port(self.free_port)
        try:
            while self.is_alive:
                inputready, _, _ = select.select(self.inputs, [], [])
                for s in inputready:
                    if s is self.socket:
                        # the listening socket is readable: a new peer is connecting
                        client, _ = s.accept()
                        self.inputs.append(client)
                        self.broadcast_list.append(client)
                        continue
                    if s not in self.inputs:
                        # dropped earlier in this pass after a failed send
                        continue
                    try:
                        data = s.recv(4096)
                    except socket.error:
                        data = None  # treat a broken connection like a close
                    if not data:
                        # an empty read means the peer closed its end
                        self._drop_peer(s)
                    elif self._peer_port(s) == self.controller_port:
                        self.broadcast(data)
                    else:
                        # the sender is a monitoring client: forward to the controller only
                        self.send_to_controller(data)
        finally:
            self.socket.close()

    def start_TCP_client(self,identity):
        """Register this endpoint's port under ``identity`` and connect to the server."""
        self.modify_client_port(identity,self.free_port)
        self.socket.connect((self.IP_address,self.server_port))

    def stop_TCP_client(self):
        """Shut down and close this endpoint's socket."""
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass  # already disconnected; still release the descriptor below
        finally:
            self.socket.close()

    def broadcast(self,message):
        """Send ``message`` to every accepted peer except the controller."""
        # Iterate over a copy: failed peers are removed from the list while looping.
        for client in list(self.broadcast_list):
            if self._peer_port(client) != self.controller_port:
                self._send_or_drop(client, message)

    def send_to_controller(self,message):
        """Send ``message`` to the accepted peer whose port is the controller's."""
        for client in list(self.broadcast_list):
            if self._peer_port(client) == self.controller_port:
                self._send_or_drop(client, message)

    def send_data(self,data):
        """Send all of ``data`` over this endpoint's own socket."""
        self.socket.sendall(data)

    def receive_data(self):
        """Return up to 4096 bytes read from this endpoint's socket.

        An empty result means the remote side closed the connection.
        """
        message = self.socket.recv(4096)
        return message

Driver Program:-

#!/usr/bin/python
from IPCLib import *
import threading
import os
import time

def run_server():
    """Thread target: build an IPCLib endpoint and run it as the relay server.

    Prints the current thread name and process id, then blocks inside
    ``start_TCP_server``.
    """
    relay = IPCLib()
    thread_name = threading.current_thread().name
    print("Task assigned to thread: {}".format(thread_name))
    process_id = os.getpid()
    print("ID of process running task: {}".format(process_id))
    relay.start_TCP_server()
def run_controller(identity):
    """Run the controller loop: send a tick, then wait for every client's reply.

    Each round sends "hello" through the server and then reads replies until
    every port registered in ``IPCLib.client_map`` has answered.  A reply has
    the form "<port>,<payload>".  The loop ends when ``is_alive`` is cleared
    or the server closes the connection.  The socket is always closed on the
    way out.

    Args:
        identity: name under which this endpoint registers its port; it must
            contain "CONTROLLER" for the server to treat it as the controller.
    """
    i1 = IPCLib()
    print("Task assigned to thread: {}".format(threading.current_thread().name))
    print("ID of process running task: {}".format(os.getpid()))
    i1.start_TCP_client(identity)
    # Single-argument print() calls behave the same on Python 2 and 3.
    print("server port %s" % i1.server_port)
    print("controller port %s" % i1.controller_port)
    print("bsc info %s" % i1.client_map)
    time.sleep(1)
    try:
        while i1.is_alive:
            i1.send_data("hello")
            print("Next Clock")
            sender_list = list(i1.client_map.values())
            # port -> whether that client has replied in this round
            sender_map = dict.fromkeys(sender_list, False)
            i = 1
            while i1.is_alive and not all(sender_map.values()):
                print("Loop Iteration %s" % i)
                data = i1.receive_data()
                if not data:
                    # An empty read means the server closed the connection;
                    # waiting any longer would never complete.
                    print("Server closed the connection")
                    i1.is_alive = False
                    break
                # TCP may deliver several replies in one read.  Handle each
                # newline-terminated record separately; a chunk with no
                # newline is treated as a single record.
                for record in data.splitlines():
                    if not record:
                        continue
                    port, _, payload = record.partition(",")
                    try:
                        sender = int(port)
                    except ValueError:
                        print("Ignoring malformed record %r" % record)
                        continue
                    sender_map[sender] = True
                    print("Data %s received from port %s" % (payload, port))
                print(sender_map)
                i += 1
            print(sender_list)
            time.sleep(1)
    finally:
        i1.stop_TCP_client()
def run_monitors(identity):
    """Thread target for a monitoring client: answer every message received.

    Connects to the server under ``identity``.  For each chunk of data
    received it replies with "<own port>,Hello".  The loop ends when
    ``is_alive`` is cleared or the server closes the connection.  The socket
    is always closed on the way out.

    Args:
        identity: name under which this endpoint registers its port; it must
            contain "CLIENT" to be recorded in ``IPCLib.client_map``.
    """
    i1 = IPCLib()
    print("Task assigned to thread: {}".format(threading.current_thread().name))
    print("ID of process running task: {}".format(os.getpid()))
    i1.start_TCP_client(identity)
    # Single-argument print() calls behave the same on Python 2 and 3.
    print("server port %s" % i1.server_port)
    print("controller port %s" % i1.controller_port)
    print("bsc info %s" % i1.client_map)
    try:
        while i1.is_alive:
            if not i1.receive_data():
                # An empty read means the server closed the connection.
                # Looping again would spin forever on an immediately
                # returning recv().
                break
            # Terminate each reply with a newline so a receiver can separate
            # replies that TCP delivers back-to-back in one read.
            i1.send_data("%d,Hello\n" % i1.free_port)
    finally:
        i1.stop_TCP_client()

# Create the threads: one relay server and four monitoring clients.
t1 = threading.Thread(target=run_server, name='server')
t3 = threading.Thread(target=run_monitors, name='Client1',args=("CLIENT-1",))
t4 = threading.Thread(target=run_monitors, name='Client2',args=("CLIENT-2",))
t5 = threading.Thread(target=run_monitors, name='Client3',args=("CLIENT-3",))
t6 = threading.Thread(target=run_monitors, name='Client4',args=("CLIENT-4",))

workers = (t1, t3, t4, t5, t6)

# Make the threads daemons so they cannot keep the process alive once the
# main thread (which runs the controller) has finished.
for worker in workers:
    worker.daemon = True

# Start the server first: clients connect to the port it publishes.  The
# short pause after each start gives the thread time to bind its port before
# the next one begins.
try:
    for worker in workers:
        worker.start()
        time.sleep(0.1)
    run_controller("CONTROLLER")
except KeyboardInterrupt:
    # Assigning ``is_alive`` on Thread objects would only shadow
    # Thread.is_alive() and stop nothing.  The workers are daemon threads,
    # so leaving the main thread is enough to end the program.
    print("Interrupted, shutting down")

How can I force the server to wait while there is still unread data on the controller's socket?

halfer
  • 19,824
  • 17
  • 99
  • 186
Sourabh Jaiswal
  • 101
  • 2
  • 10
  • There is no simple way to do that in TCP. TCP delivers data as a stream and does not preserve message boundaries. You should frame the data yourself. See, for example, https://stackoverflow.com/a/17668009/1076479 – Gil Hamilton Apr 30 '18 at 16:36
  • Thanks for pointing me to the right guide. Will surely implement this approach. – Sourabh Jaiswal Apr 30 '18 at 17:51

0 Answers0