Skip to content

Modem

Module: modem
Source: modules/modem.py
Depends on: nrf9151 (C extension module)

The Modem class wraps the nrf9151 C module to provide AT-command communication with the onboard nRF9151 modem over UART1. It implements the singleton pattern — only one instance exists at runtime.

All methods are blocking: each call occupies the thread until the modem replies or the timeout expires. Use ModemAsync instead if your application runs a uasyncio event loop.

from modem import Modem
m = Modem()

Initialises the modem (nrf9151.init()) and flushes up to 256 bytes from the RX buffer. Subsequent calls return the same instance.


m.send(data)

Send raw bytes to the modem without waiting for a response.

ParameterTypeDescription
databytes or strRaw data to write to UART1.

m.read(num_bytes)

Read raw bytes from the modem.

ParameterTypeDescription
num_bytesintNumber of bytes to read. Must be > 0.

Returns bytes. Raises ValueError if num_bytes is not a positive integer.


m.wait_response(expected="OK", num_bytes=256, timeout_ms=1000)

Poll the modem until expected appears in the accumulated response, "ERROR" is received, or the timeout expires. Blocks the current thread.

ParameterTypeDefaultDescription
expectedstr"OK"String to wait for in the response.
num_bytesint256Max bytes read per iteration.
timeout_msint1000Timeout in milliseconds.

Returns the full response str, "ERROR", or a timeout message.
Raises ValueError if timeout_ms or num_bytes ≤ 0.


m.send_cmd(command, expected="OK", num_bytes=256, timeout_ms=1000, is_bool=True)

Send an AT command (appends \r\n automatically) and wait for a response.

ParameterTypeDefaultDescription
commandstrAT command string — \r\n is appended automatically.
expectedstr"OK"Substring to look for in the modem response.
num_bytesint256Max bytes read per iteration inside wait_response.
timeout_msint1000Timeout in milliseconds.
is_boolboolTrueTrue → returns bool; False → returns raw response str.
ok = m.send_cmd("AT") # True if response contains "OK"
ver = m.send_cmd("AT+CGMR", is_bool=False) # raw string, e.g. "mfw_nrf9151_1.0.0\r\nOK"

m.CFUN(mode)

Set the modem’s radio functionality level via AT+CFUN.

ParameterTypeDescription
modeintOne of: 0, 1, 2, 4, 20, 21, 30, 31, 40, 41, 44.

Returns True on success, False otherwise. Raises ValueError for invalid modes.
Uses a 5-second timeout internally.

ModeMeaning
0Minimum functionality (RF off)
1Full functionality
4Flight mode
2, 20–44Nordic-specific power states

Nordic reference: MQTT client AT commands — nCS 3.0.0

m.mqtt_cfg(client_id, keep_alive=60, clean_session=0)

