diff options
| author | Yung Shen <yung.shen@canonical.com> | 2016-04-01 18:42:16 +0800 |
|---|---|---|
| committer | Yung Shen <yung.shen@canonical.com> | 2016-04-01 18:42:16 +0800 |
| commit | 534f008a894110e909f390ed368c166571c35c81 (patch) | |
| tree | 8e11c661135d17502dbfe135ba084b92d86888f9 | |
| parent | 15f19d4d7c5f20f122c739298c9b81f53b35508d (diff) | |
bt_connect: rewrite based on bt_helper
| -rwxr-xr-x | bin/bt_connect | 265 |
1 files changed, 95 insertions, 170 deletions
diff --git a/bin/bt_connect b/bin/bt_connect index d70462b..658dab6 100755 --- a/bin/bt_connect +++ b/bin/bt_connect @@ -21,201 +21,126 @@ # You should have received a copy of the GNU General Public License # along with Checkbox. If not, see <http://www.gnu.org/licenses/>. +# # possibility to lockup, if it times out -# TODO: 5. better exception handling (wrong PIN, authentication rejected) +# TODO: +# 4. adapting bluez5 stack based on: https://github.com/kissiel/bt_helper +# 5. better exception handling (wrong PIN, authentication rejected) # 6. different PIN selection # 7. MAC address validator # 8. Use logging for information # 9. PEP8 -try: - from gi.repository import GObject -except ImportError: - import gobject as GObject +import sys import time -import dbus -import dbus.service -from dbus.mainloop.glib import DBusGMainLoop -from argparse import ArgumentParser - -PIN = '0000' -CLASS_CODE = {'mouse': [0x580, 0x2580], 'keyboard': [0x540, 0x2540]} -mainloop = GObject.MainLoop() - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class Agent(dbus.service.Object): - exit_on_release = True - - def set_exit_on_release(self, exit_on_release): - self.exit_on_release = exit_on_release - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Release(self): - if self.exit_on_release: - mainloop.quit() - - @dbus.service.method("org.bluez.Agent", - in_signature="os", out_signature="") - def Authorize(self, device, uuid): - print("Authorize {}, {}".format(device, uuid)) - authorize = raw_input("Authorize connection (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Connection rejected by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="s") - def RequestPinCode(self, device): - print("Sending PIN: {} to your device.".format(PIN)) - if TARGET is not 'mouse': - print("For keyboard, please type this PIN on it and hit Enter.") - # need to figure out how to send 0000 / 1111 / 1234 - return PIN - # return raw_input("Enter PIN Code: ") - - @dbus.service.method("org.bluez.Agent", - in_signature="o", out_signature="u") - def RequestPasskey(self, device): - print("RequestPasskey {}".format(device)) - passkey = raw_input("Enter passkey: ") - return dbus.UInt32(passkey) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def DisplayPasskey(self, device, passkey): - print("Please enter the passkey: {} on your device".format(passkey)) - - @dbus.service.method("org.bluez.Agent", - in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - print("RequestConfirmation {}, {})".format(device, passkey)) - confirm = raw_input("Confirm passkey (yes/no): ") - if (confirm == "yes"): - return - raise Rejected("Passkey doesn't match") - - @dbus.service.method("org.bluez.Agent", - in_signature="s", out_signature="") - def ConfirmModeChange(self, mode): - print("ConfirmModeChange {}".format(mode)) - authorize = raw_input("Authorize mode change (yes/no): ") - if (authorize == "yes"): - return - raise Rejected("Mode change by user") - - @dbus.service.method("org.bluez.Agent", - in_signature="", out_signature="") - def Cancel(self): - print("Cancel") - - -def create_device_reply(device): - print("New device {}".format(device)) - mainloop.quit() +import pdb +import bt_helper - -def create_device_error(error): - if error._dbus_error_name == 'org.bluez.Error.AuthenticationFailed': - print("ERROR: Passcode Authentication Failed, wrong passcode?") - else: - print("Creating device failed: {}".format(error)) - mainloop.quit() - - -def property_changed(name, value): - """handler function for "PropertyChanged" signal.""" - if (name == "Discovering" and not value): - mainloop.quit() - - -def device_found(address, properties): - global DEVICE_MAC - """handler function for "DeviceFound" signal.""" - if properties['Class'] in CLASS_CODE[TARGET]: - print("Device found: {}".format(properties['Name'])) - print("MAC address: {}".format(properties['Address'])) - DEVICE_MAC = properties['Address'] +from argparse import ArgumentParser def main(): """Add argument parser here and do most of the job.""" - global TARGET - global DEVICE_MAC - DEVICE_MAC = None parser = ArgumentParser(description="Bluetooth auto paring and connect. Please select one option.") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--mac", type=str, help="Pair with a given MAC, not using scan result,") group.add_argument("--mouse", action="store_const", - const="mouse", dest='target', + const="input-mouse", dest='target', help="Pair with the last mouse found from scan result.") group.add_argument("--keyboard", action="store_const", - const="keyboard", dest='target', + const="input-keyboard", dest='target', help="Pair with the last keyboard found from scan result.") - parser.add_argument("-i", "--interface", type=str, - help="Device interface, e.g. hci0") - parser.add_argument("-c", "--capability", type=str, default="DisplayYesNo") + # TODO: TBD, + # not many use case will have multiple bt interface and besides dbus should handle different naming. + #parser.add_argument("-i", "--interface", type=str, + # help="Device interface, e.g. hci0") + # + # TODO: TBD, not sure about this part + #parser.add_argument("-c", "--capability", type=str, default="DisplayYesNo") + # args = parser.parse_args() + + manager = bt_helper.BtManager() - DBusGMainLoop(set_as_default=True) - bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/"), - "org.bluez.Manager") - if args.interface: - path = manager.FindAdapter(args.interface) - else: - path = manager.DefaultAdapter() - - adapter = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Adapter") + # Power on device and scaning in advance. + manager.ensure_adapters_powered() + manager.scan() - path = "/test/agent" - agent = Agent(bus, path) - if not args.mac: - # Activate scan and auto pairing - TARGET = args.target - print("Trying to scan and pair with a {}.".format(TARGET)) - bus.add_signal_receiver(device_found, - signal_name="DeviceFound", - dbus_interface="org.bluez.Adapter") - bus.add_signal_receiver(property_changed, - signal_name="PropertyChanged", - dbus_interface="org.bluez.Adapter") - adapter.StartDiscovery() - mainloop.run() - adapter.StopDiscovery() - if not DEVICE_MAC: + if args.mac: + DEVICE_MAC = args.mac + print("Trying to pair with {}.".format(DEVICE_MAC)) + DEVICE = list(manager.get_bt_devices(filters={'Address': DEVICE_MAC})) + + if not DEVICE: print("ERROR: No pairable device found, terminating") return 1 + + for paireddevice in list(manager.get_bt_devices(filters={'Address': DEVICE_MAC, 'Paired': True})): + paireddevice.unpair() + print("Already paired, please reset the device to pairing mode..") + # Give tester some more awareness + for i in range(0,11,1): + sys.stdout.write("\r"+str(i)) + sys.stdout.flush() + time.sleep(1) + print("\n") + + # Todo: + # if device is not listed should return status code: 1 + + for device in list(manager.get_bt_devices(filters={'Address': DEVICE_MAC})): + device.pair() + print("Device paired") + return 0 + else: - DEVICE_MAC = args.mac - - # Try to remove the Device entry if exist - try: - device = adapter.FindDevice(DEVICE_MAC) - print("Device already exist, remove it first") - adapter.RemoveDevice(device) - except dbus.exceptions.DBusException: - print("Creating device entry") - agent.set_exit_on_release(False) - print("Paring device") - adapter.CreatePairedDevice(DEVICE_MAC, path, args.capability, - reply_handler=create_device_reply, - error_handler=create_device_error) - - mainloop.run() - - time.sleep(3) - print("Connecting device...") - device = adapter.FindDevice(DEVICE_MAC) - hid = dbus.Interface(bus.get_object("org.bluez", device), - "org.bluez.Input") - hid.Connect() + TARGET = args.target + print("Listing existing device") + #Listing device based on RSSI + devices = sorted(manager.get_bt_devices( + category=bt_helper.BT_ANY, filters={'Paired': False, 'Icon': TARGET}), + key=lambda x: int(x.rssi or -255), reverse=True) + + if not devices: + print("Error: No devices detected") + return 1 + + print('Detected devices (sorted by RSSI; highest first).') + # let's assing numbers to devices + devices = dict(enumerate(devices, 1)) + + # TODO: TBD, + # may considering unpair every paired TARGET devices first? + for paireddevice in list(manager.get_bt_devices(filters={'Icon': TARGET, 'Paired': True})): + print("Unpairing known device: {}".format(paireddevice)) + paireddevice.unpair() + + for num, dev in devices.items(): + print('{}. {} (RSSI: {})'.format(num, dev, dev.rssi)) + chosen = False + while not chosen: + num = input('Which one would you like to connect to? (0 to exit) ') + #TODO: enter to default + if num == '0': + return 1 + chosen = num.isnumeric() and int(num) in devices.keys() + print('{} chosen.'.format(devices[int(num)])) + print("Pairing selected device..") + + pdb.set_trace() + + # TODO: + # capture .pair() is failing + # however it's only returning None even it's failing + # https://github.com/kissiel/bt_helper/issues/11 + try: + devices[int(num)].pair() + except: + print("ERROR: something wrong..") + return 1 + else: + return 0 if __name__ == "__main__": - main() + sys.exit(main()) |
