0

Task is to automate pairing and connection process between Arduino and Raspberry Pi over Bluetooth using D-BUS API based on python script.

The Bluetooth module connected to Arduino is: Grove - Serial Bluetooth v3.0.

I am able to automate pairing process. The pairing script does the following in order:

  1. It looks for an Bluetooth module named Slave by creating adapter object and using StartDiscovery method.(naming is done in the Arduino).
  2. Registers Bluetooth agent.
  3. Creates device object and pairs via Pair method if the device is not already paired.

The part of the code that does above steps given below:

register_agent()
start_discovery() 
time.sleep(10) 
for i in range(len(address_list)):
    new_dbus_device = get_device(address_list[i])
    dev_path = new_dbus_device.object_path
    device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
    pair_status = device_properties.Get("org.bluez.Device1", "Paired")
    if not pair_status:
        new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)

Here is what register_agent() does as requested:

def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)

However when I try to call Connect method as documented in device-api of Bluez, it always fails. I have created a custom serial port profile and tried ConnectProfile method but it failed again.

The communication over Bluetooth works if I use deprecated rfcomm tool, or it works if I use python socket module. However I want to avoid using rfcomm due to being deprecated. Regarding using python socket library, the connection works only in rfcomm channel 1, others channels produce Connection Refused error.

Adding MRE, to clarify the question better:

import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import subprocess

from bluezutils import *
from bluetooth import *
from gi.repository import GObject, GLib
from dbus.mainloop.glib import DBusGMainLoop

DBusGMainLoop(set_as_default=True) 

path = "/test/agent"
AGENT_INTERFACE = 'org.bluez.Agent1'
BUS_NAME = 'org.bluez'
bus = dbus.SystemBus() 

device_obj = None
dev_path = None

def set_trusted(path2):
    props = dbus.Interface(bus.get_object("org.bluez", path2),
                    "org.freedesktop.DBus.Properties")
    props.Set("org.bluez.Device1", "Trusted", True)

class Rejected(dbus.DBusException):
    _dbus_error_name = "org.bluez.Error.Rejected"
    
class Agent(dbus.service.Object):
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Release(self):
        print("Release")
        if self.exit_on_release:
            mainloop.quit()

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        print("AuthorizeService (%s, %s)" % (device, uuid))
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        set_trusted(device)
        return "0000" 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="u")
    def RequestPasskey(self, device): 
        set_trusted(device)
        return dbus.UInt32("0000") 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        set_trusted(device)
        return 
        
    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Cancel(self):
        print("Cancel")

def pair_reply():
    print("Device paired and trusted")
    set_trusted(dev_path) 
    
def pair_error(error):
    err_name = error.get_dbus_name()
    if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
        print("Timed out. Cancelling pairing")
        device_obj.CancelPairing()
    else:
        print("Creating device failed: %s" % (error))
    mainloop.quit() 
    
def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)
    
def start_discovery():
    global pi_adapter
    pi_adapter = find_adapter() 
    scan_filter = dict({"DuplicateData": False}) 
    pi_adapter.SetDiscoveryFilter(scan_filter)
    pi_adapter.StartDiscovery()
    
def stop_discovery():
    pi_adapter.StopDiscovery()
    
def get_device(dev_str):
    # use [Service] and [Object path]:
    device_proxy_object = bus.get_object("org.bluez","/org/bluez/hci0/dev_"+dev_str)
    # use [Interface]:
    device1 = dbus.Interface(device_proxy_object,"org.bluez.Device1")
    return device1

def char_changer(text):
    text = text.replace(':', r'_')
    return text

def slave_finder(device_name):
    
    global sublist_normal
    sublist_normal = []
    sublist= []
    address = []
    edited_address = None
    sub = subprocess.run(["hcitool scan"], text = True, shell = True, capture_output=True)
    print(sub.stdout) #string type
    sublist = sub.stdout.split()
    for i in range(len(sublist)):
        if sublist[i] == device_name:
            print(sublist[i-1])
            sublist_normal.append(sublist[i-1])
            edited_address = char_changer(sublist[i-1])
            address.append(edited_address)
    return address
    
