Skip to content

Interacting with KLAS

KLAS communicates with the system using D-Bus. You can interact with KLAS by using the provided D-Bus APIs to send telemetries and configurations, as well as to manage firmware updates.

The following examples demonstrate how to use the D-Bus APIs with Python and the pydbus library, but any D-Bus capable language or tool can be used.

KLAS is assumed to be running and connected to Kamea for these examples to work, to provision KLAS, refer to the Getting Started Guide and Provisioning example

Telemetries example

D-Bus API reference: Telemetries D-Bus API

This example sends a simple telemetry payload containing a single key-value pair to Kamea.

#!/usr/bin/env python3

from pydbus import SessionBus

# D-BUS Configuration
BUS_NAME = "com.witekio.klas"
OBJECT_PATH = "/com/witekio/klas/telemetries"
INTERFACE_NAME = "com.witekio.klas.Telemetries"

bus = SessionBus()

try:
    remote_obj = bus.get(BUS_NAME, OBJECT_PATH)
    telemetry_iface = remote_obj[INTERFACE_NAME]

    remote_obj.sendUserTelemetriesJson('{"telemetry_key": "telemetry_value"}')

except Exception as e:
    print(f"Error: {e}")

The sent telemetry can be viewed in the Kamea dashboard:

Telemetries

Configurations example

D-Bus API reference: Configurations D-Bus API

This example listens for configuration updates from Kamea and sends a sample configuration to Kamea. It also reads and prints the initial desired configuration upon startup.

#!/usr/bin/env python3

import json
from pydbus import SessionBus
from gi.repository import GLib

# D-BUS Configuration
BUS_NAME = "com.witekio.klas"
OBJECT_PATH = "/com/witekio/klas/configuration"
INTERFACE_NAME = "com.witekio.klas.Configuration"


def on_received_configuration(config_data):
    """Callback function for receiving configuration data"""
    print(f"\n[SIGNAL] New Configuration Received: {config_data}")

    try:
        data = json.loads(config_data)
        for key, value in data.items():
            print(f"Config Key: {key}, Config Value: {value}")

    except json.JSONDecodeError:
        print("Error: Received signal data was not valid JSON")
    except Exception as e:
        print(f"Error processing config: {e}")


bus = SessionBus()

try:
    remote_obj = bus.get(BUS_NAME, OBJECT_PATH)
    config_iface = remote_obj[INTERFACE_NAME]

    if not remote_obj.connected:
        raise Exception("Failed to connect to the configuration service.")

    # Setup signal handlers
    remote_obj.receivedDesiredJson.connect(on_received_configuration)
    print("Listening for configuration updates... (Press Ctrl+C to stop)")

    # Send an example configuration JSON
    remote_obj.sendUserConfigurationJson('{"example_key": "example_value"}')

    # Read and print the initial desired configuration
    print(remote_obj.desiredJson)

    loop = GLib.MainLoop()
    loop.run()
except KeyboardInterrupt:
    print("\nExiting...")
except Exception as e:
    print(f"Error: {e}")

The sent configuration can be viewed in the Kamea dashboard:

Configurations

Firmware update example

D-Bus API reference: Firmware Update D-Bus API

This example demonstrates how to complete a firmware update using KLAS's D-Bus API.

#!/usr/bin/env python3

from pydbus import SessionBus
from gi.repository import GLib

# D-BUS Configuration
BUS_NAME = "com.witekio.klas"
OBJECT_PATH = "/com/witekio/klas/firmware"
INTERFACE_NAME = "com.witekio.klas.Firmware"


def on_firmware_signal(firmware_versions):
    """Callback function for the signal"""
    if not firmware_versions:
        return

    print(f"\n[SIGNAL] New Firmware Available: {firmware_versions}")

    try:
        name, version = firmware_versions[0]
    except (IndexError, ValueError, TypeError) as e:
        print(f"Invalid firmware_versions payload: {firmware_versions} ({e})")
        return

    print(f"Name: {name}, Version: {version}")
    confirm_data = (name, version, True, "")
    remote_obj.acceptFirmware(confirm_data)

    print(f"Accepted firmware installation: {name} - {version}")


def on_firmware_file_available(firmware_file):
    """Callback function for firmware file availability signal"""
    if not firmware_file:
        return

    print(f"\n[SIGNAL] Firmware File Available: {firmware_file}")

    try:
        name, version, filepath = firmware_file[0]
    except (IndexError, ValueError, TypeError) as e:
        print(f"Invalid firmware_file payload: {firmware_file} ({e})")
        return

    # --- internal installation step here ---

    print(f"Name: {name}, Version: {version}, Filepath: {filepath}")
    confirm_data = (name, version, True, "")
    remote_obj.confirmFirmwareInstall(confirm_data)

    print(f"Confirmed firmware installation: {name} - {version}")


bus = SessionBus()

try:
    remote_obj = bus.get(BUS_NAME, OBJECT_PATH)
    firmware_iface = remote_obj[INTERFACE_NAME]

    # Read a property
    statuses = remote_obj.firmwareStatuses
    print(f"Current Firmware Statuses: {statuses}")

    # Setup signal handlers
    remote_obj.newFirmwareVersionsAvailable.connect(on_firmware_signal)
    remote_obj.newFirmwareFilesAvailable.connect(on_firmware_file_available)
    print("Listening for signals... (Press Ctrl+C to stop)")

    # Initial call to handle existing firmware versions
    on_firmware_signal(remote_obj.availableFirmwareVersions)

    loop = GLib.MainLoop()
    loop.run()

except KeyboardInterrupt:
    print("\nExiting...")
except Exception as e:
    print(f"Error: {e}")

It is important to note that even if a reboot is required for a firmware update to take effect, KLAS will keep track of the firmware status and wait for the confirmation of the installation via D-Bus after the reboot.

During the firmware update process, the firmware status can be monitored in the Kamea dashboard as described in the Firmware Update Management section.