# Source code for bleak.backends.bluezdbus.manager

"""
BlueZ D-Bus manager module
--------------------------

This module contains code for the global BlueZ D-Bus object manager that is
used internally by Bleak.
"""

import sys
from typing import TYPE_CHECKING

# This guard is never executed at runtime (``TYPE_CHECKING`` is False then);
# it is only seen by static type checkers. Presumably it lets them treat the
# rest of the module as unreachable when analysing for a non-Linux platform.
if TYPE_CHECKING:
    if sys.platform != "linux":
        assert False, "This backend is only available on Linux"

import asyncio
import contextlib
import logging
import os
from collections import defaultdict
from collections.abc import Callable, Coroutine, MutableMapping
from typing import Any, NamedTuple, Optional, cast
from weakref import WeakKeyDictionary

from dbus_fast import BusType, Message, MessageType, Variant, unpack_variants
from dbus_fast.aio.message_bus import MessageBus

from bleak.args.bluez import OrPatternLike
from bleak.backends.bluezdbus import defs
from bleak.backends.bluezdbus.advertisement_monitor import AdvertisementMonitor
from bleak.backends.bluezdbus.defs import (
    Device1,
    GattCharacteristic1,
    GattDescriptor1,
    GattService1,
)
from bleak.backends.bluezdbus.signals import MatchRules, add_match
from bleak.backends.bluezdbus.utils import (
    assert_reply,
    device_path_from_characteristic_path,
    extract_service_handle_from_path,
    get_dbus_authenticator,
)
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.descriptor import BleakGATTDescriptor
from bleak.backends.service import BleakGATTService, BleakGATTServiceCollection
from bleak.exc import BleakDBusError, BleakError

logger = logging.getLogger(__name__)

# Invoked by BlueZManager._run_advertisement_callbacks() as
# ``callback(device_path, device.copy())``, i.e. each callback receives its
# own shallow copy of the device properties.
AdvertisementCallback = Callable[[str, Device1], None]
"""
A callback that is called when advertisement data is received.

Args:
    arg0: The D-Bus object path of the device.
    arg1: The D-Bus properties of the device object.
"""


# The argument is Optional because BlueZManager._parse_msg() looks the new
# value up with ``dict.get()`` after applying the change.
DevicePropertiesChangedCallback = Callable[[Optional[Any]], None]
"""
A callback that is called when the properties of a device change in BlueZ.

Args:
    arg0: The new property value.
"""


class DeviceConditionCallback(NamedTuple):
    """
    Pairs a :data:`DevicePropertiesChangedCallback` with the name of the
    device property it is watching.
    """

    #: The callback.
    callback: DevicePropertiesChangedCallback

    #: The name of the property to watch.
    property_name: str
DeviceRemovedCallback = Callable[[str], None]
"""
A callback that is called when a device is removed from BlueZ.

Args:
    arg0: The D-Bus object path of the device.
"""
class DeviceRemovedCallbackAndState(NamedTuple):
    """
    Pairs a :data:`DeviceRemovedCallback` with the adapter it was registered
    for.
    """

    #: The callback.
    callback: DeviceRemovedCallback

    #: The D-Bus object path of the adapter associated with the callback.
    adapter_path: str
DeviceConnectedChangedCallback = Callable[[bool], None]
"""
A callback that is called when a device's "Connected" property changes.

Args:
    arg0: The current value of the "Connected" property.
"""

CharacteristicValueChangedCallback = Callable[[str, bytes], None]
"""
A callback that is called when a characteristic's "Value" property changes.

Args:
    arg0: The D-Bus object path of the characteristic.
    arg1: The current value of the "Value" property.
"""
class DeviceWatcher(NamedTuple):
    """
    Bundle of per-device callbacks registered with the manager; the instance
    itself is the token used to unregister them again.
    """

    #: The D-Bus object path of the device.
    device_path: str

    #: A callback that is called when a device's "Connected" property changes.
    on_connected_changed: DeviceConnectedChangedCallback

    #: A callback that is called when a characteristic's "Value" property
    #: changes.
    on_characteristic_value_changed: CharacteristicValueChangedCallback
