目录
效果图mqtt发布mqtt订阅matplotlib绘制动态图matplotlib绘制mqtt数据实时图像
效果图
mqtt发布
本代码中publish
是一个死循环,数据一直往外发送。
import random import time from paho.mqtt import client as mqtt_client import json from datetime import datetime broker = \'broker.emqx.io\' port = 1883 topic = \"/python/mqtt/li\" client_id = f\'python-mqtt-{random.randint(0, 1000)}\' # 随机生成客户端id def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print(\"Connected to MQTT Broker!\") else: print(\"Failed to connect, return code %d\\n\", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): while True: time.sleep(0.01) msg = json.dumps({\"MAC\": \"0123456789\", \"samplerate\": 12, \"sampletime\": str(datetime.utcnow().strftime(\'%Y/%m/%d-%H:%M:%S.%f\')[:-3]), \"battery\": 0.5, \"acc\": [ [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], ]}) result = client.publish(topic, msg) status = result[0] if status == 0: print(f\"Send `{msg}` to topic `{topic}`\") else: print(f\"Failed to send message to topic {topic}\") def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == \'__main__\': run()
mqtt订阅
from paho.mqtt import client as mqtt_client import time import os broker = \'broker.emqx.io\' port = 1883 topic = \"/python/mqtt/li\" def connect_mqtt(client_id): \"\"\" MQTT 连接函数。 \"\"\" def on_connect(client, userdata, flags, rc): \"\"\" 连接回调函数 在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。 \"\"\" if rc == 0: print(\"Connected to MQTT Broker! return code %d\" % rc) else: print(\"Failed to connect, return code %d\\n\", rc) client = mqtt_client.Client(client_id) # client.username_pw_set(\'uname\', \'upwd\') # 链接mqtt所需的用户名和密码,没有可不写 client.on_connect = on_connect client.connect(broker , port) return client def subscribe(client: mqtt_client, a_topic): \"\"\" 订阅消息 \"\"\" def on_message(client, userdata, msg): \"\"\" 消息回调函数 在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。 * 这里可添加自定义数据处理程序 \"\"\" print(\'From topic : %s\\n\\tmsg : %s\' % (msg.topic, msg.payload.decode())) client.subscribe(topic) client.on_message = on_message def run(client_id, topic): client = connect_mqtt(client_id) subscribe(client, topic) client.loop_forever() if __name__ == \'__main__\': run(\'test_eartag-003-python-li\', \'zk100/gw/#\')
matplotlib绘制动态图
import matplotlib.pyplot as plt import numpy as np count = 100 # 图中最多数据量 ax = list(range(count)) # 保存图1数据 ay = [0] * 100 bx = list(range(count)) # 保存图2数据 by = [0] * 100 num = count # 计数 plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据 plt.rcParams[\'figure.figsize\'] = (10, 10) # 图像显示大小 plt.rcParams[\'font.sans-serif\'] = [\'SimHei\'] # 防止中文标签乱码,还有通过导入字体文件的方法 plt.rcParams[\'axes.unicode_minus\'] = False plt.rcParams[\'lines.linewidth\'] = 0.5 # 设置曲线线条宽度 plt.tight_layout() while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle(\"总标题\", fontsize=30) # 添加总标题,并设置文字大小 g1 = np.random.random() # 生成随机数画图 # 图表1 ax.append(num) # 追加x坐标值 ay.append(g1) # 追加y坐标值 agraphic = plt.subplot(2, 1, 1) agraphic.set_title(\'子图表标题1\') # 添加子标题 agraphic.set_xlabel(\'x轴\', fontsize=10) # 添加轴标签 agraphic.set_ylabel(\'y轴\', fontsize=20) plt.plot(ax[-count:], ay[-count:], \'g-\') # 等于agraghic.plot(ax,ay,\'g-\') # 图表2 bx.append(num) by.append(g1) bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title(\'子图表标题2\') bgraghic.plot(bx[-count:], by[-count:], \'r^\') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 num = num + 1
matplotlib绘制mqtt数据实时图像
单线程
先启动mqtt订阅服务
mqtt订阅中有阻塞,更新数据后因订阅服务没有结束,导致绘图程序无法绘图
先启动绘图程序
绘图程序本身也是个循环,拿不到mqtt的实时数据,图像无法更新
两个服务加入协程,也不行。具体原因还不知道,容后补充。
mqtt作为线程启动,可解决上述问题
import json import random from paho.mqtt import client as mqtt_client import time import datetime from math import ceil, floor import matplotlib.pyplot as plt import _thread # 公共变量 broker = \'broker.emqx.io\' topic = \"/python/mqtt/li\" port = 1883 client_id = f\'python-mqtt-li-{random.randint(0, 100)}\' show_num = 300 x_num = [-1] # 计数 acc1 = [] acc2 = [] acc3 = [] acc4 = [] acc5 = [] acc6 = [] stime = [] \"\"\"mqtt subscribe topic\"\"\" def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime): \"\"\"将字符串型【毫秒级】格式化时间 转为 【13位】整型时间戳\"\"\" datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, \"%Y/%m/%d-%H:%M:%S.%f\") obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0 return obj_stamp def int2datetime(int_float_timestamp): \"\"\" 有小数点:分离小数点,整数转为格式化时间,小数点直接跟在后面 无小数点:从第10位进行分离, 所以本函数只适用于时间戳整数位数大于9且小于11. \"\"\" if \'.\' in str(int_float_timestamp): int_float = str(int_float_timestamp).split(\'.\') date = time.localtime(int(int_float[0])) tempDate = time.strftime(\"%Y/%m/%d-%H:%M:%S\", date) secondafter = \'.\' + str(int_float[1]) return str(tempDate) + secondafter def parse_mqttmsg(msg): \"\"\"解析mqt头数据 MAC samplerate sampletime battery acc\"\"\" content = json.loads(msg.payload.decode()) span = 1000 / content[\'samplerate\'] * 10 time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000] sampletime = content[\'sampletime\'] sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime) acc = content[\'acc\'] for i in range(len(acc)): x_num.append(x_num[-1] + 1) acc1.append(acc[i][0]) acc2.append(acc[i][1]) acc3.append(acc[i][2]) acc4.append(acc[i][3]) acc5.append(acc[i][4]) acc6.append(acc[i][5]) if i != 0: sampletime_int += time_span[i % 2] stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000)) else: stime.append(sampletime) print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1]) def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print(\"Connected to MQTT Broker!\") else: print(\"Failed to connect, return code %d\\n\", rc) pass client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): # print(f\"Received `{msg.payload.decode()}` from `{msg.topic}` topic\") parse_mqttmsg(msg) client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() \"\"\" draw figures \"\"\" def draw_figure(): plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据 plt.rcParams[\'figure.figsize\'] = (10, 10) # 图像显示大小 plt.rcParams[\'font.sans-serif\'] = [\'SimHei\'] # 防止中文标签乱码,还有通过导入字体文件的方法 plt.rcParams[\'axes.unicode_minus\'] = False plt.rcParams[\'lines.linewidth\'] = 0.5 # 设置曲线线条宽度 count = 0 while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle(\"总标题\", fontsize=30) # 添加总标题,并设置文字大小 plt.tight_layout() # 图表1 agraphic = plt.subplot(2, 1, 1) agraphic.set_title(\'子图表标题1\') # 添加子标题 agraphic.set_xlabel(\'x轴\', fontsize=10) # 添加轴标签 agraphic.set_ylabel(\'y轴\', fontsize=20) plt.plot(x_num[1:][-show_num:], acc1[-show_num:], \'g-\') try: xtricks = list(range(len(acc1) - show_num, len(acc1), 10)) # **1** xlabels = [stime[i] for i in xtricks] # **2** plt.xticks(xtricks, xlabels, rotation=15) except: pass # 图表2 bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title(\'子图表标题2\') bgraghic.set_xlabel(\'x轴\', fontsize=10) # 添加轴标签 bgraghic.set_ylabel(\'y轴\', fontsize=20) bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], \'r^\') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 count = count + 1 if __name__ == \'__main__\': # 多线程 _thread.start_new_thread(run, ()) draw_figure()
© 版权声明
THE END
暂无评论内容