Configure the MQTT client (AT#XMQTTCFG).

ParameterTypeDefaultDescription
client_idstrMQTT client identifier.
keep_aliveint60Keep-alive interval in seconds. Must be > 0.
clean_sessionint00 = persistent session, 1 = clean session.

Returns True on success. Raises ValueError for invalid parameters.


m.get_mqtt_cfg()

Read the current MQTT configuration (AT#XMQTTCFG?).

Returns the raw #XMQTTCFG: … response str, or an empty string on error.


m.mqtt_conn(op, username, password, url, port, sec_tag=None)

Connect or disconnect from an MQTT broker (AT#XMQTTCONN).

ParameterTypeDescription
opint0 = disconnect, 1 = connect IPv4, 2 = connect IPv6.
usernamestrMQTT username.
passwordstrMQTT password.
urlstrBroker hostname or IP.
portintTCP port (1–65535).
sec_tagintOptional TLS security tag. Omit for unencrypted connections.

Returns True on success. Uses a 5-second timeout internally.


m.is_mqtt_conn()

Query the broker connection status (AT#XMQTTCON?).

Returns True if the modem is currently connected to an MQTT broker, False otherwise.


m.mqtt_publish(topic, msg, qos=0, retain=0)

Publish a message (AT#XMQTTPUB).

ParameterTypeDefaultDescription
topicstrMQTT topic string.
msgstrMessage payload.
qosint0QoS level: 0, 1, or 2.
retainint0Retain flag: 0 or 1.

Returns True on success. Uses a 5-second timeout internally.


m.mqtt_subscribe(topic, qos=0)

Subscribe to a topic (AT#XMQTTSUB).

ParameterTypeDefaultDescription
topicstrMQTT topic filter (wildcards supported).
qosint0QoS level: 0, 1, or 2.

Returns True on success. Uses a 5-second timeout internally.


The modem stores credentials (certificates, keys, PSKs) in its internal secure storage using the AT%CMNG command set. Each credential is identified by a sec_tag (a numeric identifier) and a cert_type (the kind of credential stored under that sec_tag).

Nordic reference: AT%CMNG — Security credentials management

cert_typeMeaningReadable
0Root CA certificateyes
1Client certificateno
2Client private keyno
3Pre-shared key (PSK)no
4PSK identityyes
5Public keyyes
6Device identity public keyyes
7Reserved
8Endorsement keyyes
9Ownership keyyes
10Nordic identity root CAyes
11Nordic base public keyyes
13Asset encryption keyyes

sec_tag must be an integer in the range 0 – 2 552 147 483 647. The same sec_tag can hold multiple credential types.


m.write_certificate(sec_tag, cert_type, content, psw=None, sha256=None)

Store a credential in the modem’s secure storage (AT%CMNG=0).

ParameterTypeDefaultDescription
sec_tagintSecurity tag (0 – 2 552 147 483 647).
cert_typeintCredential type (see table above).
contentstrPEM-encoded certificate, key, or raw PSK string.
pswstrNoneOptional password (e.g. for PKCS#12 bundles).
sha256strNoneOptional SHA-256 hash for integrity verification.

Returns True on success, False otherwise. Uses a 5-second timeout internally.
Raises ValueError for invalid sec_tag, cert_type, or parameter types.

m.write_certificate(1, 0, ROOT_CA) # Root CA, sec_tag=1, cert_type=0
m.write_certificate(1, 1, CLIENT_CERT) # Client cert, sec_tag=1, cert_type=1
m.write_certificate(1, 2, CLIENT_KEY) # Client priv. key, sec_tag=1, cert_type=2

m.list_certificate(sec_tag, cert_type=None)

List credentials stored under a given sec_tag (AT%CMNG=1).

ParameterTypeDefaultDescription
sec_tagintSecurity tag to query.
cert_typeintNoneIf provided, filter by this credential type.

Returns the raw modem response str on success (e.g. %CMNG: 1,0,"<sha256>"), or an empty string on failure.
Raises ValueError for invalid parameters.


m.read_certificate(sec_tag, cert_type)

Read the content of a specific credential from secure storage (AT%CMNG=2).

ParameterTypeDescription
sec_tagintSecurity tag to read from.
cert_typeintCredential type to retrieve.

Returns the raw modem response str containing the credential content, or an empty string on failure.


m.delete_certificate(sec_tag, cert_type)

Delete a specific credential from the modem’s secure storage (AT%CMNG=3).

ParameterTypeDescription
sec_tagintSecurity tag of the credential to delete.
cert_typeintCredential type to remove.

Returns True on success, False otherwise. Uses a 5-second timeout internally.


from modem import Modem
m = Modem()
m.CFUN(4) # flight mode while provisioning
ROOT_CA = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
CLIENT_CERT = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
CLIENT_KEY = "-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----"
m.write_certificate(1, 0, ROOT_CA)
m.write_certificate(1, 1, CLIENT_CERT)
m.write_certificate(1, 2, CLIENT_KEY)
m.CFUN(1)
m.mqtt_cfg("my-device-id")
m.mqtt_conn(1, "", "", "broker.example.com", 8883, sec_tag=1)