# set of org.bluez.Device1 property names that come from advertising data _ADVERTISING_DATA_PROPERTIES = { "AdvertisingData", "AdvertisingFlags", "ManufacturerData", "Name", "ServiceData", "UUIDs", }
class BlueZManager:
    """
    BlueZ D-Bus object manager.

    Keeps a local mirror of the BlueZ object tree (filled from
    ``GetManagedObjects`` and kept current from ``InterfacesAdded``,
    ``InterfacesRemoved`` and ``PropertiesChanged`` signals) and dispatches the
    registered callbacks whenever that mirrored state changes.

    Use :func:`bleak.backends.bluezdbus.get_global_bluez_manager` to get the global instance.
    """

    def __init__(self) -> None:
        self._bus: Optional[MessageBus] = None
        self._bus_lock = asyncio.Lock()

        # dict of object path: dict of interface name: dict of property name: property value
        self._properties: dict[str, dict[str, dict[str, Any]]] = {}

        # set of available adapters for quick lookup
        self._adapters = set[str]()

        # The BlueZ APIs only maps children to parents, so we need to keep maps
        # to quickly find the children of a parent D-Bus object.

        # map of device d-bus object paths to set of service d-bus object paths
        self._service_map: dict[str, set[str]] = {}
        # map of service d-bus object paths to set of characteristic d-bus object paths
        self._characteristic_map: dict[str, set[str]] = {}
        # map of characteristic d-bus object paths to set of descriptor d-bus object paths
        self._descriptor_map: dict[str, set[str]] = {}

        self._advertisement_callbacks: defaultdict[str, list[AdvertisementCallback]] = (
            defaultdict(list)
        )
        self._device_removed_callbacks: list[DeviceRemovedCallbackAndState] = []
        self._device_watchers: dict[str, set[DeviceWatcher]] = {}
        self._condition_callbacks: dict[str, set[DeviceConditionCallback]] = {}
        self._services_cache: dict[str, BleakGATTServiceCollection] = {}

    def _check_adapter(self, adapter_path: str) -> None:
        """
        Raises:
            BleakError: if adapter is not present in BlueZ
        """
        if adapter_path not in self._properties:
            raise BleakError(f"adapter '{adapter_path.split('/')[-1]}' not found")

    def _check_device(self, device_path: str) -> None:
        """
        Raises:
            BleakError: if device is not present in BlueZ
        """
        if device_path not in self._properties:
            raise BleakError(f"device '{device_path.split('/')[-1]}' not found")

    def _get_device_property(
        self, device_path: str, interface: str, property_name: str
    ) -> Any:
        """
        Looks up a single mirrored property value.

        Args:
            device_path: The D-Bus object path of the device.
            interface: The D-Bus interface name that owns the property.
            property_name: The name of the property.

        Returns:
            The current (unpacked) property value.

        Raises:
            BleakError: if the device, the interface or the property is not
                present in the mirrored BlueZ state
        """
        self._check_device(device_path)
        device_properties = self._properties[device_path]

        try:
            interface_properties = device_properties[interface]
        except KeyError:
            raise BleakError(
                f"Interface {interface} not found for device '{device_path}'"
            )

        try:
            value = interface_properties[property_name]
        except KeyError:
            raise BleakError(
                f"Property '{property_name}' not found for '{interface}' in '{device_path}'"
            )

        return value

    async def async_init(self) -> None:
        """
        Connects to the D-Bus message bus and begins monitoring signals.

        It is safe to call this method multiple times. If the bus is already
        connected, no action is performed.
        """
        async with self._bus_lock:
            if self._bus and self._bus.connected:
                return

            self._services_cache = {}

            # We need to create a new MessageBus each time as
            # dbus-next will destroy the underlying file descriptors
            # when the previous one is closed in its finalizer.
            bus = MessageBus(bus_type=BusType.SYSTEM, auth=get_dbus_authenticator())
            await bus.connect()

            try:
                # Add signal listeners

                bus.add_message_handler(self._parse_msg)

                reply: Optional[Message]

                rules = MatchRules(
                    interface=defs.OBJECT_MANAGER_INTERFACE,
                    member="InterfacesAdded",
                    arg0path="/org/bluez/",
                )
                reply = await add_match(bus, rules)
                assert_reply(reply)

                rules = MatchRules(
                    interface=defs.OBJECT_MANAGER_INTERFACE,
                    member="InterfacesRemoved",
                    arg0path="/org/bluez/",
                )
                reply = await add_match(bus, rules)
                assert_reply(reply)

                rules = MatchRules(
                    interface=defs.PROPERTIES_INTERFACE,
                    member="PropertiesChanged",
                    path_namespace="/org/bluez",
                )
                reply = await add_match(bus, rules)
                assert_reply(reply)

                # get existing objects after adding signal handlers to avoid
                # race condition

                reply = await bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path="/",
                        member="GetManagedObjects",
                        interface=defs.OBJECT_MANAGER_INTERFACE,
                    )
                )
                assert reply
                assert_reply(reply)

                # dictionaries are cleared in case AddInterfaces was received first
                # or there was a bus reset and we are reconnecting
                self._properties.clear()
                self._service_map.clear()
                self._characteristic_map.clear()
                self._descriptor_map.clear()
                # The adapter set must be reset together with the property
                # mirror, otherwise an adapter that vanished while the bus was
                # down would stay listed without a matching properties entry.
                self._adapters.clear()

                for path, interfaces in reply.body[0].items():
                    props = unpack_variants(interfaces)
                    self._properties[path] = props

                    if defs.ADAPTER_INTERFACE in props:
                        self._adapters.add(path)

                    service_props = cast(
                        GattService1, props.get(defs.GATT_SERVICE_INTERFACE)
                    )

                    if service_props:
                        self._service_map.setdefault(
                            service_props["Device"], set()
                        ).add(path)

                    char_props = cast(
                        GattCharacteristic1,
                        props.get(defs.GATT_CHARACTERISTIC_INTERFACE),
                    )

                    if char_props:
                        self._characteristic_map.setdefault(
                            char_props["Service"], set()
                        ).add(path)

                    desc_props = cast(
                        GattDescriptor1, props.get(defs.GATT_DESCRIPTOR_INTERFACE)
                    )

                    if desc_props:
                        self._descriptor_map.setdefault(
                            desc_props["Characteristic"], set()
                        ).add(path)

                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug("initial properties: %s", self._properties)

            except BaseException:
                # if setup failed, disconnect
                bus.disconnect()
                raise

            # Everything is setup, so save the bus
            self._bus = bus

    def get_default_adapter(self) -> str:
        """
        Gets the D-Bus object path of the first powered Bluetooth adapter.

        Adapters are kept in a set, so which adapter counts as "first" when
        several are powered is not defined.

        Returns:
            Name of the first found powered adapter on the system, i.e. "/org/bluez/hciX".

        Raises:
            BleakError:
                if there are no Bluetooth adapters or if none of the adapters are powered
        """
        if not self._adapters:
            raise BleakError("No Bluetooth adapters found.")

        for adapter_path in self._adapters:
            if cast(
                defs.Adapter1, self._properties[adapter_path][defs.ADAPTER_INTERFACE]
            )["Powered"]:
                return adapter_path

        raise BleakError("No powered Bluetooth adapters found.")

    async def active_scan(
        self,
        adapter_path: str,
        filters: dict[str, Variant],
        advertisement_callback: AdvertisementCallback,
        device_removed_callback: DeviceRemovedCallback,
    ) -> Callable[[], Coroutine[Any, Any, None]]:
        """
        Configures the advertisement data filters and starts scanning.

        Args:
            adapter_path: The D-Bus object path of the adapter to use for scanning.
            filters: A dictionary of filters to pass to ``SetDiscoveryFilter``.
            advertisement_callback:
                A callable that will be called when new advertisement data is received.
            device_removed_callback:
                A callable that will be called when a device is removed from BlueZ.

        Returns:
            An async function that is used to stop scanning and remove the filters.

        Raises:
            BleakError: if the adapter is not present in BlueZ
        """
        async with self._bus_lock:
            assert self._bus

            # If the adapter doesn't exist, then the message calls below would
            # fail with "method not found". This provides a more informative
            # error message.
            self._check_adapter(adapter_path)

            self._advertisement_callbacks[adapter_path].append(advertisement_callback)

            device_removed_callback_and_state = DeviceRemovedCallbackAndState(
                device_removed_callback, adapter_path
            )
            self._device_removed_callbacks.append(device_removed_callback_and_state)

            try:
                # Apply the filters
                reply = await self._bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.ADAPTER_INTERFACE,
                        member="SetDiscoveryFilter",
                        signature="a{sv}",
                        body=[filters],
                    )
                )
                assert reply
                assert_reply(reply)

                # Start scanning
                reply = await self._bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.ADAPTER_INTERFACE,
                        member="StartDiscovery",
                    )
                )
                assert reply
                assert_reply(reply)

                async def stop() -> None:
                    # need to remove callbacks first, otherwise we get TxPower
                    # and RSSI properties removed during stop which causes
                    # incorrect advertisement data callbacks
                    self._advertisement_callbacks[adapter_path].remove(
                        advertisement_callback
                    )
                    self._device_removed_callbacks.remove(
                        device_removed_callback_and_state
                    )

                    async with self._bus_lock:
                        assert self._bus
                        reply = await self._bus.call(
                            Message(
                                destination=defs.BLUEZ_SERVICE,
                                path=adapter_path,
                                interface=defs.ADAPTER_INTERFACE,
                                member="StopDiscovery",
                            )
                        )
                        assert reply

                        try:
                            assert_reply(reply)
                        except BleakDBusError as ex:
                            # "NotReady" is tolerated here; in that case the
                            # filters are left untouched.
                            if ex.dbus_error != "org.bluez.Error.NotReady":
                                raise
                        else:
                            # remove the filters
                            reply = await self._bus.call(
                                Message(
                                    destination=defs.BLUEZ_SERVICE,
                                    path=adapter_path,
                                    interface=defs.ADAPTER_INTERFACE,
                                    member="SetDiscoveryFilter",
                                    signature="a{sv}",
                                    body=[{}],
                                )
                            )
                            assert reply
                            assert_reply(reply)

                return stop
            except BaseException:
                # if starting scanning failed, don't leak the callbacks
                self._advertisement_callbacks[adapter_path].remove(
                    advertisement_callback
                )
                self._device_removed_callbacks.remove(device_removed_callback_and_state)
                raise

    async def passive_scan(
        self,
        adapter_path: str,
        filters: list[OrPatternLike],
        advertisement_callback: AdvertisementCallback,
        device_removed_callback: DeviceRemovedCallback,
    ) -> Callable[[], Coroutine[Any, Any, None]]:
        """
        Configures the advertisement data filters and starts scanning.

        Args:
            adapter_path: The D-Bus object path of the adapter to use for scanning.
            filters: A list of "or patterns" to pass to ``org.bluez.AdvertisementMonitor1``.
            advertisement_callback:
                A callable that will be called when new advertisement data is received.
            device_removed_callback:
                A callable that will be called when a device is removed from BlueZ.

        Returns:
            An async function that is used to stop scanning and remove the filters.

        Raises:
            BleakError: if the adapter is not present in BlueZ
        """
        async with self._bus_lock:
            assert self._bus

            # If the adapter doesn't exist, then the message calls below would
            # fail with "method not found". This provides a more informative
            # error message.
            self._check_adapter(adapter_path)

            self._advertisement_callbacks[adapter_path].append(advertisement_callback)

            device_removed_callback_and_state = DeviceRemovedCallbackAndState(
                device_removed_callback, adapter_path
            )
            self._device_removed_callbacks.append(device_removed_callback_and_state)

            try:
                monitor = AdvertisementMonitor(filters)

                # this should be a unique path to allow multiple python interpreters
                # running bleak and multiple scanners within a single interpreter
                monitor_path = f"/org/bleak/{os.getpid()}/{id(monitor)}"

                reply = await self._bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.ADVERTISEMENT_MONITOR_MANAGER_INTERFACE,
                        member="RegisterMonitor",
                        signature="o",
                        body=[monitor_path],
                    )
                )
                assert reply

                if (
                    reply.message_type == MessageType.ERROR
                    and reply.error_name == "org.freedesktop.DBus.Error.UnknownMethod"
                ):
                    raise BleakError(
                        "passive scanning on Linux requires BlueZ >= 5.56 with --experimental enabled and Linux kernel >= 5.10"
                    )

                assert_reply(reply)

                # It is important to export after registering, otherwise BlueZ
                # won't use the monitor
                self._bus.export(monitor_path, monitor)

                async def stop() -> None:
                    # need to remove callbacks first, otherwise we get TxPower
                    # and RSSI properties removed during stop which causes
                    # incorrect advertisement data callbacks
                    self._advertisement_callbacks[adapter_path].remove(
                        advertisement_callback
                    )
                    self._device_removed_callbacks.remove(
                        device_removed_callback_and_state
                    )

                    async with self._bus_lock:
                        assert self._bus
                        self._bus.unexport(monitor_path, monitor)

                        reply = await self._bus.call(
                            Message(
                                destination=defs.BLUEZ_SERVICE,
                                path=adapter_path,
                                interface=defs.ADVERTISEMENT_MONITOR_MANAGER_INTERFACE,
                                member="UnregisterMonitor",
                                signature="o",
                                body=[monitor_path],
                            )
                        )
                        assert reply
                        assert_reply(reply)

                return stop

            except BaseException:
                # if starting scanning failed, don't leak the callbacks
                self._advertisement_callbacks[adapter_path].remove(
                    advertisement_callback
                )
                self._device_removed_callbacks.remove(device_removed_callback_and_state)
                raise

    def add_device_watcher(
        self,
        device_path: str,
        on_connected_changed: DeviceConnectedChangedCallback,
        on_characteristic_value_changed: CharacteristicValueChangedCallback,
    ) -> DeviceWatcher:
        """
        Registers a device watcher to receive callbacks when device state
        changes or events are received.

        Args:
            device_path:
                The D-Bus object path of the device.
            on_connected_changed:
                A callback that is called when the device's "Connected"
                state changes.
            on_characteristic_value_changed:
                A callback that is called whenever a characteristic receives
                a notification/indication.

        Returns:
            A device watcher object that acts as a token to unregister the watcher.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        self._check_device(device_path)

        watcher = DeviceWatcher(
            device_path, on_connected_changed, on_characteristic_value_changed
        )

        self._device_watchers.setdefault(device_path, set()).add(watcher)
        return watcher

    def remove_device_watcher(self, watcher: DeviceWatcher) -> None:
        """
        Unregisters a device watcher.

        Args:
            watcher:
                The device watcher token that was returned by
                :meth:`add_device_watcher`.

        Raises:
            KeyError: if the watcher is not currently registered
        """
        device_path = watcher.device_path
        self._device_watchers[device_path].remove(watcher)
        # drop the per-device entry once its last watcher is gone
        if not self._device_watchers[device_path]:
            del self._device_watchers[device_path]

    async def get_services(
        self, device_path: str, use_cached: bool, requested_services: Optional[set[str]]
    ) -> BleakGATTServiceCollection:
        """
        Builds a new :class:`BleakGATTServiceCollection` from the current state.

        Args:
            device_path:
                The D-Bus object path of the Bluetooth device.
            use_cached:
                When ``True`` if there is a cached :class:`BleakGATTServiceCollection`,
                the method will not wait for ``"ServicesResolved"`` to become true
                and instead return the cached service collection immediately.
            requested_services:
                When given, only return services with UUID that is in the list
                of requested services.

        Returns:
            A new :class:`BleakGATTServiceCollection`.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        self._check_device(device_path)

        if use_cached:
            services = self._services_cache.get(device_path)
            if services is not None:
                logger.debug("Using cached services for %s", device_path)
                return services

        await self._wait_for_services_discovery(device_path)

        services = BleakGATTServiceCollection()

        for service_path in self._service_map.get(device_path, set()):
            service_props = cast(
                GattService1,
                self._properties[service_path][defs.GATT_SERVICE_INTERFACE],
            )

            service = BleakGATTService(
                (service_path, service_props),
                extract_service_handle_from_path(service_path),
                service_props["UUID"],
            )

            if (
                requested_services is not None
                and service.uuid not in requested_services
            ):
                continue

            services.add_service(service)

            for char_path in self._characteristic_map.get(service_path, set()):
                char_props = cast(
                    GattCharacteristic1,
                    self._properties[char_path][defs.GATT_CHARACTERISTIC_INTERFACE],
                )

                char = BleakGATTCharacteristic(
                    (char_path, char_props),
                    extract_service_handle_from_path(char_path),
                    char_props["UUID"],
                    char_props["Flags"],
                    # "MTU" property was added in BlueZ 5.62, otherwise fall
                    # back to minimum MTU according to Bluetooth spec.
                    #
                    # ``char_props`` is bound as a default argument so that
                    # each characteristic keeps reading *its own* properties;
                    # a plain closure would see whatever ``char_props`` was
                    # assigned last by this loop.
                    lambda char_props=char_props: char_props.get("MTU", 23) - 3,
                    service,
                )

                services.add_characteristic(char)

                for desc_path in self._descriptor_map.get(char_path, set()):
                    desc_props = cast(
                        GattDescriptor1,
                        self._properties[desc_path][defs.GATT_DESCRIPTOR_INTERFACE],
                    )

                    desc = BleakGATTDescriptor(
                        (desc_path, desc_props),
                        # the last four characters of the path are parsed as
                        # a hexadecimal handle
                        int(desc_path[-4:], 16),
                        desc_props["UUID"],
                        char,
                    )

                    services.add_descriptor(desc)

        self._services_cache[device_path] = services

        return services

    def get_device_name(self, device_path: str) -> str:
        """
        Gets the value of the "Name" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        return self._get_device_property(device_path, defs.DEVICE_INTERFACE, "Name")

    def get_device_address(self, device_path: str) -> str:
        """
        Gets the value of the "Address" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        return self._get_device_property(device_path, defs.DEVICE_INTERFACE, "Address")

    def is_connected(self, device_path: str) -> bool:
        """
        Gets the value of the "Connected" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value or ``False`` if the device does not exist in BlueZ.
        """
        try:
            return self._properties[device_path][defs.DEVICE_INTERFACE]["Connected"]
        except KeyError:
            return False

    def is_paired(self, device_path: str) -> bool:
        """
        Gets the value of the "Paired" property for a device.

        Args:
            device_path: The D-Bus object path of the device.

        Returns:
            The current property value or ``False`` if the device does not exist in BlueZ.
        """
        try:
            return self._properties[device_path][defs.DEVICE_INTERFACE]["Paired"]
        except KeyError:
            return False

    async def _wait_for_services_discovery(self, device_path: str) -> None:
        """
        Waits for the device services to be discovered.

        If a disconnect happens before the completion a BleakError exception is raised.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        self._check_device(device_path)

        # the exit stack cancels whichever tasks are still pending on the way out
        with contextlib.ExitStack() as stack:
            services_discovered_wait_task = asyncio.create_task(
                self._wait_condition(device_path, "ServicesResolved", True)
            )
            stack.callback(services_discovered_wait_task.cancel)

            device_disconnected_wait_task = asyncio.create_task(
                self._wait_condition(device_path, "Connected", False)
            )
            stack.callback(device_disconnected_wait_task.cancel)

            # in some cases, we can get "InterfaceRemoved" without the
            # "Connected" property changing, so we need to race against both
            # conditions
            device_removed_wait_task = asyncio.create_task(
                self._wait_removed(device_path)
            )
            stack.callback(device_removed_wait_task.cancel)

            done, _ = await asyncio.wait(
                {
                    services_discovered_wait_task,
                    device_disconnected_wait_task,
                    device_removed_wait_task,
                },
                return_when=asyncio.FIRST_COMPLETED,
            )

            # check for exceptions
            for task in done:
                task.result()

            if not done.isdisjoint(
                {device_disconnected_wait_task, device_removed_wait_task}
            ):
                raise BleakError("failed to discover services, device disconnected")

    async def _wait_removed(self, device_path: str) -> None:
        """
        Waits for the device interface to be removed.

        If the device is not present in BlueZ, this returns immediately.

        Args:
            device_path: The D-Bus object path of a Bluetooth device.
        """
        if device_path not in self._properties:
            return

        event = asyncio.Event()

        def callback(o: str) -> None:
            if o == device_path:
                event.set()

        device_removed_callback_and_state = DeviceRemovedCallbackAndState(
            callback, self._properties[device_path][defs.DEVICE_INTERFACE]["Adapter"]
        )

        with contextlib.ExitStack() as stack:
            self._device_removed_callbacks.append(device_removed_callback_and_state)
            stack.callback(
                self._device_removed_callbacks.remove, device_removed_callback_and_state
            )
            await event.wait()

    async def _wait_condition(
        self, device_path: str, property_name: str, property_value: Any
    ) -> None:
        """
        Waits for a condition to become true.

        Args:
            device_path: The D-Bus object path of a Bluetooth device.
            property_name: The name of the property to test.
            property_value: A value to compare the current property value to.

        Raises:
            BleakError: if the device is not present in BlueZ
        """
        value = self._get_device_property(
            device_path, defs.DEVICE_INTERFACE, property_name
        )

        if value == property_value:
            return

        event = asyncio.Event()

        def _wait_condition_callback(new_value: Optional[Any]) -> None:
            """Callback for when a property changes."""
            if new_value == property_value:
                event.set()

        condition_callbacks = self._condition_callbacks
        device_callbacks = condition_callbacks.setdefault(device_path, set())
        callback = DeviceConditionCallback(_wait_condition_callback, property_name)
        device_callbacks.add(callback)

        try:
            # can be canceled
            await event.wait()
        finally:
            device_callbacks.remove(callback)
            if not device_callbacks:
                del condition_callbacks[device_path]

    def _parse_msg(self, message: Message) -> None:
        """
        Handles callbacks from dbus_fast.

        Only signal messages are processed. The local mirror is always updated
        before any callback runs, so callbacks observe the new state.
        """

        if message.message_type != MessageType.SIGNAL:
            return

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "received D-Bus signal: %s.%s (%s): %s",
                message.interface,
                message.member,
                message.path,
                message.body,
            )

        # type hints
        obj_path: str
        interfaces_and_props: dict[str, dict[str, Variant]]
        interfaces: list[str]
        interface: str
        changed: dict[str, Variant]
        invalidated: list[str]

        if message.member == "InterfacesAdded":
            obj_path, interfaces_and_props = message.body

            for interface, props in interfaces_and_props.items():
                unpacked_props = unpack_variants(props)
                self._properties.setdefault(obj_path, {})[interface] = unpacked_props

                if interface == defs.GATT_SERVICE_INTERFACE:
                    service_props = cast(GattService1, unpacked_props)
                    self._service_map.setdefault(service_props["Device"], set()).add(
                        obj_path
                    )
                elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
                    char_props = cast(GattCharacteristic1, unpacked_props)
                    self._characteristic_map.setdefault(
                        char_props["Service"], set()
                    ).add(obj_path)
                elif interface == defs.GATT_DESCRIPTOR_INTERFACE:
                    desc_props = cast(GattDescriptor1, unpacked_props)
                    self._descriptor_map.setdefault(
                        desc_props["Characteristic"], set()
                    ).add(obj_path)
                elif interface == defs.ADAPTER_INTERFACE:
                    self._adapters.add(obj_path)

                # If this is a device and it has advertising data properties,
                # then it should mean that this device just started advertising.
                # Previously, we just relied on RSSI updates to determine if
                # a device was actually advertising, but we were missing "slow"
                # devices that only advertise once and then go to sleep for a while.
                elif interface == defs.DEVICE_INTERFACE:
                    self._run_advertisement_callbacks(
                        obj_path, cast(Device1, unpacked_props)
                    )
        elif message.member == "InterfacesRemoved":
            obj_path, interfaces = message.body

            for interface in interfaces:
                try:
                    del self._properties[obj_path][interface]
                except KeyError:
                    pass

                if interface == defs.ADAPTER_INTERFACE:
                    self._adapters.discard(obj_path)
                elif interface == defs.DEVICE_INTERFACE:
                    self._services_cache.pop(obj_path, None)
                    self._service_map.pop(obj_path, None)

                    # Compare against the adapter path *plus separator* so
                    # that e.g. ".../hci1" does not also match devices that
                    # live under ".../hci10".
                    for callback, adapter_path in self._device_removed_callbacks:
                        if obj_path.startswith(f"{adapter_path}/"):
                            callback(obj_path)
                elif interface == defs.GATT_SERVICE_INTERFACE:
                    # the parent (device) path is the service path without
                    # its last component
                    device_path = obj_path[: obj_path.rfind("/")]
                    try:
                        self._service_map[device_path].remove(obj_path)
                    except KeyError:
                        pass
                    self._characteristic_map.pop(obj_path, None)
                elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
                    self._descriptor_map.pop(obj_path, None)

            # Remove empty properties when all interfaces have been removed.
            # This avoids wasting memory for people who have noisy devices
            # with private addresses that change frequently.
            if obj_path in self._properties and not self._properties[obj_path]:
                del self._properties[obj_path]
        elif message.member == "PropertiesChanged":
            interface, changed, invalidated = message.body
            message_path = message.path
            assert message_path is not None

            try:
                self_interface = self._properties[message_path][interface]
            except KeyError:
                # This can happen during initialization. The "PropertiesChanged"
                # handler is attached before "GetManagedObjects" is called
                # and so self._properties may not yet be populated.
                # This is not a problem. We just discard the property value
                # since "GetManagedObjects" will return a newer value.
                pass
            else:
                # update self._properties first

                self_interface.update(unpack_variants(changed))

                for name in invalidated:
                    # sometimes BlueZ tries to remove properties
                    # that were never added
                    self_interface.pop(name, None)

                # then call any callbacks so they will be called with the
                # updated state

                if interface == defs.DEVICE_INTERFACE:
                    # handle advertisement watchers
                    device_path = message_path

                    self._run_advertisement_callbacks(
                        device_path, cast(Device1, self_interface)
                    )

                    # handle device condition watchers
                    callbacks = self._condition_callbacks.get(device_path)
                    if callbacks:
                        for item in callbacks:
                            name = item.property_name
                            if name in changed:
                                item.callback(self_interface.get(name))

                    # handle device connection change watchers
                    if "Connected" in changed:
                        new_connected = self_interface["Connected"]
                        watchers = self._device_watchers.get(device_path)
                        if watchers:
                            # callbacks may remove the watcher, hence the copy
                            for watcher in watchers.copy():
                                watcher.on_connected_changed(new_connected)

                elif interface == defs.GATT_CHARACTERISTIC_INTERFACE:
                    # handle characteristic value change watchers
                    if "Value" in changed:
                        new_value = self_interface["Value"]
                        device_path = device_path_from_characteristic_path(message_path)
                        watchers = self._device_watchers.get(device_path)
                        if watchers:
                            # callbacks may remove the watcher, hence the copy
                            for watcher in watchers.copy():
                                watcher.on_characteristic_value_changed(
                                    message_path, new_value
                                )

    def _run_advertisement_callbacks(self, device_path: str, device: Device1) -> None:
        """
        Runs any registered advertisement callbacks.

        Only the callbacks registered for the device's own adapter are run,
        and each one receives its own shallow copy of the properties.

        Args:
            device_path: The D-Bus object path of the remote device.
            device: The current D-Bus properties of the device.
        """
        adapter_path = device["Adapter"]
        for callback in self._advertisement_callbacks[adapter_path]:
            callback(device_path, device.copy())
# BlueZManager instances keyed by the asyncio event loop they were created
# for (see get_global_bluez_manager). Keys are held weakly, so an entry goes
# away once its key object is garbage collected.
_global_instances: MutableMapping[Any, BlueZManager] = WeakKeyDictionary()
async def get_global_bluez_manager() -> BlueZManager:
    """
    Returns the BlueZ manager that belongs to the running event loop.

    A manager is created and remembered the first time this is called for a
    given loop. ``async_init()`` is awaited on every call, so the returned
    manager has been (re)initialized by the time the caller gets it.
    """
    running_loop = asyncio.get_running_loop()

    manager = _global_instances.get(running_loop)
    if manager is None:
        manager = BlueZManager()
        _global_instances[running_loop] = manager

    await manager.async_init()
    return manager