def remove_all_paired():
    for i in range(len(sublist_normal)):
        sub = subprocess.run(["bluetoothctl remove " + sublist_normal[i]], text = True, shell = True, capture_output=True)
        time.sleep(1)
    
    
if __name__ == '__main__':
    
    
    pair_status = None
    address_list = slave_finder('Slave') #Arduino bluetooth module named as "Slave", here we are finding it.
    time.sleep(2)
    remove_all_paired() #rfcomm requires repairing after release
    print(sublist_normal)
    if address_list: 
        register_agent()
        start_discovery() 
        time.sleep(10) 
        for i in range(len(address_list)):
            new_dbus_device = get_device(address_list[i])
            dev_path = new_dbus_device.object_path
            device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
            pair_status = device_properties.Get("org.bluez.Device1", "Paired")
            if not pair_status:
                new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
    
    
    mainloop = GLib.MainLoop()
    mainloop.run()

sudo btmon output:

Bluetooth monitor ver 5.50
= Note: Linux version 5.4.83-v7l+ (armv7l)                             0.832473
= Note: Bluetooth subsystem version 2.22                               0.832478
= New Index: DC:A6:32:58:FE:13 (Primary,UART,hci0)              [hci0] 0.832481
= Open Index: DC:A6:32:58:FE:13                                 [hci0] 0.832484
= Index Info: DC:A6:32:5.. (Cypress Semiconductor Corporation)  [hci0] 0.832487
@ MGMT Open: bluetoothd (privileged) version 1.14             {0x0001} 0.832490
@ MGMT Open: btmon (privileged) version 1.14                  {0x0002} 0.832540

So the question is why Connect and ConnectProfile methods are always failing, what do I need to do establish bluetooth communication based on D-BUS API between Arduino and Raspberry Pi?

