summaryrefslogtreecommitdiff
path: root/bin
diff options
authorYung Shen <yung.shen@canonical.com>2016-04-21 15:32:21 +0000
committerSylvain Pineau <>2016-04-21 15:32:21 +0000
commite3ab30321c5559abb9ccd9f62e41ada71637001c (patch)
tree14dff51b08dc6744e79051f74ec7cb3fdafc2e1b /bin
parentefa600740585db92f856a4a644c055c611dc7736 (diff)
parent360891070e424f9615496884d32df08838357f3d (diff)
"automatic merge of lp:~kaxing/checkbox/bt4-bluez5/ by tarmac [r=sylvain-pineau][bug=][author=kaxing]"
Diffstat (limited to 'bin')
-rwxr-xr-xbin/bt_connect135
-rw-r--r--bin/bt_helper.py300
2 files changed, 435 insertions, 0 deletions
diff --git a/bin/bt_connect b/bin/bt_connect
new file mode 100755
index 0000000..0ef5169
--- /dev/null
+++ b/bin/bt_connect
@@ -0,0 +1,135 @@
+#!/usr/bin/env python3
+#
+# This file is part of Checkbox.
+#
+# Copyright 2016 Canonical Ltd.
+#
+# Authors:
+# Po-Hsu Lin <po-hsu.lin@canonical.com>
+# Yung Shen <yung.shen@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import time
+
+import bt_helper
+
+from argparse import ArgumentParser
+
+
+def unpair_all(devices, manager):
+ """ Unpairing paired devices and scanning again for rerun jobs."""
+ for dev in devices:
+ try:
+ print("INFO: Unpairing", dev)
+ dev.unpair()
+ except bt_helper.BtException as exc:
+ print("Warning: Unpairing failed", exc)
+ else:
+ # print(flush=True) to bypass plainbox output buffer,
+ # see LP: #1569808 for more details.
+ print("Please reset the device to pairing mode in 13 seconds",
+ flush=True)
+ time.sleep(13)
+ print("INFO: Re-scaning for devices in pairing mode", flush=True)
+ manager.scan()
+
+
+def main():
+ """Add argument parser here and do most of the job."""
+ parser = ArgumentParser(description=("Bluetooth auto paring and connect. "
+ "Please select one option."))
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument("--mac", type=str,
+ help="Pair with a given MAC, not using scan result,")
+ group.add_argument("--mouse", action="store_const",
+ const="input-mouse", dest="target",
+ help="List and pair with mouse devices")
+ group.add_argument("--keyboard", action="store_const",
+ const="input-keyboard", dest="target",
+ help="List and pair with keyboard devices")
+ args = parser.parse_args()
+
+ manager = bt_helper.BtManager()
+ # Power on bluetooth adapter and scanning devices in advance.
+ manager.ensure_adapters_powered()
+ manager.scan()
+
+ if args.mac:
+ # TODO check MAC format
+ print("INFO: Trying to pair with {}".format(args.mac))
+ device = list(manager.get_bt_devices(filters={'Address': args.mac}))
+ paired_device = list(manager.get_bt_devices(
+ filters={'Address': args.mac, 'Paired': True}))
+ if not device:
+ print("ERROR: No pairable device found, terminating")
+ return 1
+
+ unpair_all(paired_device, manager)
+
+ for dev in device:
+ try:
+ dev.pair()
+ except bt_helper.BtException as exc:
+ print("ERROR: Unable to pair: ", exc)
+ return 1
+ else:
+ print("INFO: Device paired")
+ return 0
+ else:
+ print("INFO: Listing targeting devices")
+ # Listing device based on RSSI
+ paired_targets = list(manager.get_bt_devices(category=bt_helper.BT_ANY,
+ filters={'Paired': True, 'Icon': args.target}))
+ if not paired_targets:
+ print("INFO: No paired targeting devices found")
+ manager.scan()
+ else:
+ unpair_all(paired_targets, manager)
+
+ target_devices = sorted(manager.get_bt_devices(
+ category=bt_helper.BT_ANY, filters={
+ 'Paired': False, 'Icon': args.target}),
+ key=lambda x: int(x.rssi or -255), reverse=True)
+ if not target_devices:
+ print("ERROR: No target devices found, terminating")
+ return 1
+ print("INFO: Detected devices (sorted by RSSI; highest first).")
+ # let's assing numbers to devices
+ devices = dict(enumerate(target_devices, 1))
+ for num, dev in devices.items():
+ print("{}. {} (RSSI: {})".format(num, dev, dev.rssi))
+ chosen = False
+ while not chosen:
+ print("Which one would you like to connect to? (0 to exit)")
+ num = input()
+ # TODO: enter as default to 1st device
+ if num == '0':
+ return 1
+ chosen = num.isnumeric() and int(num) in devices.keys()
+ print("INFO: {} chosen.".format(devices[int(num)]))
+ print("INFO: Pairing selected device..")
+ try:
+ devices[int(num)].pair()
+ except bt_helper.BtException as exc:
+ print("ERROR: something wrong: ", exc)
+ return 1
+ else:
+ print("Paired successfully.")
+ return 0
+ # capture all other silence failures
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/bin/bt_helper.py b/bin/bt_helper.py
new file mode 100644
index 0000000..91f879a
--- /dev/null
+++ b/bin/bt_helper.py
@@ -0,0 +1,300 @@
+# Copyright 2016 Canonical Ltd.
+# Written by:
+# Maciej Kisielewski <maciej.kisielewski@canonical.com>
+#
+# This is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This file is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this file. If not, see <http://www.gnu.org/licenses/>.
+"""
+This module provides a set of abstractions to ease the process of automating
+typical Bluetooth task like scanning for devices and pairing with them.
+
+It talks with BlueZ stack using dbus.
+"""
+import logging
+
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+from gi.repository import GObject
+
+logger = logging.getLogger(__file__)
+logger.addHandler(logging.StreamHandler())
+
+IFACE = 'org.bluez.Adapter1'
+ADAPTER_IFACE = 'org.bluez.Adapter1'
+DEVICE_IFACE = 'org.bluez.Device1'
+AGENT_IFACE = 'org.bluez.Agent1'
+
+dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+# To get additional Bluetoot CoDs, check
+# https://www.bluetooth.com/specifications/assigned-numbers/baseband
+BT_ANY = 0
+BT_KEYBOARD = int('0x2540', 16)
+
+
+class BtException(Exception):
+ pass
+
+
+class BtManager:
+ """ Main point of contact with dbus factoring bt objects. """
+ def __init__(self, verbose=False):
+ if verbose:
+ logger.setLevel(logging.DEBUG)
+ self._bus = dbus.SystemBus()
+ self._bt_root = self._bus.get_object('org.bluez', '/')
+ self._manager = dbus.Interface(
+ self._bt_root, 'org.freedesktop.DBus.ObjectManager')
+ self._main_loop = GObject.MainLoop()
+ self._register_agent()
+
+ def _register_agent(self):
+ path = "/bt_helper/agent"
+ BtAgent(self._bus, path)
+ obj = self._bus.get_object('org.bluez', "/org/bluez")
+ agent_manager = dbus.Interface(obj, "org.bluez.AgentManager1")
+ agent_manager.RegisterAgent(path, 'NoInputNoOutput')
+ logger.info("Agent registered")
+
+ def _get_objects_by_iface(self, iface_name):
+ for path, ifaces in self._manager.GetManagedObjects().items():
+ if ifaces.get(iface_name):
+ yield self._bus.get_object('org.bluez', path)
+
+ def get_bt_adapters(self):
+ """Yield BtAdapter objects for each BT adapter found."""
+ for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
+ yield BtAdapter(dbus.Interface(adapter, ADAPTER_IFACE), self)
+
+ def get_bt_devices(self, category=BT_ANY, filters={}):
+ """Yields BtDevice objects currently known to the system.
+
+ filters - specifies the characteristics of that a BT device must have
+ to be yielded. The keys of filters dictionary represent names of
+ parameters (as specified by the bluetooth DBus Api and represented by
+ DBus proxy object), and its values must match proxy values.
+ I.e. {'Paired': False}. For a full list of Parameters see:
+ http://git.kernel.org/cgit/bluetooth/bluez.git/tree/doc/device-api.txt
+
+ Note that this function returns objects corresponding to BT devices
+ that were seen last time scanning was done."""
+ for device in self._get_objects_by_iface(DEVICE_IFACE):
+ obj = self.get_object_by_path(device.object_path)[DEVICE_IFACE]
+ try:
+ if category != BT_ANY:
+ if obj['Class'] != category:
+ continue
+ rejected = False
+ for filter in filters:
+ if obj[filter] != filters[filter]:
+ rejected = True
+ break
+ if rejected:
+ continue
+ yield BtDevice(dbus.Interface(device, DEVICE_IFACE), self)
+ except KeyError as exc:
+ logger.info('Property %s not found on device %s',
+ exc, device.object_path)
+ continue
+
+ def get_prop_iface(self, obj):
+ return dbus.Interface(self._bus.get_object(
+ 'org.bluez', obj.object_path), 'org.freedesktop.DBus.Properties')
+
+ def get_object_by_path(self, path):
+ return self._manager.GetManagedObjects()[path]
+
+ def get_proxy_by_path(self, path):
+ return self._bus.get_object('org.bluez', path)
+
+ def wait(self):
+ self._main_loop.run()
+
+ def quit_loop(self):
+ self._main_loop.quit()
+
+ def ensure_adapters_powered(self):
+ for adapter in self.get_bt_adapters():
+ adapter.ensure_powered()
+
+ def scan(self, timeout=10):
+ """Scan for BT devices visible to all adapters.'"""
+ self._bus.add_signal_receiver(
+ interfaces_added,
+ dbus_interface="org.freedesktop.DBus.ObjectManager",
+ signal_name="InterfacesAdded")
+ self._bus.add_signal_receiver(
+ properties_changed,
+ dbus_interface="org.freedesktop.DBus.Properties",
+ signal_name="PropertiesChanged",
+ arg0="org.bluez.Device1",
+ path_keyword="path")
+ for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
+ try:
+ dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery()
+ except dbus.exceptions.DBusException:
+ pass
+ dbus.Interface(adapter, ADAPTER_IFACE).StartDiscovery()
+ GObject.timeout_add_seconds(timeout, self._scan_timeout)
+ self._main_loop.run()
+
+ def get_devices(self, timeout=10, rescan=True):
+ """Scan for and list all devices visible to all adapters."""
+ if rescan:
+ self.scan(timeout)
+ return list(self.get_bt_devices())
+
+ def _scan_timeout(self):
+ for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
+ dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery()
+ self._main_loop.quit()
+
+
+class BtAdapter:
+ def __init__(self, dbus_iface, bt_mgr):
+ self._if = dbus_iface
+ self._bt_mgr = bt_mgr
+ self._prop_if = bt_mgr.get_prop_iface(dbus_iface)
+
+ def set_bool_prop(self, prop_name, value):
+ self._prop_if.Set(IFACE, prop_name, dbus.Boolean(value))
+
+ def ensure_powered(self):
+ """Turn the adapter on, and do nothing if already on."""
+ powered = self._prop_if.Get(IFACE, 'Powered')
+ logger.info('Powering on {}'.format(
+ self._if.object_path.split('/')[-1]))
+ if powered:
+ logger.info('Device already powered')
+ return
+ try:
+ self.set_bool_prop('Powered', True)
+ logger.info('Powered on')
+ except Exception as exc:
+ logging.error('Failed to power on - {}'.format(
+ exc.get_dbus_message()))
+
+
+class BtDevice:
+ def __init__(self, dbus_iface, bt_mgr):
+ self._if = dbus_iface
+ self._obj = bt_mgr.get_object_by_path(
+ self._if.object_path)[DEVICE_IFACE]
+ self._bt_mgr = bt_mgr
+ self._prop_if = bt_mgr.get_prop_iface(dbus_iface)
+ self._pair_outcome = None
+
+ def __str__(self):
+ return "{} ({})".format(self.name, self.address)
+
+ def __repr__(self):
+ return "<BtDevice name:{}, address:{}>".format(self.name, self.address)
+
+ def pair(self):
+ """Pair the device.
+
+ This function will try pairing with the device and block until device
+ is paired, error occured or default timeout elapsed (whichever comes
+ first).
+ """
+ self._prop_if.Set(DEVICE_IFACE, 'Trusted', True)
+ self._if.Pair(
+ reply_handler=self._pair_ok, error_handler=self._pair_error)
+ self._bt_mgr.wait()
+ if self._pair_outcome:
+ raise BtException(self._pair_outcome)
+ try:
+ self._if.Connect()
+ except dbus.exceptions.DBusException as exc:
+ logging.error('Failed to connect - {}'.format(
+ exc.get_dbus_message()))
+
+ def unpair(self):
+ self._if.Disconnect()
+ adapter = self._bt_mgr.get_proxy_by_path(self._obj['Adapter'])
+ dbus.Interface(adapter, ADAPTER_IFACE).RemoveDevice(self._if)
+
+ @property
+ def name(self):
+ return self._obj.get('Name', '<Unnamed>')
+
+ @property
+ def address(self):
+ return self._obj['Address']
+
+ @property
+ def rssi(self):
+ return self._obj.get('RSSI', None)
+
+ def _pair_ok(self):
+ logger.info('%s successfully paired', self.name)
+ self._pair_outcome = None
+ self._bt_mgr.quit_loop()
+
+ def _pair_error(self, error):
+ logger.warning('Pairing of %s device failed. %s', self.name, error)
+ self._pair_outcome = error
+ self._bt_mgr.quit_loop()
+
+
+class Rejected(dbus.DBusException):
+ _dbus_error_name = "org.bluez.Error.Rejected"
+
+
+class BtAgent(dbus.service.Object):
+ """Agent authenticating everything that is possible."""
+ @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="")
+ def AuthorizeService(self, device, uuid):
+ logger.info("AuthorizeService (%s, %s)", device, uuid)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="u")
+ def RequestPasskey(self, device):
+ logger.info("RequestPasskey (%s)", device)
+ passkey = input("Enter passkey: ")
+ return dbus.UInt32(passkey)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="s")
+ def RequestPinCode(self, device):
+ logger.info("RequestPinCode (%s)", device)
+ return input("Enter PIN Code: ")
+
+ @dbus.service.method(AGENT_IFACE, in_signature="ouq", out_signature="")
+ def DisplayPasskey(self, device, passkey, entered):
+ print("DisplayPasskey (%s, %06u entered %u)" %
+ (device, passkey, entered), flush=True)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="")
+ def DisplayPinCode(self, device, pincode):
+ logger.info("DisplayPinCode (%s, %s)", device, pincode)
+ print('Type following pin on your device: {}'.format(pincode),
+ flush=True)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="ou", out_signature="")
+ def RequestConfirmation(self, device, passkey):
+ logger.info("RequestConfirmation (%s, %06d)", device, passkey)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="")
+ def RequestAuthorization(self, device):
+ logger.info("RequestAuthorization (%s)", device)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="", out_signature="")
+ def Cancel(self):
+ logger.info("Cancelled")
+
+
+def properties_changed(interface, changed, invalidated, path):
+ logger.info('Property changed for device @ %s. Change: %s', path, changed)
+
+
+def interfaces_added(path, interfaces):
+ logger.info('Added new bt interfaces: %s @ %s', interfaces, path)