当前位置: 移动技术网 > IT编程>脚本编程>Python > 在 Python 中使用 MQTT的方法

在 Python 中使用 MQTT的方法

2020年08月19日  | 移动技术网IT编程  | 我要评论
python 是一种广泛使用的解释型、高级编程、通用型编程语言。python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词)。python 让开发者能

python 是一种广泛使用的解释型、高级编程、通用型编程语言。python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词)。python 让开发者能够用更少的代码表达想法,不管是小型还是大型程序,该语言都试图让程序的结构清晰明了。

mqtt 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

本文主要介绍如何在 python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 mqtt 服务器的连接、订阅、取消订阅、收发消息等功能。

项目初始化

本项目使用 python 3.6 进行开发测试,读者可用如下命令确认 python 的版本。

➜ ~ python3 --version    
python 3.6.7

选择 mqtt 客户端库

paho-mqtt 是目前 python 中使用较多的 mqtt 客户端库,它在 python 2.7 或 3.x 上为客户端类提供了对 mqtt v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 mqtt 服务器变得非常简单。

pip 安装 paho mqtt 客户端

pip 是 python 包管理工具,该工具提供了对 python 包的查找、下载、安装、卸载的功能。

pip3 install -i https://pypi.doubanio.com/simple paho-mqtt

python mqtt 使用

连接 mqtt 服务器

本文将使用 emq x 提供的 免费公共 mqtt 服务器 ,该服务基于 emq x 的 mqtt 物联网云平台 创建。服务器接入信息如下:

  • broker: broker.emqx.io
  • tcp port: 1883
  • websocket port: 8083

导入 paho mqtt客户端

from paho.mqtt import client as mqtt_client

设置 mqtt broker 连接参数

设置 mqtt broker 连接地址,端口以及 topic,同时我们调用 python random.randint 函数随机生成 mqtt 客户端 id。

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'

编写 mqtt 连接函数

编写连接回调函数 on_connect ,该函数将在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。通常同时我们将创建一个 mqtt 客户端,该客户端将连接到 broker.emqx.io 。

def connect_mqtt():
 def on_connect(client, userdata, flags, rc):
  if rc == 0:
   print("connected to mqtt broker!")
  else:
   print("failed to connect, return code %d\n", rc)
 # set connecting client id
 client = mqtt_client.client(client_id)
 client.on_connect = on_connect
 client.connect(broker, port)
 return client

发布消息

首先定义一个 while 循环语句,在循环中我们将设置每秒调用 mqtt 客户端 publish 函数向 /python/mqtt 主题发送消息。

def publish(client):
  msg_count = 0
  while true:
   time.sleep(1)
   msg = f"messages: {msg_count}"
   result = client.publish(topic, msg)
   # result: [0, 1]
   status = result[0]
   if status == 0:
    print(f"send `{msg}` to topic `{topic}`")
   else:
    print(f"failed to send message to topic {topic}")
   msg_count += 1

订阅消息

编写消息回调函数 on_message ,该函数将在客户端从 mqtt broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。

def subscribe(client: mqtt_client):
 def on_message(client, userdata, msg):
  print(f"received `{msg.payload.decode()}` from `{msg.topic}` topic")

 client.subscribe(topic)
 client.on_message = on_message

完整代码

消息发布代码

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client id with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'


def connect_mqtt():
 def on_connect(client, userdata, flags, rc):
  if rc == 0:
   print("connected to mqtt broker!")
  else:
   print("failed to connect, return code %d\n", rc)

 client = mqtt_client.client(client_id)
 client.on_connect = on_connect
 client.connect(broker, port)
 return client


def publish(client):
 msg_count = 0
 while true:
  time.sleep(1)
  msg = f"messages: {msg_count}"
  result = client.publish(topic, msg)
  # result: [0, 1]
  status = result[0]
  if status == 0:
   print(f"send `{msg}` to topic `{topic}`")
  else:
   print(f"failed to send message to topic {topic}")
  msg_count += 1


def run():
 client = connect_mqtt()
 client.loop_start()
 publish(client)


if __name__ == '__main__':
 run()

消息订阅代码

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client id with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'


def connect_mqtt():
 def on_connect(client, userdata, flags, rc):
  if rc == 0:
   print("connected to mqtt broker!")
  else:
   print("failed to connect, return code %d\n", rc)

 client = mqtt_client.client(client_id)
 client.on_connect = on_connect
 client.connect(broker, port)
 return client


def publish(client):
 msg_count = 0
 while true:
  time.sleep(1)
  msg = f"messages: {msg_count}"
  result = client.publish(topic, msg)
  # result: [0, 1]
  status = result[0]
  if status == 0:
   print(f"send `{msg}` to topic `{topic}`")
  else:
   print(f"failed to send message to topic {topic}")
  msg_count += 1


def run():
 client = connect_mqtt()
 client.loop_start()
 publish(client)


if __name__ == '__main__':
 run()
消息订阅代码
# python3.6

import random

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client id with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'


def connect_mqtt() -> mqtt_client:
 def on_connect(client, userdata, flags, rc):
  if rc == 0:
   print("connected to mqtt broker!")
  else:
   print("failed to connect, return code %d\n", rc)

 client = mqtt_client.client(client_id)
 client.on_connect = on_connect
 client.connect(broker, port)
 return client


def subscribe(client: mqtt_client):
 def on_message(client, userdata, msg):
  print(f"received `{msg.payload.decode()}` from `{msg.topic}` topic")

 client.subscribe(topic)
 client.on_message = on_message


def run():
 client = connect_mqtt()
 subscribe(client)
 client.loop_forever()


if __name__ == '__main__':
 run()

测试

消息发布

运行 mqtt 消息发布代码,我们将看到客户端连接成功,并且成功将消息发布。

python3 pub.py

消息订阅

运行 mqtt 消息订阅代码,我们将看到客户端连接成功,并且成功接收到发布的消息。

python3 sub.py

总结

至此,我们完成了使用 paho-mqtt 客户端连接到 公共 mqtt 服务器 ,并实现了测试客户端与 mqtt 服务器的连接、消息发布和订阅。

与 c ++ 或 java 之类的高级语言不同,python 比较适合设备侧的业务逻辑实现,使用 python 您可以减少代码上的逻辑复杂度,降低与设备的交互成本。我们相信在物联网领域 python 将会有更广泛的应用。

接下来我们将会陆续发布更多关于物联网开发及 python 的相关文章,敬请关注。

以上就是在 python 中使用 mqtt的方法的详细内容,更多关于python 中使用 mqtt的资料请关注移动技术网其它相关文章!

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网