Mr. Panda
  • 485
  • 3
  • 14
  • You have not detailed what `register_agent()` is doing. I am assuming your are registering your own agent; something along the lines of [simple-agent](https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test/simple-agent)? – ukBaz Jan 27 '21 at 14:38
  • Edited the question. Yes exactly, it does what you said. – Mr. Panda Jan 27 '21 at 14:57
  • Thanks for updating the question. And what is in the `Agent` code? What is in `pair_reply` and `pair_error`? What outputs are you getting in the terminal and from `sudo btmon`? Can I draw your attention to [How to create a Minimal, Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example) – ukBaz Jan 27 '21 at 16:09
  • I have added them as well. getdevice() is a method from [bluezutils](https://github.com/bluez/bluez/blob/master/test/bluezutils.py) module. So when I call **new_dbus_device.Connect()** it generates an error. – Mr. Panda Jan 27 '21 at 19:13

1 Answers1

0

I think you had answered your own question. The reason that a Connect doesn't work is because you have no Serial Port Profile (SPP) running on the Raspberry Pi. If you have btmon running you will see that the client disconnects because there is no matching profile with what the Arduino is offering.

The port number used in the Python Socket connection needs to match the port number on the Arduino Bluetooth module. This is probably why only 1 is working.

For reference I tested this with a Raspberry Pi and HC-06 I had laying around. I removed all the scanning code to try to get to the minimum runnable code. Here is the code I used:

import socket
from time import sleep
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib

BUS_NAME = 'org.bluez'
AGENT_IFACE = 'org.bluez.Agent1'
AGNT_MNGR_IFACE = 'org.bluez.AgentManager1'
ADAPTER_IFACE = 'org.bluez.Adapter1'
AGENT_PATH = '/ukBaz/bluezero/agent'
AGNT_MNGR_PATH = '/org/bluez'
DEVICE_IFACE = 'org.bluez.Device1'
CAPABILITY = 'KeyboardDisplay'
my_adapter_address = '11:22:33:44:55:66'
my_device_path = '/org/bluez/hci0/dev_00_00_12_34_56_78'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()


class Agent(dbus.service.Object):

    @dbus.service.method(AGENT_IFACE,
                         in_signature='o', out_signature='s')
    def RequestPinCode(self, device):
        print(f'RequestPinCode {device}')
        return '0000'


class Device:
    def __init__(self, device_path):
        dev_obj = bus.get_object(BUS_NAME, device_path)
        self.methods = dbus.Interface(dev_obj, DEVICE_IFACE)
        self.props = dbus.Interface(dev_obj, dbus.PROPERTIES_IFACE)
        self._port = 1
        self._client_sock = socket.socket(socket.AF_BLUETOOTH,
                                          socket.SOCK_STREAM,
                                          socket.BTPROTO_RFCOMM)

    def connect(self):
        # self.methods.Connect()
        self._client_sock.bind((my_adapter_address, self._port))
        self._client_sock.connect((self.address, self._port))

    def disconnect(self):
        self.methods.Disconnect()

    def pair(self):
        self.methods.Pair(reply_handler=self._pair_reply,
                          error_handler=self._pair_error)

    def _pair_reply(self):
        print(f'Device trusted={self.trusted}, connected={self.connected}')
        self.trusted = True
        print(f'Device trusted={self.trusted}, connected={self.connected}')
        while self.connected:
            sleep(0.5)
        self.connect()
        print('Successfully paired and connected')

    def _pair_error(self, error):
        err_name = error.get_dbus_name()
        print(f'Creating device failed: {err_name}')

    @property
    def trusted(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Trusted'))

    @trusted.setter
    def trusted(self, value):
        self.props.Set(DEVICE_IFACE, 'Trusted', bool(value))

    @property
    def paired(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Paired'))

    @property
    def connected(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Connected'))

    @property
    def address(self):
        return str(self.props.Get(DEVICE_IFACE, 'Address'))


if __name__ == '__main__':
    agent = Agent(bus, AGENT_PATH)

    agnt_mngr = dbus.Interface(bus.get_object(BUS_NAME, AGNT_MNGR_PATH),
                               AGNT_MNGR_IFACE)
    agnt_mngr.RegisterAgent(AGENT_PATH, CAPABILITY)

    device = Device(my_device_path)
    if device.paired:
        device.connect()
    else:
        device.pair()


    mainloop = GLib.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        agnt_mngr.UnregisterAgent(AGENT_PATH)
        mainloop.quit()

ukBaz
  • 6,985
  • 2
  • 8
  • 31
  • In the test code, you are using python socket modules' **connect** and **bind** methods, which I have mentioned as working fine. Sorry I did not mention but, I have edited the service file as mentioned [here](https://www.raspberrypi.org/forums/viewtopic.php?p=947185#p947185), so there is SPP already, which I can also confirm with the output of `sdptool browse local`. What I am curious is why the **Connect** method of [device-api](https://github.com/bluez/bluez/blob/master/doc/device-api.txt) is failing. – Mr. Panda Jan 28 '21 at 10:18
  • Also, regarding rfcomm port, here is the [spreadsheet](https://files.seeedstudio.com/wiki/Bluetooth_Shield_V2/res/Bluetooth_en.pdf) of the module I have. I don't see any AT commands that let me configure the **rfcomm** port. – Mr. Panda Jan 28 '21 at 10:23
  • The `-C` and `sdptool` are to bypass the D-Bus daemon by using the old deprecated API. I have had the Profile working for a Server but I have never got it working for a client. Doesn't seem to be worth the effort when socket library seems to do everything that is needed. Sounds like my answer didn't add anything you didn't know already – ukBaz Jan 28 '21 at 11:14
  • It definitely added, now I know it does not worth the effort, so I will go for **socket** as well. Did you check the spreadsheet? I am still trying to find a way to configure the **rfcomm** port from Arduino by sending the required AT command. – Mr. Panda Jan 28 '21 at 11:21
  • I had a quick scan of the datasheet and there was nothing obvious. There was a link to another list of Bluetooth commands but my firewall wouldn't let me download it. – ukBaz Jan 28 '21 at 11:37
  • That link for BLE AT commands is broken. Thank you, I will contact the module's providers regarding this. – Mr. Panda Jan 28 '21 at 11:43