# Source code for ble2wled.states

"""Beacon state management with timeout and fade-out.

This module provides the BeaconState class for tracking BLE beacons with
automatic timeout and fade-out effects. Each beacon's "life" value decays
from 1.0 (fully visible) to 0.0 (faded out) once it ages past the timeout.

Example:
    Track beacons and get their current state::

        state = BeaconState(timeout_seconds=6.0, fade_out_seconds=4.0)
        state.update('beacon_1', -50)  # Update with RSSI
        beacons = state.snapshot()     # Get current beacons
        for beacon_id, (rssi, life) in beacons.items():
            print(f"{beacon_id}: RSSI={rssi}, life={life}")
"""

import threading
import time
from typing import Dict, Tuple


class BeaconState:
    """Manages the state of detected beacons with automatic timeout and fade-out.

    This class maintains a lock-protected collection of beacons with their
    RSSI values and life metrics. A beacon keeps a life of 1.0 while it is
    being updated; once it has not been updated for ``timeout`` seconds its
    life decays linearly to 0.0 over ``fade_out`` seconds, after which it is
    dropped on the next :meth:`snapshot` call.

    Attributes:
        timeout (float): Seconds before a beacon is considered timed out.
        fade_out (float): Seconds to fade out after timeout. A value of zero
            (or less) means a beacon disappears as soon as it times out.
    """

    def __init__(self, timeout_seconds: float = 5.0, fade_out_seconds: float = 3.0):
        """Initialize beacon state tracker.

        Args:
            timeout_seconds (float): Duration before a beacon is considered
                timed out (no updates received). Default is 5.0 seconds.
            fade_out_seconds (float): Duration to fade out the beacon after
                timeout. Default is 3.0 seconds.

        Example:
            Create a beacon state tracker with 6-second timeout and
            4-second fade::

                state = BeaconState(timeout_seconds=6.0, fade_out_seconds=4.0)
        """
        self._lock = threading.Lock()
        self.timeout = timeout_seconds
        self.fade_out = fade_out_seconds
        # beacon_id -> {"rssi": ..., "last_seen": ..., "life": ...}
        self._beacons: Dict[str, dict] = {}

    def update(self, beacon_id: str, rssi: int) -> None:
        """Update beacon with signal strength.

        Creates or overwrites the beacon entry with the current timestamp,
        the given RSSI and a life of 1.0. The write happens under the
        instance lock.

        Args:
            beacon_id (str): Unique beacon identifier (e.g.,
                'iBeacon:2686f39c-bada-4658-854a-a62e7e5e8b8d-1-0').
            rssi (int): Received Signal Strength Indicator in dBm
                (typically -30 to -100).

        Example:
            Update beacon with RSSI value::

                state.update('beacon_1', -50)  # Update beacon_1 with -50 dBm
        """
        # Take the timestamp before acquiring the lock so lock contention
        # does not make the beacon look younger than it is.
        now = time.time()
        with self._lock:
            self._beacons[beacon_id] = {
                "rssi": rssi,
                "last_seen": now,
                "life": 1.0,
            }

    def _life_for_age(self, age: float) -> float:
        """Return the life value (0.0 to 1.0) for a beacon last seen ``age`` seconds ago."""
        if age <= self.timeout:
            return 1.0
        # A non-positive fade duration cannot be divided by; treat it as
        # "no fade-out phase", i.e. the beacon expires right at the timeout.
        if self.fade_out <= 0:
            return 0.0
        decay = (age - self.timeout) / self.fade_out
        return max(0.0, 1.0 - decay)

    def snapshot(self) -> Dict[str, Tuple[int, float]]:
        """Get current active beacons with RSSI and life values.

        Recomputes the life of every tracked beacon under the instance lock,
        removes beacons that have completely faded out, and returns the
        remaining ones in a new dictionary.

        Returns:
            dict: Dictionary mapping beacon_id (str) to (rssi, life) tuples.

                rssi (int): Signal strength in dBm.
                life (float): Beacon visibility; greater than 0.0 and at
                most 1.0 (fully visible).

        Example:
            Get current beacon snapshot::

                beacons = state.snapshot()
                if 'beacon_1' in beacons:
                    rssi, life = beacons['beacon_1']
                    print(f"Beacon 1: {rssi} dBm, {life*100:.0f}% visible")
        """
        now = time.time()
        active: Dict[str, Tuple[int, float]] = {}
        with self._lock:
            # Collect expired ids first: deleting while iterating over the
            # dict would raise RuntimeError.
            expired = []
            for beacon_id, data in self._beacons.items():
                life = self._life_for_age(now - data["last_seen"])
                data["life"] = life
                if life <= 0.0:
                    expired.append(beacon_id)
                else:
                    active[beacon_id] = (data["rssi"], life)
            for beacon_id in expired:
                del self._beacons[beacon_id]
        return active