Работа с очередью MQTT в Python

Заметка для начинающих. Хочу показать пример работы с MQTT на Python.
Я использую MQTT в реле, которое управляет световыми приборами в доме. Реле подписывается на топик и ждет команды. В зависимости от поступившей команды, реле подает напряжение на приборы.

Импортируем класс и создаем инстанс

Тут я сразу создаю флажок connected_flag. Он мне будет нужен для работы с коллбэками.

import paho.mqtt.client as mqtt

mqttc = mqtt.Client('relay_listener')
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Uncomment to enable debug messages
#mqttc.on_log = on_log
# Признак успешного соединения
mqttc.connected_flag=False

Подключаемся к серверу

Теперь самое интересное. Я использую loop_start() для того, чтобы mqtt клиент начал делать попытки подключения. Как только подключение установится, сработает событие on_connect, в котором я поставлю флажок connected_flag.

Первая распространенная ошибка — делать только одну попытку подключения. Нет никакой гарантии что связь с сервером будет в момент первой попытки соединения. Мы же работаем с «домашним» сетевым оборудованием. В итоге скрипт периодически будет вылетать с ошибкой при запуске и заметим мы это не сразу.

Сразу после loop_stop() я снимаю флаг mqttc._thread_terminate. Думаю, это баг текущей версии компонента MQTT. Если этого не сделать, то перестанет работать loop_forever().

loop_forever() подвешивает основной поток навечно в режиме ожидания.

mqttc.loop_start()
mqttc.connect(mqtt_credentials_from_file_dict.get("localnet").get("host"), 1883, 60)

# Попытки соединения
while not mqttc.connected_flag: #wait in loop
	time.sleep(1)

mqttc.loop_stop()
mqttc._thread_terminate = False

mqttc.loop_forever()

Подписываемся на топик

Вторая распространенная ошибка. Посмотрим на on_connect.
При обрыве соединения с сервером важно каждый раз вызывать subscribe. Если сделать subscribe только в момент первого соединения, то клиент забудет название канала, на который подписан.

def on_connect(mqttc, obj, flags, rc):
	if rc == 0:
		mqttc.connected_flag=True
		res = mqttc.subscribe(listen_topic, 2)
		if res[0] != mqtt.MQTT_ERR_SUCCESS:
			logging.info(str(datetime.datetime.now()) + " The client is not subscribed")

	if rc == 1:
		logging.info(str(datetime.datetime.now()) +  " Connection failed: incorrect protocol version")
	if rc == 2:
		logging.info(str(datetime.datetime.now()) +  " Connection failed: invalid client identifier")
	if rc == 3:
		logging.info(str(datetime.datetime.now()) +  " Connection failed: server unavailable")
	if rc == 4:
		logging.info(str(datetime.datetime.now()) +  " Connection failed: bad app_id or access_key")
	if rc == 5:
		logging.info(str(datetime.datetime.now()) +  " Connection failed: not authorised")

Получаем и обрабатываем сообщение

Тут вроде бы ничего особеного. Но есть третья распространенная ошибка. Не забывайте, что Python может работать с несколькими потоками. И если мы будем обращаться к одному и тому-же ресурсу одновременно, то получим ошибку. Так что добавляем Lock() или аналог при необходимости.

def on_message(mqttc, obj, msg):
	print(str(datetime.datetime.now()) +  " Recieved data. Topic: "+msg.topic+". Msg:"+str(msg.payload))
Поделиться
Отправить
 73   2020   MQTT   python   умный дом
Популярное