I am writing a Python based application for which I am using socket programming.
I am following below approach.
Created 1 TCP/IP server, 1 controller TCP/IP client thread and 3 TCP/IP Clients threads.
I want the application to work like this. Whenever controller sends a message it gets broadcast to all 3 TCP/IP clients. Upon received message from controller the client threads perform some task and send data to the server.
Now server has to send this data to controller thread.
The communication part for of the clients and controller is working fine.
Only problem which I am facing is server is putting all the data received from clients to controller socket together.
I want server should put 1 Client thread's data on controller socket, Wait for that data to be picked up. Then place next thread's data.
So far I am using SOCK_STREAM for sockets.
Library:-
#!/usr/bin/python
import select, socket, sys, Queue, errno
usable_port_start = 40000
Internal_ip = "127.0.0.1"
class getTCPports(object):
def __init__(self,starting_port=usable_port_start,address=Internal_ip):
super(getTCPports, self).__init__()
self.IP_address = address
i = 1
delta = 0
while i <= 1:
delta += 2
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
try_port=starting_port + delta
s.bind((self.IP_address,try_port))
self.free_port=try_port
i+=1
except socket.error as e:
if e.errno == errno.EADDRINUSE:
print("Port" , try_port , "is already in use")
else:
# something else raised the socket.error exception
print(e)
s.close()
class IPCLib(getTCPports):
server_port = 0
controller_port = 0
client_map = {}
def __init__(self):
super(IPCLib,self).__init__()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.IP_address, self.free_port))
self.inputs = [self.socket]
self.is_alive = True
self.broadcast_list = []
@classmethod
def modify_server_port(cls,port):
cls.server_port = port
@classmethod
def modify_client_port(cls,identity,port):
if 0 <= identity.find("CONTROLLER"):
cls.controller_port = port
elif 0 <= identity.find("CLIENT"):
cls.client_map[identity] = port
def start_TCP_server(self):
self.socket.setblocking(0)
self.socket.listen(10)
self.modify_server_port(self.free_port)
while self.is_alive:
inputready,outputready,exceptready = select.select(self.inputs,[],[])
for s in inputready: #check each socket that select() said has available data
if s == self.socket: #if select returns our server socket, there is a new
#remote socket trying to connect
client, address = s.accept()
self.inputs.append(client) #add it to the socket list so we can check it now
self.broadcast_list.append(client)
#print 'new client added%s'%str(address)
else:
# select has indicated that these sockets have data available to recv
data = s.recv(4096)
if data:
#print '%s Received From Client(on server)-> %s'%(data,s.getpeername()[1])
#Uncomment below to echo the recv'd data back
#to the sender... loopback!
if s.getpeername()[1]==self.controller_port:
self.broadcast(data)
else: #if sender is monitoring clients, send data to only controller
self.send_to_controller(data)
else:#if recv() returned NULL, that usually means the sender wants
#to close the socket.
s.close()
self.inputs.remove(s)
#if running is ever set to zero, we will call this
server.close()
def start_TCP_client(self,identity):
self.modify_client_port(identity,self.free_port)
self.socket.connect((self.IP_address,self.server_port))
def stop_TCP_client(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def broadcast(self,message):
for client in self.broadcast_list:
if client.getpeername()[1]!=self.controller_port:
try:
client.send(message)
except:
client.close()
# if the link is broken, we remove the client
remove(clients)
def send_to_controller(self,message):
for client in self.broadcast_list:
if client.getpeername()[1]==self.controller_port:
try:
client.send(message)
except:
client.close()
# if the link is broken, we remove the client
remove(clients)
def send_data(self,data):
self.socket.send(data)
def receive_data(self):
message = self.socket.recv(4096)
return message
Driver Program:-
#!/usr/bin/python
from IPCLib import *
import threading
import os
import time
def run_server():
i1=IPCLib()
print("Task assigned to thread: {}".format(threading.current_thread().name))
print("ID of process running task: {}".format(os.getpid()))
i1.start_TCP_server()
def run_controller(identity):
i1=IPCLib()
print("Task assigned to thread: {}".format(threading.current_thread().name))
print("ID of process running task: {}".format(os.getpid()))
i1.start_TCP_client(identity)
print("server port " , i1.server_port)
print("controller port", i1.controller_port)
print("bsc info", i1.client_map)
time.sleep(1)
while i1.is_alive:
i1.send_data("hello")
print"Next Clock"
sender_map={}
sender_list = []
sender_list = i1.client_map.values()
for sender in sender_list:
sender_map[sender] = False
i=1
#print any(sender_map.values())
while any(value == False for value in sender_map.values()):
print("Loop Iteration %s"%i)
data = i1.receive_data()
temp = data.split(",")
port = temp.pop(0)
sender_map[int(port)] = True
data = ",".join(temp)
print("Data %s received from port %s"%(data,port))
print sender_map
i+=1
print sender_list
time.sleep(1)
i1.stop_TCP_client()
def run_monitors(identity):
i1=IPCLib()
print("Task assigned to thread: {}".format(threading.current_thread().name))
print("ID of process running task: {}".format(os.getpid()))
i1.start_TCP_client(identity)
print("server port " , i1.server_port)
print("controller port", i1.controller_port)
print("bsc info", i1.client_map)
while i1.is_alive:
if i1.receive_data():
output = "%d"%i1.free_port
output = output + "," + "Hello"
i1.send_data(output)
i1.stop_TCP_client()
# creating thread
t1 = threading.Thread(target=run_server, name='server')
t3 = threading.Thread(target=run_monitors, name='Client1',args=("CLIENT-1",))
t4 = threading.Thread(target=run_monitors, name='Client2',args=("CLIENT-2",))
t5 = threading.Thread(target=run_monitors, name='Client3',args=("CLIENT-3",))
t6 = threading.Thread(target=run_monitors, name='Clinet4',args=("CLIENT-4",))
#make threads deamons
t1.daemon = True
t3.daemon = True
t4.daemon = True
t5.daemon = True
t6.daemon = True
# starting threads
try:
t1.start()
time.sleep(0.1)
t3.start()
time.sleep(0.1)
t4.start()
time.sleep(0.1)
t5.start()
time.sleep(0.1)
t6.start()
time.sleep(0.1)
run_controller("CONTROLLER")
except KeyboardInterrupt:
t1.is_alive = False
t3.is_alive = False
t4.is_alive = False
t5.is_alive = False
t6.is_alive = False
How can I force server to wait till the time there is already some data on the socket?