Modem
Module: modem
Source: modules/modem.py
Depends on: nrf9151 (C extension module)
The Modem class wraps the nrf9151 C module to provide AT-command communication with the onboard nRF9151 modem over UART1. It implements the singleton pattern — only one instance exists at runtime.
All methods are blocking: each call occupies the thread until the modem replies or the timeout expires. Use ModemAsync instead if your application runs a uasyncio event loop.
    from modem import Modem

    m = Modem()

Modem()
Initialises the modem (nrf9151.init()) and flushes up to 256 bytes from the RX buffer. Subsequent calls return the same instance.
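The "subsequent calls return the same instance" behaviour can be pictured with a minimal sketch of the singleton pattern. SingletonDemo is a hypothetical stand-in, not the actual Modem implementation, whose internals are not shown on this page.

```python
class SingletonDemo:
    """Hypothetical stand-in for Modem; illustrates the singleton pattern only."""

    _instance = None

    def __new__(cls):
        # Create the object once; later calls hand back the cached instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.init_count = 0
        return cls._instance

    def __init__(self):
        # __init__ runs on every call, so one-time setup needs a guard.
        if self.init_count == 0:
            self.init_count = 1  # a real class would do its hardware init here


a = SingletonDemo()
b = SingletonDemo()
```

Here a and b refer to the same object, so state set through one name is visible through the other.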
Low-level I/O
    m.send(data)

Send raw bytes to the modem without waiting for a response.

| Parameter | Type | Description |
|---|---|---|
| data | bytes or str | Raw data to write to UART1. |

    m.read(num_bytes)

Read raw bytes from the modem.

| Parameter | Type | Description |
|---|---|---|
| num_bytes | int | Number of bytes to read. Must be > 0. |

Returns bytes. Raises ValueError if num_bytes is not a positive integer.
wait_response
    m.wait_response(expected="OK", num_bytes=256, timeout_ms=1000)

Poll the modem until expected appears in the accumulated response, "ERROR" is received, or the timeout expires. Blocks the current thread.

| Parameter | Type | Default | Description |
|---|---|---|---|
| expected | str | "OK" | String to wait for in the response. |
| num_bytes | int | 256 | Max bytes read per iteration. |
| timeout_ms | int | 1000 | Timeout in milliseconds. |

Returns the full response str, "ERROR", or a timeout message.
Raises ValueError if timeout_ms or num_bytes ≤ 0.
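The polling behaviour described for wait_response can be sketched roughly as follows. This is an illustration under assumptions, not the module's code: wait_for and read_chunk are hypothetical names, the "TIMEOUT" return value is a placeholder (the actual timeout message is not documented here), and the order in which "ERROR" and expected are checked is a guess. It uses time.monotonic() so it runs on desktop Python; on MicroPython, time.ticks_ms() and time.ticks_diff() would be the usual choice.

```python
import time


def wait_for(read_chunk, expected="OK", timeout_ms=1000):
    """Accumulate text from read_chunk() until expected, "ERROR", or timeout."""
    if timeout_ms <= 0:
        raise ValueError("timeout_ms must be > 0")
    deadline = time.monotonic() + timeout_ms / 1000
    buffer = ""
    while time.monotonic() < deadline:
        buffer += read_chunk()      # may return "" when nothing is pending
        if "ERROR" in buffer:
            return "ERROR"
        if expected in buffer:
            return buffer
        time.sleep(0.01)            # brief pause between polls
    return "TIMEOUT"                # placeholder value, not the real message


# Simulate a reply that arrives in three fragments.
chunks = iter(["AT\r\n", "\r\nO", "K\r\n"])
reply = wait_for(lambda: next(chunks, ""))
```

Matching against the accumulated buffer rather than each chunk matters because a reply such as "OK" can be split across two reads.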
send_cmd
    m.send_cmd(command, expected="OK", num_bytes=256, timeout_ms=1000, is_bool=True)

Send an AT command (appends \r\n automatically) and wait for a response.

| Parameter | Type | Default | Description |
|---|---|---|---|
| command | str | required | AT command string; \r\n is appended automatically. |
| expected | str | "OK" | Substring to look for in the modem response. |
| num_bytes | int | 256 | Max bytes read per iteration inside wait_response. |
| timeout_ms | int | 1000 | Timeout in milliseconds. |
| is_bool | bool | True | True → returns bool; False → returns raw response str. |
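With is_bool=False the caller receives the raw response string and usually wants only the payload lines. A small helper along these lines could do the splitting; response_lines is a hypothetical name and not part of the module.

```python
def response_lines(raw):
    """Split a raw AT response into payload lines, dropping blanks and the final OK."""
    lines = [line.strip() for line in raw.replace("\r", "\n").split("\n")]
    return [line for line in lines if line and line != "OK"]


version_lines = response_lines("mfw_nrf9151_1.0.0\r\nOK")
```

For the sample string above, version_lines holds the single firmware version line.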
    ok = m.send_cmd("AT")                       # True if response contains "OK"
    ver = m.send_cmd("AT+CGMR", is_bool=False)  # raw string, e.g. "mfw_nrf9151_1.0.0\r\nOK"

Modem control
    m.CFUN(mode)

Set the modem’s radio functionality level via AT+CFUN.

| Parameter | Type | Description |
|---|---|---|
| mode | int | One of: 0, 1, 2, 4, 20, 21, 30, 31, 40, 41, 44. |

| Mode | Meaning |
|---|---|
| 0 | Minimum functionality (RF off) |
| 1 | Full functionality |
| 4 | Flight mode |
| 2, 20–44 | Nordic-specific power states |

Returns True on success, False otherwise. Raises ValueError for invalid modes.
Uses a 5-second timeout internally.
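The mode check can be illustrated with a short sketch. build_cfun_command and VALID_CFUN_MODES are hypothetical names. The list of modes is taken from the parameter table, while the way the module actually validates and formats the command is an assumption.

```python
# Accepted modes, copied from the parameter table.
VALID_CFUN_MODES = (0, 1, 2, 4, 20, 21, 30, 31, 40, 41, 44)


def build_cfun_command(mode):
    """Return the AT+CFUN command string for a valid mode, else raise ValueError."""
    if mode not in VALID_CFUN_MODES:
        raise ValueError("invalid CFUN mode: %r" % (mode,))
    return "AT+CFUN=%d" % mode


flight_mode_cmd = build_cfun_command(4)
```

Rejecting unknown modes before anything is written to the UART keeps a typo from reaching the modem.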
Nordic reference: MQTT client AT commands — nCS 3.0.0
mqtt_cfg
    m.mqtt_cfg(client_id, keep_alive=60, clean_session=0)

Configure the MQTT client (AT#XMQTTCFG).

| Parameter | Type | Default | Description |
|---|---|---|---|
| client_id | str | required | MQTT client identifier. |
| keep_alive | int | 60 | Keep-alive interval in seconds. Must be > 0. |
| clean_session | int | 0 | 0 = persistent session, 1 = clean session. |

Returns True on success. Raises ValueError for invalid parameters.
get_mqtt_cfg
    m.get_mqtt_cfg()

Read the current MQTT configuration (AT#XMQTTCFG?).

Returns the raw #XMQTTCFG: … response str, or an empty string on error.
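Because get_mqtt_cfg returns a raw string, callers may want to turn it into structured data. The sketch below assumes the response carries the same three fields, in the same order, as the mqtt_cfg parameters. That layout is an assumption, the sample string is made up for illustration, and parse_mqtt_cfg is a hypothetical helper. A client ID containing a comma would defeat this simple split.

```python
def parse_mqtt_cfg(raw):
    """Parse '#XMQTTCFG: <client_id>,<keep_alive>,<clean_session>' into a dict, or None."""
    prefix = "#XMQTTCFG:"
    for line in raw.splitlines():
        line = line.strip()
        if not line.startswith(prefix):
            continue
        fields = [f.strip() for f in line[len(prefix):].split(",")]
        if len(fields) != 3:
            return None
        try:
            return {
                "client_id": fields[0].strip('"'),
                "keep_alive": int(fields[1]),
                "clean_session": int(fields[2]),
            }
        except ValueError:
            return None
    return None


# Hypothetical response text, used only to exercise the parser.
cfg = parse_mqtt_cfg('#XMQTTCFG: "my-device-id",60,0\r\nOK')
```

An empty string, which the method returns on error, yields None.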
mqtt_conn
    m.mqtt_conn(op, username, password, url, port, sec_tag=None)

Connect to or disconnect from an MQTT broker (AT#XMQTTCON).

| Parameter | Type | Description |
|---|---|---|
| op | int | 0 = disconnect, 1 = connect IPv4, 2 = connect IPv6. |
| username | str | MQTT username. |
| password | str | MQTT password. |
| url | str | Broker hostname or IP. |
| port | int | TCP port (1–65535). |
| sec_tag | int | Optional TLS security tag. Omit for unencrypted connections. |

Returns True on success. Uses a 5-second timeout internally.
is_mqtt_conn
    m.is_mqtt_conn()

Query the broker connection status (AT#XMQTTCON?).

Returns True if the modem is currently connected to an MQTT broker, False otherwise.
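A common use of is_mqtt_conn is to poll until the connection comes up. The helper below is a generic sketch that takes any zero-argument callable, so it can be exercised without hardware. wait_until and its parameters are hypothetical names.

```python
import time


def wait_until(check, attempts=10, delay_s=1.0):
    """Call check() up to `attempts` times; return True as soon as it does."""
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            time.sleep(delay_s)     # no sleep after the final attempt
    return False


# Simulated status sequence: connected on the third poll.
states = iter([False, False, True])
connected = wait_until(lambda: next(states), attempts=5, delay_s=0)
```

On a device the call would presumably look like wait_until(m.is_mqtt_conn, attempts=10, delay_s=1.0). Since each is_mqtt_conn call blocks, the total wait can exceed attempts × delay_s.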
mqtt_publish
    m.mqtt_publish(topic, msg, qos=0, retain=0)

Publish a message (AT#XMQTTPUB).

| Parameter | Type | Default | Description |
|---|---|---|---|
| topic | str | required | MQTT topic string. |
| msg | str | required | Message payload. |
| qos | int | 0 | QoS level: 0, 1, or 2. |
| retain | int | 0 | Retain flag: 0 or 1. |

Returns True on success. Uses a 5-second timeout internally.
mqtt_subscribe
    m.mqtt_subscribe(topic, qos=0)

Subscribe to a topic (AT#XMQTTSUB).

| Parameter | Type | Default | Description |
|---|---|---|---|
| topic | str | required | MQTT topic filter (wildcards supported). |
| qos | int | 0 | QoS level: 0, 1, or 2. |

Returns True on success. Uses a 5-second timeout internally.
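To make the wildcard support concrete, the sketch below shows how standard MQTT topic filters match topics: + matches exactly one level and a trailing # matches any number of levels. The broker does this matching, not the modem module. topic_matches is a hypothetical helper for reasoning about filters on the host side, and it ignores the special handling of topics that start with $.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level of the filter;
            # it matches the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


single = topic_matches("sensors/+/temp", "sensors/kitchen/temp")
multi = topic_matches("sensors/#", "sensors/kitchen/temp")
too_deep = topic_matches("sensors/+", "sensors/kitchen/temp")
```

Here single and multi are True, while too_deep is False because + covers only one level.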
Certificate management
The modem stores credentials (certificates, keys, PSKs) in its internal secure storage using the AT%CMNG command set. Each credential is identified by a sec_tag (a numeric identifier) and a cert_type (the kind of credential stored under that sec_tag).
Nordic reference: AT%CMNG (Security credentials management)
Credential types
| cert_type | Meaning | Readable |
|---|---|---|
| 0 | Root CA certificate | yes |
| 1 | Client certificate | no |
| 2 | Client private key | no |
| 3 | Pre-shared key (PSK) | no |
| 4 | PSK identity | yes |
| 5 | Public key | yes |
| 6 | Device identity public key | yes |
| 7 | Reserved | n/a |
| 8 | Endorsement key | yes |
| 9 | Ownership key | yes |
| 10 | Nordic identity root CA | yes |
| 11 | Nordic base public key | yes |
| 13 | Asset encryption key | yes |

sec_tag must be an integer in the range 0 – 2 552 147 483 647. The same sec_tag can hold multiple credential types.
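Named constants can make calls that take a cert_type easier to read than bare integers. The names below are hypothetical and are not exported by the module. The numeric values and the readable set are copied from the credential-type table.

```python
# Hypothetical constant names; values come from the credential-type table.
TYPE_ROOT_CA = 0
TYPE_CLIENT_CERT = 1
TYPE_CLIENT_KEY = 2
TYPE_PSK = 3
TYPE_PSK_IDENTITY = 4

# Types the table marks as readable.
READABLE_TYPES = (0, 4, 5, 6, 8, 9, 10, 11, 13)


def is_readable(cert_type):
    """Return True if the table lists this credential type as readable."""
    return cert_type in READABLE_TYPES
```

A caller could check is_readable(cert_type) before calling read_certificate, so that an expected refusal is not mistaken for a communication failure.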
write_certificate
    m.write_certificate(sec_tag, cert_type, content, psw=None, sha256=None)

Store a credential in the modem’s secure storage (AT%CMNG=0).

| Parameter | Type | Default | Description |
|---|---|---|---|
| sec_tag | int | required | Security tag (0 – 2 552 147 483 647). |
| cert_type | int | required | Credential type (see table above). |
| content | str | required | PEM-encoded certificate, key, or raw PSK string. |
| psw | str | None | Optional password (e.g. for PKCS#12 bundles). |
| sha256 | str | None | Optional SHA-256 hash for integrity verification. |

Returns True on success, False otherwise. Uses a 5-second timeout internally.
Raises ValueError for invalid sec_tag, cert_type, or parameter types.

    m.write_certificate(1, 0, ROOT_CA)      # Root CA, sec_tag=1, cert_type=0
    m.write_certificate(1, 1, CLIENT_CERT)  # Client cert, sec_tag=1, cert_type=1
    m.write_certificate(1, 2, CLIENT_KEY)   # Client priv. key, sec_tag=1, cert_type=2

list_certificate
    m.list_certificate(sec_tag, cert_type=None)

List credentials stored under a given sec_tag (AT%CMNG=1).

| Parameter | Type | Default | Description |
|---|---|---|---|
| sec_tag | int | required | Security tag to query. |
| cert_type | int | None | If provided, filter by this credential type. |

Returns the raw modem response str on success (e.g. %CMNG: 1,0,"<sha256>"), or an empty string on failure.
Raises ValueError for invalid parameters.
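The raw listing can be split into one entry per credential. The sketch below assumes each entry appears on its own line in the form shown in the example response, %CMNG: sec_tag,cert_type,"sha256". parse_cmng_list is a hypothetical helper, and the digests in the sample are placeholders.

```python
def parse_cmng_list(raw):
    """Return a list of (sec_tag, cert_type, sha256) tuples from %CMNG: lines."""
    prefix = "%CMNG:"
    entries = []
    for line in raw.splitlines():
        line = line.strip()
        if not line.startswith(prefix):
            continue                    # skip blank lines and the final OK
        fields = [f.strip().strip('"') for f in line[len(prefix):].split(",")]
        if len(fields) < 2:
            continue
        try:
            sec_tag, cert_type = int(fields[0]), int(fields[1])
        except ValueError:
            continue                    # ignore lines that do not parse
        sha = fields[2] if len(fields) > 2 else None
        entries.append((sec_tag, cert_type, sha))
    return entries


# Placeholder digests, used only to exercise the parser.
listing = parse_cmng_list('%CMNG: 1,0,"AB12"\r\n%CMNG: 1,1,"CD34"\r\nOK')
```

An empty string, which the method returns on failure, produces an empty list.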
read_certificate
    m.read_certificate(sec_tag, cert_type)

Read the content of a specific credential from secure storage (AT%CMNG=2).

| Parameter | Type | Description |
|---|---|---|
| sec_tag | int | Security tag to read from. |
| cert_type | int | Credential type to retrieve. |

Returns the raw modem response str containing the credential content, or an empty string on failure.
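When the stored credential is PEM-encoded, the PEM block can be cut out of the raw response by searching for its BEGIN and END markers, without relying on the exact layout of the surrounding fields. extract_pem is a hypothetical helper, and the sample response is invented for illustration.

```python
def extract_pem(raw):
    """Return the first PEM block found in raw, or None if there is none."""
    begin = raw.find("-----BEGIN ")
    if begin == -1:
        return None
    end_marker = raw.find("-----END ", begin)
    if end_marker == -1:
        return None
    # The END line closes with another run of five dashes.
    end = raw.find("-----", end_marker + len("-----END "))
    if end == -1:
        return None
    return raw[begin:end + len("-----")]


# Invented response text; only the PEM markers matter to the helper.
sample = '%CMNG: 1,0,"AB12","-----BEGIN CERTIFICATE-----\nMIIB\n-----END CERTIFICATE-----"\r\nOK'
pem = extract_pem(sample)
```

The result can be compared with the certificate that was originally written to confirm that provisioning succeeded.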
delete_certificate
    m.delete_certificate(sec_tag, cert_type)

Delete a specific credential from the modem’s secure storage (AT%CMNG=3).

| Parameter | Type | Description |
|---|---|---|
| sec_tag | int | Security tag of the credential to delete. |
| cert_type | int | Credential type to remove. |

Returns True on success, False otherwise. Uses a 5-second timeout internally.
TLS-secured MQTT example
    from modem import Modem

    m = Modem()
    m.CFUN(4)  # flight mode while provisioning

    ROOT_CA = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
    CLIENT_CERT = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
    CLIENT_KEY = "-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----"

    # Store all three credentials under sec_tag 1.
    m.write_certificate(1, 0, ROOT_CA)
    m.write_certificate(1, 1, CLIENT_CERT)
    m.write_certificate(1, 2, CLIENT_KEY)

    # Return to full functionality, then connect over TLS using sec_tag 1.
    m.CFUN(1)
    m.mqtt_cfg("my-device-id")
    m.mqtt_conn(1, "", "", "broker.example.com", 8883, sec_tag=1)
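The example above ignores return values. A variant that stops at the first failing step might look like the sketch below. provision_and_connect is a hypothetical helper that only uses the method signatures documented on this page. It takes the modem object as an argument so the logic can be tested with a stand-in.

```python
def provision_and_connect(m, sec_tag, credentials, client_id, host, port=8883):
    """Write credentials in flight mode, then connect over TLS.

    credentials is a list of (cert_type, content) pairs.
    Returns the name of the step that failed, or None on success.
    """
    if not m.CFUN(4):                       # flight mode while provisioning
        return "CFUN(4)"
    for cert_type, content in credentials:
        if not m.write_certificate(sec_tag, cert_type, content):
            return "write_certificate(%d)" % cert_type
    if not m.CFUN(1):                       # back to full functionality
        return "CFUN(1)"
    if not m.mqtt_cfg(client_id):
        return "mqtt_cfg"
    if not m.mqtt_conn(1, "", "", host, port, sec_tag=sec_tag):
        return "mqtt_conn"
    return None
```

On a device this could be called as provision_and_connect(Modem(), 1, [(0, ROOT_CA), (1, CLIENT_CERT), (2, CLIENT_KEY)], "my-device-id", "broker.example.com"). Depending on network conditions, a pause or retry between CFUN(1) and mqtt_conn may be needed while the modem registers.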