0

Im trying to implement multi threading (parallel processing) with python and using mutex threading. I have first process that check the Pressure Value and the modem update(in the code implemented with odom_callback and callback_modem functions), and second process that calls ROS SERVICES ( in the code implemented with ros_serice_server server and imu_client client functions). Here is the implementation code in python

#!/usr/bin/env python3

from __future__ import print_function
import rospy
import numpy as np
from os import system
import time
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Motors_Functions as Motor
import Pressure_Functions as Pressure
from geometry_msgs.msg import Vector3
import Modem_Functions as Modem
import threading 
import time
import serial
import serial.tools.list_ports
from time import sleep
from std_msgs.msg import Float32
from std_msgs.msg import String
from demo_teleop.srv import ImuValue,ImuValueResponse

P0 = 1.01325 #Default Pressure 
mutex = threading.Lock()
Communication_Mode_ = 0
pub_pressure = rospy.Publisher('depth',Vector3,queue_size=1)
pub_modem = rospy.Publisher('modem_data',Float32,queue_size=1)

def handle_ros_services(req):
    mutex.acquire(blocking=True)
    print("Server Read Data:")
    global T0
    data_received = Pressure.Pressure_Get_Final_Values(1,1)
    #print("Server Read Data:")
    T0 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    T=T0
    temperature = T
    current_x_orientation_s = temperature
    print("Returning Service Temperature Data", current_x_orientation_s)
    return ImuValueResponse(current_x_orientation_s, True)
    mutex.release()

def ros_serice_server():
    s = rospy.Service('imu_value', ImuValue, handle_ros_services)
    print("Ready to get_value")

def odom_callback():
    # reentrang processing
    mutex.acquire(blocking=True)
    # work serial port here, e.g. send msg to serial port
    global P0
    data_received = Pressure.Pressure_Get_Final_Values(1,1)
    #P1 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
    P1 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    #P0 = (np.int16((data_received_pressure[6]<<24) | (data_received_pressure[7]<<16) | (data_received_pressure[8]<<8) | (data_received_pressure[9])))/10000
    P0 = (np.int16((data_received[6]<<24) | (data_received[7]<<16) | (data_received[8]<<8) | (data_received[9])))/10000
    P = P0 # Relative Measured Pressure
    feedback =Vector3()
    feedback.x = 0    #Angular Velocity
    feedback.y = 0
    feedback.z = P/9.81 #Depth
    pressure = feedback.z
    print("Pressure : ", pressure)
    pub_pressure.publish(feedback)
    # reentrant processing
    mutex.release()
    
def callback_modem(event):
    # reentrant processing
    mutex.acquire(blocking=True)
    # work serial port here, e.g. check for incoming data
    event = Serial.Serial_Port_Receive_Data(20,0.2)
    if (event == 1): # Received data from acoustic modem
        modem_data= event
        pub_modem.publish(modem_data)
        print("received ")
    else:
        print("not received...... ")
    mutex.release()
 
if __name__ == '__main__':
    # initialize serial port here
    Serial.Serial_Port_Standard()
    rospy.init_node('imu_value')
    ros_serice_server()
    rospy.Timer(rospy.Duration(1), callback_modem) 
    while not rospy.is_shutdown():
        try:
            odom_callback()
        except:
            print('pass')

And the client node

#!/usr/bin/env python3

from __future__ import print_function
import rospy
import sys
import numpy as np
from os import system
import threading
import Microcontroller_Manager_Serial as Serial
import IMU_Functions as IMU
import Motors_Functions as Motor
import Pressure_Functions as Pressure
from geometry_msgs.msg import Vector3
import Modem_Functions as Modem
import time
import serial
import serial.tools.list_ports

from time import sleep
from std_msgs.msg import Float32
from std_msgs.msg import String
from demo_teleop.srv import *

mutex = threading.Lock()
Communication_Mode_ = 0
pub_modem = rospy.Publisher('modem_data',Float32,queue_size=1)

def imu_client():
    mutex.acquire(blocking=True)
    rospy.wait_for_service('imu_value')
    imu_value = rospy.ServiceProxy('imu_value', ImuValue)
    print("Request call send")
    resp1 = imu_value(0.05)
    return resp1.current_x_orientation_s
    mutex.release()

if __name__ == "__main__":
    rospy.init_node('client_node_f')
    while not rospy.is_shutdown():
        try:
            print("entering client")
            value = imu_client()
            print(value)
            time.sleep(1)
        except:
            print('pass')

So the output is following. The output of the first process with the ROS Services Server is

Pressure :  0.10602446483180428
Server Read Data:
Returning Service Temperature Data 1.0401

And then after calling the client I got

entering client
Request call send
1.0401
entering client

The problem is that after calling the ROS SERVICE client node the process stop so doesn't continue with the first process (Pressure value and modem update) . The ROS SERVICES process should be call only on demand and should HALT the first process (Pressure and modem) and then is should resume with the work. So, do I need to implement SEMAPHORES for the ROS SERVICES call ? If yes how it should be in the code. So I do need kind of synchronization , right?Please any help?

Macedon971
  • 303
  • 2
  • 11
  • Based on this application I don't believe there is really any reason that you should be uses locks at all here. – BTables Mar 29 '22 at 20:40

1 Answers1

3

Your problem is:

def handle_ros_services(req):
    mutex.acquire(blocking=True)
    ...
    return ImuValueResponse(current_x_orientation_s, True)
    mutex.release()

Because of the return statement, the release is never executed.

You need at the end:

    value = ImValueResponse(...)
    mutex.release()
    return value

Even better would be to use your mutex as part of a with statement:

with mutex:
    do anything you want, knowing that the lock will be released
    at the end, even if you return or throw an exception.
Frank Yellin
  • 9,127
  • 1
  • 12
  • 22
  • THANKS a lot. Let me check if that works. But, I though should use semaphores like in this case https://stackoverflow.com/questions/31508574/semaphores-on-python . What is the difference? – Macedon971 Mar 29 '22 at 07:02
  • Updated my comment slightly. You want mutual exclusion. Seems like you want a lock. A semaphore is more useful when you have multiple identical resources (disk drives, spaces on a bus, etc.) then when you have a single resource that only one person can use at once. I don't completely understand your code, so I'm not sure if you should be using the same Mutex in all your functions, or if each should get its own mutex. – Frank Yellin Mar 29 '22 at 07:33
  • Sorry. Actually the first way you suggested is what I want. Semaphores is not that what I need. So thanks a lot . It works perfectly now – Macedon971 Mar 29 '22 at 09:07
  • what do you mean by if each should get its own mutex? – Macedon971 Mar 29 '22 at 14:22
  • Right now, you've got a single mutex that's protecting `imu_client`, `callback_modem`, and `odom_callback`. Not only will you never have two different instances of `imu_client` running at the same time, but you will never have `imu_client` and `callback_modem` running at the same time, because they use the same mutex. Was this your intent? Do the three functions need protection from each other? Or could each function use a different mutex? – Frank Yellin Mar 29 '22 at 17:01
  • I need to check exactly if these three functions need protection from each other. What happened if each function use a different mutex and how to implement that if need it? – Macedon971 Mar 29 '22 at 18:57
  • You currently have one mutex named `mutex`. Instead you'd create three mutexes, and each function would use acquire and release a different mutex. – Frank Yellin Mar 29 '22 at 21:36
  • I added this as a comment above, but just something to note. While this answers the question of why things aren't working, no locks are actually needed at all in OP's code. – BTables Mar 29 '22 at 23:17