Commit cfce62e4 authored by Edward Betts

import python-otbr-api_2.6.0.orig.tar.gz

version: 2
updates:
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: daily
    open-pull-requests-limit: 10
  - package-ecosystem: pip
    directory: "/"
    schedule:
      interval: weekly
    open-pull-requests-limit: 10
categories:
  - title: "⬆️ Dependencies"
    collapse-after: 1
    labels:
      - "dependencies"
template: |
  ## What's Changed

  $CHANGES
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries

name: Upload Python Package

on:
  release:
    types:
      - published

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4.1.1
      - name: Set up Python
        uses: actions/setup-python@v5.0.0
        with:
          python-version: '3.x'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build wheel twine
      - name: Build and publish
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
        run: |
          python -m build
          twine upload dist/*
name: Release Drafter

on:
  push:
    branches:
      - main

jobs:
  update_release_draft:
    runs-on: ubuntu-latest
    steps:
      # Drafts your next Release notes as Pull Requests are merged into "main"
      - uses: release-drafter/release-drafter@v6.0.0
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Run Tests

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4.1.1
      - name: Set up Python 3.9
        uses: actions/setup-python@v5.0.0
        with:
          python-version: 3.9
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt
      - name: Lint with flake8
        run: |
          flake8 python_otbr_api tests
      - name: Check formatting with black
        run: |
          black python_otbr_api tests --check --diff
      - name: Lint with mypy
        run: |
          mypy python_otbr_api tests
      - name: Lint with pylint
        run: |
          pylint python_otbr_api tests
      - name: Run tests
        run: |
          pytest tests
.DS_Store
.idea
*.log
tmp/
*.py[cod]
*.egg
htmlcov
.projectile
.venv/
venv/
.mypy_cache/
*.egg-info/
# Visual Studio Code
.vscode/*
dist
LICENSE
MIT License
Copyright (c) 2023 ESPHome
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# Python OTBR API
Python package to interact with an OTBR via its REST API
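
A minimal usage sketch is shown below (the border router address is illustrative; the client only needs an `aiohttp.ClientSession` and the OTBR REST endpoint):

```python
import asyncio

import aiohttp

import python_otbr_api


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        # Hypothetical OTBR REST endpoint.
        otbr = python_otbr_api.OTBR("http://192.168.1.2:8081", session, 10)
        dataset_tlvs = await otbr.get_active_dataset_tlvs()
        print("Active dataset TLVs:", dataset_tlvs.hex() if dataset_tlvs else None)


asyncio.run(main())
```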
[build-system]
requires = ["setuptools>=65.6"]
build-backend = "setuptools.build_meta"
[project]
name = "python-otbr-api"
version = "2.6.0"
license = {text = "MIT"}
description = "API to interact with an OTBR via its REST API"
readme = "README.md"
authors = [
{name = "The Home Assistant Authors", email = "hello@home-assistant.io"}
]
requires-python = ">=3.9.0"
dependencies = [
"aiohttp",
"bitstruct",
"cryptography",
"typing_extensions",
"voluptuous",
]
[project.urls]
"Homepage" = "https://github.com/home-assistant-libs/python-otbr-api"
[tool.pylint.BASIC]
class-const-naming-style = "any"
good-names = [
"c",
"i",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
[tool.setuptools]
platforms = ["any"]
zip-safe = true
include-package-data = true
[tool.setuptools.packages.find]
include = ["python_otbr_api*"]
[tool.setuptools.package-data]
"*" = ["py.typed"]
"""API to interact with the Open Thread Border Router REST API."""
from __future__ import annotations
from http import HTTPStatus
import json
import aiohttp
import voluptuous as vol # type:ignore[import]
from .models import ActiveDataSet, PendingDataSet, Timestamp
# 5 minutes as recommended by
# https://github.com/openthread/openthread/discussions/8567#discussioncomment-4468920
PENDING_DATASET_DELAY_TIMER = 5 * 60 * 1000
class OTBRError(Exception):
"""Raised on error."""
class FactoryResetNotSupportedError(OTBRError):
"""Raised when attempting to factory reset a router which does not support it."""
class GetBorderAgentIdNotSupportedError(OTBRError):
"""Raised when attempting to get the agent ID if the router does not support it."""
class ThreadNetworkActiveError(OTBRError):
"""Raised on attempts to modify the active dataset when thread network is active."""
class OTBR: # pylint: disable=too-few-public-methods
"""Class to interact with the Open Thread Border Router REST API."""
def __init__(
self, url: str, session: aiohttp.ClientSession, timeout: int = 10
) -> None:
"""Initialize."""
self._session = session
self._url = url
self._timeout = timeout
async def factory_reset(self) -> None:
"""Factory reset the router."""
response = await self._session.delete(
f"{self._url}/node",
timeout=aiohttp.ClientTimeout(total=10),
)
if response.status == HTTPStatus.METHOD_NOT_ALLOWED:
raise FactoryResetNotSupportedError
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
async def get_border_agent_id(self) -> bytes:
"""Get the border agent ID."""
response = await self._session.get(
f"{self._url}/node/ba-id",
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.NOT_FOUND:
raise GetBorderAgentIdNotSupportedError
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
try:
return bytes.fromhex(await response.json())
except ValueError as exc:
raise OTBRError("unexpected API response") from exc
async def set_enabled(self, enabled: bool) -> None:
"""Enable or disable the router."""
response = await self._session.put(
f"{self._url}/node/state",
json="enable" if enabled else "disable",
timeout=aiohttp.ClientTimeout(total=10),
)
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
async def get_active_dataset(self) -> ActiveDataSet | None:
"""Get current active operational dataset.
Returns None if there is no active operational dataset.
Raises if the http status is 400 or higher or if the response is invalid.
"""
response = await self._session.get(
f"{self._url}/node/dataset/active",
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.NO_CONTENT:
return None
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
try:
return ActiveDataSet.from_json(await response.json())
except (json.JSONDecodeError, vol.Error) as exc:
raise OTBRError("unexpected API response") from exc
async def get_active_dataset_tlvs(self) -> bytes | None:
"""Get current active operational dataset in TLVS format, or None.
Returns None if there is no active operational dataset.
Raises if the http status is 400 or higher or if the response is invalid.
"""
response = await self._session.get(
f"{self._url}/node/dataset/active",
headers={"Accept": "text/plain"},
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.NO_CONTENT:
return None
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
try:
return bytes.fromhex(await response.text("ASCII"))
except ValueError as exc:
raise OTBRError("unexpected API response") from exc
async def get_pending_dataset_tlvs(self) -> bytes | None:
"""Get current pending operational dataset in TLVS format, or None.
Returns None if there is no pending operational dataset.
Raises if the http status is 400 or higher or if the response is invalid.
"""
response = await self._session.get(
f"{self._url}/node/dataset/pending",
headers={"Accept": "text/plain"},
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.NO_CONTENT:
return None
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
try:
return bytes.fromhex(await response.text("ASCII"))
except ValueError as exc:
raise OTBRError("unexpected API response") from exc
async def create_active_dataset(self, dataset: ActiveDataSet) -> None:
"""Create active operational dataset.
The passed-in ActiveDataSet does not need to be fully populated; any fields
not set will be filled in automatically by the Open Thread Border Router.
Raises if the http status is 400 or higher or if the response is invalid.
"""
response = await self._session.put(
f"{self._url}/node/dataset/active",
json=dataset.as_json(),
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.CONFLICT:
raise ThreadNetworkActiveError
if response.status not in (HTTPStatus.CREATED, HTTPStatus.OK):
raise OTBRError(f"unexpected http status {response.status}")
async def delete_active_dataset(self) -> None:
"""Delete active operational dataset."""
response = await self._session.delete(
f"{self._url}/node/dataset/active",
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.CONFLICT:
raise ThreadNetworkActiveError
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
async def create_pending_dataset(self, dataset: PendingDataSet) -> None:
"""Create pending operational dataset.
The passed-in PendingDataSet does not need to be fully populated; any fields
not set will be filled in automatically by the Open Thread Border Router.
Raises if the http status is 400 or higher or if the response is invalid.
"""
response = await self._session.put(
f"{self._url}/node/dataset/pending",
json=dataset.as_json(),
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.CONFLICT:
raise ThreadNetworkActiveError
if response.status not in (HTTPStatus.CREATED, HTTPStatus.OK):
raise OTBRError(f"unexpected http status {response.status}")
async def delete_pending_dataset(self) -> None:
"""Delete pending operational dataset."""
response = await self._session.delete(
f"{self._url}/node/dataset/pending",
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status == HTTPStatus.CONFLICT:
raise ThreadNetworkActiveError
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
async def set_active_dataset_tlvs(self, dataset: bytes) -> None:
"""Set current active operational dataset.
Raises if the http status is 400 or higher or if the response is invalid.
"""
response = await self._session.put(
f"{self._url}/node/dataset/active",
data=dataset.hex(),
headers={"Content-Type": "text/plain"},
timeout=aiohttp.ClientTimeout(total=10),
)
if response.status == HTTPStatus.CONFLICT:
raise ThreadNetworkActiveError
if response.status not in (HTTPStatus.CREATED, HTTPStatus.OK):
raise OTBRError(f"unexpected http status {response.status}")
async def set_channel(
self, channel: int, delay: int = PENDING_DATASET_DELAY_TIMER
) -> None:
"""Change the channel
The channel is changed by creating a new pending dataset based on the active
dataset.
"""
if not 11 <= channel <= 26:
raise OTBRError(f"invalid channel {channel}")
if not (dataset := await self.get_active_dataset()):
raise OTBRError("router has no active dataset")
if dataset.active_timestamp and dataset.active_timestamp.seconds is not None:
dataset.active_timestamp.seconds += 1
else:
dataset.active_timestamp = Timestamp(False, 1, 0)
dataset.channel = channel
pending_dataset = PendingDataSet(active_dataset=dataset, delay=delay)
await self.create_pending_dataset(pending_dataset)
async def get_extended_address(self) -> bytes:
"""Get extended address (EUI-64).
Raises if the http status is not 200 or if the response is invalid.
"""
response = await self._session.get(
f"{self._url}/node/ext-address",
headers={"Accept": "application/json"},
timeout=aiohttp.ClientTimeout(total=self._timeout),
)
if response.status != HTTPStatus.OK:
raise OTBRError(f"unexpected http status {response.status}")
try:
return bytes.fromhex(await response.json())
except ValueError as exc:
raise OTBRError("unexpected API response") from exc
"""Utility function to decode fields in _meshcop._udp.local. mDNS services.
The implementation is based on the Open Thread implementation:
https://github.com/openthread/ot-br-posix/blob/8a8b2411abcf68659c25bb97672bdd2e5e724dcc/src/border_agent/border_agent.cpp#L109
"""
from dataclasses import dataclass
from enum import IntEnum
import bitstruct # type: ignore[import]
from typing_extensions import Self
class ConnectionMode(IntEnum):
"""Connection mode."""
DISABLED = 0
PSKC = 1
PSKD = 2
VENDOR = 3
X509 = 4
class ThreadInterfaceStatus(IntEnum):
"""Thread interface status."""
NOT_INITIALIZED = 0
INITIALIZED = 1
ACTIVE = 2
class Availability(IntEnum):
"""Availability."""
INFREQUENT = 0
HIGH = 1
STATE_BITMAP_FORMAT = "u23u1u1u2u2u3"
@dataclass
class StateBitmap:
"""State bitmap."""
connection_mode: ConnectionMode
thread_interface_status: ThreadInterfaceStatus
availability: Availability
is_active: bool
is_primary: bool
@classmethod
def from_bytes(cls, data: bytes) -> Self:
"""Decode from bytes."""
if len(data) != 4:
raise ValueError("Incorrect length")
(
padding,
is_primary,
is_active,
availability,
thread_if_status,
connection_mode,
) = bitstruct.unpack(STATE_BITMAP_FORMAT, data)
if padding != 0:
raise ValueError(f"Could not decode '{data.hex}'")
return cls(
connection_mode=ConnectionMode(connection_mode),
thread_interface_status=ThreadInterfaceStatus(thread_if_status),
availability=Availability(availability),
is_active=is_active,
is_primary=is_primary,
)
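
A hedged example of decoding a border agent state bitmap, e.g. the `sb` value of a `_meshcop._udp.local.` TXT record (the 4-byte value is constructed for illustration, and the module is assumed to be importable as `python_otbr_api.mdns`):

```python
from python_otbr_api.mdns import (
    Availability,
    ConnectionMode,
    StateBitmap,
    ThreadInterfaceStatus,
)

# Hypothetical 4-byte state bitmap value (big endian).
state = StateBitmap.from_bytes(bytes.fromhex("000001b1"))
assert state.connection_mode == ConnectionMode.PSKC
assert state.thread_interface_status == ThreadInterfaceStatus.ACTIVE
assert state.availability == Availability.HIGH
assert state.is_active and state.is_primary
```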
"""Data models."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import voluptuous as vol # type:ignore[import]
@dataclass
class Timestamp:
"""Timestamp."""
SCHEMA = vol.Schema(
{
vol.Optional("Authoritative"): bool,
vol.Optional("Seconds"): int,
vol.Optional("Ticks"): int,
}
)
authoritative: bool | None = None
seconds: int | None = None
ticks: int | None = None
def as_json(self) -> dict:
"""Serialize to JSON."""
result: dict[str, Any] = {}
if self.authoritative is not None:
result["Authoritative"] = self.authoritative
if self.seconds is not None:
result["Seconds"] = self.seconds
if self.ticks is not None:
result["Ticks"] = self.ticks
return result
@classmethod
def from_json(cls, json_data: Any) -> Timestamp:
"""Deserialize from JSON."""
cls.SCHEMA(json_data)
return cls(
json_data.get("Authoritative"),
json_data.get("Seconds"),
json_data.get("Ticks"),
)
@dataclass
class SecurityPolicy: # pylint: disable=too-many-instance-attributes
"""Security policy."""
SCHEMA = vol.Schema(
{
vol.Optional("AutonomousEnrollment"): bool,
vol.Optional("CommercialCommissioning"): bool,
vol.Optional("ExternalCommissioning"): bool,
vol.Optional("NativeCommissioning"): bool,
vol.Optional("NetworkKeyProvisioning"): bool,
vol.Optional("NonCcmRouters"): bool,
vol.Optional("ObtainNetworkKey"): bool,
vol.Optional("RotationTime"): int,
vol.Optional("Routers"): bool,
vol.Optional("TobleLink"): bool,
}
)
autonomous_enrollment: bool | None = None
commercial_commissioning: bool | None = None
external_commissioning: bool | None = None
native_commissioning: bool | None = None
network_key_provisioning: bool | None = None
non_ccm_routers: bool | None = None
obtain_network_key: bool | None = None
rotation_time: int | None = None
routers: bool | None = None
to_ble_link: bool | None = None
def as_json(self) -> dict:
"""Serialize to JSON."""
result: dict[str, Any] = {}
if self.autonomous_enrollment is not None:
result["AutonomousEnrollment"] = self.autonomous_enrollment
if self.commercial_commissioning is not None:
result["CommercialCommissioning"] = self.commercial_commissioning
if self.external_commissioning is not None:
result["ExternalCommissioning"] = self.external_commissioning
if self.native_commissioning is not None:
result["NativeCommissioning"] = self.native_commissioning
if self.network_key_provisioning is not None:
result["NetworkKeyProvisioning"] = self.network_key_provisioning
if self.non_ccm_routers is not None:
result["NonCcmRouters"] = self.non_ccm_routers
if self.obtain_network_key is not None:
result["ObtainNetworkKey"] = self.obtain_network_key
if self.rotation_time is not None:
result["RotationTime"] = self.rotation_time
if self.routers is not None:
result["Routers"] = self.routers
if self.to_ble_link is not None:
result["TobleLink"] = self.to_ble_link
return result
@classmethod
def from_json(cls, json_data: Any) -> SecurityPolicy:
"""Deserialize from JSON."""
cls.SCHEMA(json_data)
return cls(
json_data.get("AutonomousEnrollment"),
json_data.get("CommercialCommissioning"),
json_data.get("ExternalCommissioning"),
json_data.get("NativeCommissioning"),
json_data.get("NetworkKeyProvisioning"),
json_data.get("NonCcmRouters"),
json_data.get("ObtainNetworkKey"),
json_data.get("RotationTime"),
json_data.get("Routers"),
json_data.get("TobleLink"),
)
@dataclass
class ActiveDataSet: # pylint: disable=too-many-instance-attributes
"""Operational dataset."""
SCHEMA = vol.Schema(
{
vol.Optional("ActiveTimestamp"): dict,
vol.Optional("ChannelMask"): int,
vol.Optional("Channel"): int,
vol.Optional("ExtPanId"): str,
vol.Optional("MeshLocalPrefix"): str,
vol.Optional("NetworkKey"): str,
vol.Optional("NetworkName"): str,
vol.Optional("PanId"): int,
vol.Optional("PSKc"): str,
vol.Optional("SecurityPolicy"): dict,
}
)
active_timestamp: Timestamp | None = None
channel_mask: int | None = None
channel: int | None = None
extended_pan_id: str | None = None
mesh_local_prefix: str | None = None
network_key: str | None = None
network_name: str | None = None
pan_id: int | None = None
psk_c: str | None = None
security_policy: SecurityPolicy | None = None
def as_json(self) -> dict:
"""Serialize to JSON."""
result: dict[str, Any] = {}
if self.active_timestamp is not None:
result["ActiveTimestamp"] = self.active_timestamp.as_json()
if self.channel_mask is not None:
result["ChannelMask"] = self.channel_mask
if self.channel is not None:
result["Channel"] = self.channel
if self.extended_pan_id is not None:
result["ExtPanId"] = self.extended_pan_id
if self.mesh_local_prefix is not None:
result["MeshLocalPrefix"] = self.mesh_local_prefix
if self.network_key is not None:
result["NetworkKey"] = self.network_key
if self.network_name is not None:
result["NetworkName"] = self.network_name
if self.pan_id is not None:
result["PanId"] = self.pan_id
if self.psk_c is not None:
result["PSKc"] = self.psk_c
if self.security_policy is not None:
result["SecurityPolicy"] = self.security_policy.as_json()
return result
@classmethod
def from_json(cls, json_data: Any) -> ActiveDataSet:
"""Deserialize from JSON."""
cls.SCHEMA(json_data)
active_timestamp = None
security_policy = None
if "ActiveTimestamp" in json_data:
active_timestamp = Timestamp.from_json(json_data["ActiveTimestamp"])
if "SecurityPolicy" in json_data:
security_policy = SecurityPolicy.from_json(json_data["SecurityPolicy"])
return ActiveDataSet(
active_timestamp,
json_data.get("ChannelMask"),
json_data.get("Channel"),
json_data.get("ExtPanId"),
json_data.get("MeshLocalPrefix"),
json_data.get("NetworkKey"),
json_data.get("NetworkName"),
json_data.get("PanId"),
json_data.get("PSKc"),
security_policy,
)
@dataclass
class PendingDataSet: # pylint: disable=too-many-instance-attributes
"""Operational dataset."""
SCHEMA = vol.Schema(
{
vol.Optional("ActiveDataset"): dict,
vol.Optional("Delay"): int,
vol.Optional("PendingTimestamp"): dict,
}
)
active_dataset: ActiveDataSet | None = None
delay: int | None = None
pending_timestamp: Timestamp | None = None
def as_json(self) -> dict:
"""Serialize to JSON."""
result: dict[str, Any] = {}
if self.active_dataset is not None:
result["ActiveDataset"] = self.active_dataset.as_json()
if self.delay is not None:
result["Delay"] = self.delay
if self.pending_timestamp is not None:
result["PendingTimestamp"] = self.pending_timestamp.as_json()
return result
@classmethod
def from_json(cls, json_data: Any) -> PendingDataSet:
"""Deserialize from JSON."""
cls.SCHEMA(json_data)
active_dataset = None
pending_timestamp = None
if "ActiveDataset" in json_data:
active_dataset = ActiveDataSet.from_json(json_data["ActiveDataset"])
if "PendingTimestamp" in json_data:
pending_timestamp = Timestamp.from_json(json_data["PendingTimestamp"])
return PendingDataSet(
active_dataset,
json_data.get("Delay"),
pending_timestamp,
)
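
A hedged round-trip sketch showing how these models serialize to the JSON shape used by the REST API (the field values are illustrative; the `python_otbr_api.models` path matches the import in the API module above):

```python
from python_otbr_api.models import ActiveDataSet, Timestamp

dataset = ActiveDataSet(
    active_timestamp=Timestamp(authoritative=False, seconds=1, ticks=0),
    channel=15,
    network_name="OpenThread",
)
payload = dataset.as_json()
# {'ActiveTimestamp': {'Authoritative': False, 'Seconds': 1, 'Ticks': 0},
#  'Channel': 15, 'NetworkName': 'OpenThread'}
restored = ActiveDataSet.from_json(payload)
assert restored.channel == 15 and restored.network_name == "OpenThread"
```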
"""Calculate Thread PSKc.
Based on https://github.com/openthread/ot-br-posix/blob/main/src/utils/pskc.cpp
"""
import struct
from cryptography.hazmat.primitives import cmac
from cryptography.hazmat.primitives.ciphers import algorithms
AES_128_KEY_LEN = 16
ITERATION_COUNTS = 16384
BLKSIZE = 16
SALT_PREFIX = "Thread".encode()
def _derive_key(passphrase: str) -> bytes:
"""Derive key from passphrase according to RFC 4615."""
passphrase_bytes = passphrase.encode()
if len(passphrase_bytes) == AES_128_KEY_LEN:
return passphrase_bytes
c = cmac.CMAC(algorithms.AES128(b"\0" * AES_128_KEY_LEN))
c.update(passphrase_bytes)
return c.finalize()
def compute_pskc(ext_pan_id: bytes, network_name: str, passphrase: str) -> bytes:
"""Compute Thread PSKc."""
salt = SALT_PREFIX + ext_pan_id + network_name.encode()
key = _derive_key(passphrase)
block_counter = 1
prf_input = salt + struct.pack("!L", block_counter)
# Calculate U_1
c = cmac.CMAC(algorithms.AES128(key))
c.update(prf_input)
prf_output = c.finalize()
pskc = bytearray(prf_output)
for _ in range(ITERATION_COUNTS - 1):
prf_input = prf_output
# Calculate U_i
c = cmac.CMAC(algorithms.AES128(key))
c.update(prf_input)
prf_output = c.finalize()
# xor
for i in range(BLKSIZE):
pskc[i] ^= prf_output[i]
return pskc
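
A hedged usage example for `compute_pskc` (the extended PAN ID, network name, and passphrase are illustrative; the module path is assumed to be `python_otbr_api.pskc`):

```python
from python_otbr_api.pskc import compute_pskc

ext_pan_id = bytes.fromhex("1122334455667788")  # hypothetical extended PAN ID
pskc = compute_pskc(ext_pan_id, "OpenThread", "SECRET_PASSPHRASE")
print(pskc.hex())  # 16-byte (128-bit) PSKc
```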
"""Parse datasets TLV encoded as specified by Thread."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import IntEnum
import struct
class TLVError(Exception):
"""TLV error."""
class MeshcopTLVType(IntEnum):
"""Types."""
CHANNEL = 0
PANID = 1
EXTPANID = 2
NETWORKNAME = 3
PSKC = 4
NETWORKKEY = 5
NETWORK_KEY_SEQUENCE = 6
MESHLOCALPREFIX = 7
STEERING_DATA = 8
BORDER_AGENT_RLOC = 9
COMMISSIONER_ID = 10
COMM_SESSION_ID = 11
SECURITYPOLICY = 12
GET = 13
ACTIVETIMESTAMP = 14
COMMISSIONER_UDP_PORT = 15
STATE = 16
JOINER_DTLS = 17
JOINER_UDP_PORT = 18
JOINER_IID = 19
JOINER_RLOC = 20
JOINER_ROUTER_KEK = 21
PROVISIONING_URL = 32
VENDOR_NAME_TLV = 33
VENDOR_MODEL_TLV = 34
VENDOR_SW_VERSION_TLV = 35
VENDOR_DATA_TLV = 36
VENDOR_STACK_VERSION_TLV = 37
UDP_ENCAPSULATION_TLV = 48
IPV6_ADDRESS_TLV = 49
PENDINGTIMESTAMP = 51
DELAYTIMER = 52
CHANNELMASK = 53
COUNT = 54
PERIOD = 55
SCAN_DURATION = 56
ENERGY_LIST = 57
# Seen in a dataset imported through iOS companion app
APPLE_TAG_UNKNOWN = 74
DISCOVERYREQUEST = 128
DISCOVERYRESPONSE = 129
JOINERADVERTISEMENT = 241
@dataclass
class MeshcopTLVItem:
"""Base class for TLV items."""
tag: int
data: bytes
def __str__(self) -> str:
"""Return a string representation."""
return self.data.hex()
@dataclass
class Channel(MeshcopTLVItem):
"""Channel."""
channel: int = field(init=False)
def __post_init__(self) -> None:
"""Decode the channel."""
self.channel = int.from_bytes(self.data, "big")
if not self.channel:
raise TLVError(f"invalid channel '{self.channel}'")
@dataclass
class NetworkName(MeshcopTLVItem):
"""Network name."""
name: str = field(init=False)
def __post_init__(self) -> None:
"""Decode the name."""
try:
self.name = self.data.decode()
except UnicodeDecodeError as err:
raise TLVError(f"invalid network name '{self.data.hex()}'") from err
def __str__(self) -> str:
return self.name
@dataclass
class Timestamp(MeshcopTLVItem):
"""Timestamp."""
authoritative: bool = field(init=False)
seconds: int = field(init=False)
ticks: int = field(init=False)
def __post_init__(self) -> None:
"""Decode the timestamp."""
# The timestamps are packed in 8 bytes:
# [seconds 48 bits][ticks 15 bits][authoritative flag 1 bit]
unpacked: int = struct.unpack("!Q", self.data)[0]
self.authoritative = bool(unpacked & 1)
self.seconds = unpacked >> 16
self.ticks = (unpacked >> 1) & 0x7FFF
def _encode_item(item: MeshcopTLVItem) -> bytes:
"""Encode a dataset item to TLV format."""
data_len = len(item.data)
return struct.pack(f"!BB{data_len}s", item.tag, data_len, item.data)
def encode_tlv(items: dict[MeshcopTLVType, MeshcopTLVItem]) -> str:
"""Encode a TLV encoded dataset to a hex string.
Raises if the TLV is invalid.
"""
result = b""
for item in items.values():
result += _encode_item(item)
return result.hex()
def _parse_item(tag: MeshcopTLVType, data: bytes) -> MeshcopTLVItem:
"""Parse a TLV encoded dataset item."""
if tag == MeshcopTLVType.ACTIVETIMESTAMP:
return Timestamp(tag, data)
if tag == MeshcopTLVType.CHANNEL:
return Channel(tag, data)
if tag == MeshcopTLVType.NETWORKNAME:
return NetworkName(tag, data)
return MeshcopTLVItem(tag, data)
def parse_tlv(data: str) -> dict[MeshcopTLVType, MeshcopTLVItem]:
"""Parse a TLV encoded dataset.
Raises if the TLV is invalid.
"""
try:
data_bytes = bytes.fromhex(data)
except ValueError as err:
raise TLVError("invalid tlvs") from err
result = {}
pos = 0
while pos < len(data_bytes):
try:
tag = MeshcopTLVType(data_bytes[pos])
except ValueError as err:
raise TLVError(f"unknown type {data_bytes[pos]}") from err
pos += 1
_len = data_bytes[pos]
pos += 1
val = data_bytes[pos : pos + _len]
if len(val) < _len:
raise TLVError(f"expected {_len} bytes for {tag.name}, got {len(val)}")
pos += _len
if tag in result:
raise TLVError(f"duplicated tag {tag.name}")
result[tag] = _parse_item(tag, val)
return result
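
A hedged round-trip example for `parse_tlv`/`encode_tlv`, using a hand-built two-entry dataset (channel 15 on channel page 0, network name "OpenThread"); real operational datasets contain more TLVs, and the module path is assumed to be `python_otbr_api.tlv_parser`:

```python
from python_otbr_api.tlv_parser import MeshcopTLVType, encode_tlv, parse_tlv

# Channel TLV: type 0x00, length 3, [channel page][channel, 2 bytes]
# Network name TLV: type 0x03, length 10, "OpenThread"
tlvs = "000300000f030a4f70656e546872656164"
dataset = parse_tlv(tlvs)
assert dataset[MeshcopTLVType.CHANNEL].channel == 15
assert str(dataset[MeshcopTLVType.NETWORKNAME]) == "OpenThread"
assert encode_tlv(dataset) == tlvs
```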
black==23.12.1
flake8==7.0.0
mypy==1.8.0
pylint==3.0.3
pytest-asyncio==0.23.4
pytest==7.4.4
[flake8]
# To work with Black
max-line-length = 88
# E203: Whitespace before ':'
extend-ignore = E203
"""Tests."""
"""Test fixtures."""
from collections.abc import Generator
import pytest
from tests.test_util.aiohttp import AiohttpClientMocker, mock_aiohttp_client
@pytest.fixture
def aioclient_mock() -> Generator[AiohttpClientMocker, None, None]:
    """Fixture to mock aioclient calls."""
    with mock_aiohttp_client() as mock_session:
        yield mock_session