python串口接收数据

数字信号采集

8人已加入

描述

1、Python使用线程来接收串口数据

python

2、python3 Serial 串口助手的接收读取数据

创建串口助手首先需要创建一个类,重构类的实现过程如下:

#coding=gb18030

import threading

import time

import serial

class ComThread:

def __init__(self, Port=‘COM3’):

#构造串口的属性

self.l_serial = None

self.alive = False

self.waitEnd = None

self.port = Port

self.ID = None

self.data = None

#定义串口等待的函数

def waiting(self):

if not self.waitEnd is None:

self.waitEnd.wait()

def SetStopEvent(self):

if not self.waitEnd is None:

self.waitEnd.set()

self.alive = False

self.stop()

#启动串口的函数

def start(self):

self.l_serial = serial.Serial()

self.l_serial.port = self.port

self.l_serial.baudrate = 115200

#设置等待时间,若超出这停止等待

self.l_serial.timeout = 2

self.l_serial.open()

#判断串口是否已经打开

if self.l_serial.isOpen():

self.waitEnd = threading.Event()

self.alive = True

self.thread_read = None

self.thread_read = threading.Thread(target=self.FirstReader)

self.thread_read.setDaemon(1)

self.thread_read.start()

return True

else:

return False

创建好类后,就要实现串口读取的功能,其读取数据的函数如下:

首先要创建一个字符串来存放接收到的数据:

data = ‘’

data = data.encode(‘utf-8’)#由于串口使用的是字节,故而要进行转码,否则串口会不识别

在创建好变量来接收数据后就要开始接收数据:

n = self.l_serial.inWaiting()#获取接收到的数据长度

if n:

#读取数据并将数据存入data

data = data + self.l_serial.read(n)

#输出接收到的数据

print(‘get data from serial port:’, data)

#显示data的类型,便于如果出错时检查错误

print(type(data))

将数据接收完后,就要对接收到的数据进行处理,提取出有用信息,由于下位机使用的协议不一样,因此处理的方法也不一样,我使用的协议是**+ID+*Data+*,因此我的提取方法如下:

#获取还没接收到的数据长度

n = self.l_serial.inWaiting()

#判断是否已经将下位机传输过来的数据全部提取完毕,防止之前没有获取全部数据

if len(data)》0 and n==0:

try:

#将数据转换为字符型

temp = data.decode(‘gb18030’)

#输出temp类型,看转码是否成功

print(type(temp))

#输出接收到的数据

print(temp)

将数据按换行分割并输出

car,temp = str(temp).split(“\n”,1)

print(car,temp)

#将temp按‘:’分割,并获取第二个数据

string = str(temp).strip().split(“:”)[1]

#由于前面分割后的数据类型是列表,因此需要转换成字符串,而后按照‘*’分割,得到的也就是我们需要的Id和data

str_ID,str_data = str(string).split(“*”,1)

print(str_ID)

print(str_data)

print(type(str_ID),type(str_data))

#判断data最后一位是否是‘*’,若是则退出,若不是则输出异常

if str_data[-1]== ‘*’:

break

else:

print(str_data[-1])

print(‘str_data[-1]!=*’)

except:

print(“读卡错误,请重试!\n”)

其输出结果为:

get data from serial port:b‘ID:4A622E29\n\xbf\xa8\xd6\xd0\xbf\xe924\xca\xfd\xbe\xdd\xce\xaa:1*80*\r\n’

《class ‘bytes’》

《class ‘str’》

ID:4A622E29

卡中块24数据为:1*80*

ID:4A622E29 卡中块24数据为:1*80*

80*

《class ‘str’》 《class ‘str’》

串口助手完整代码如下:

#coding=gb18030

import threading

import time

import serial

class ComThread:

def __init__(self, Port=‘COM3’):

self.l_serial = None

self.alive = False

self.waitEnd = None

self.port = Port

self.ID = None

self.data = None

def waiting(self):

if not self.waitEnd is None:

self.waitEnd.wait()

def SetStopEvent(self):

if not self.waitEnd is None:

self.waitEnd.set()

self.alive = False

self.stop()

def start(self):

self.l_serial = serial.Serial()

self.l_serial.port = self.port

self.l_serial.baudrate = 115200

self.l_serial.timeout = 2

self.l_serial.open()

if self.l_serial.isOpen():

self.waitEnd = threading.Event()

self.alive = True

self.thread_read = None

self.thread_read = threading.Thread(target=self.FirstReader)

self.thread_read.setDaemon(1)

self.thread_read.start()

return True

else:

return False

def SendDate(self,i_msg,send):

lmsg = ‘’

isOK = False

if isinstance(i_msg):

lmsg = i_msg.encode(‘gb18030’)

else:

lmsg = i_msg

try:

# 发送数据到相应的处理组件

self.l_serial.write(send)

except Exception as ex:

pass;

return isOK

def FirstReader(self):

while self.alive:

time.sleep(0.1)

data = ‘’

data = data.encode(‘utf-8’)

n = self.l_serial.inWaiting()

if n:

data = data + self.l_serial.read(n)

print(‘get data from serial port:’, data)

print(type(data))

n = self.l_serial.inWaiting()

if len(data)》0 and n==0:

try:

temp = data.decode(‘gb18030’)

print(type(temp))

print(temp)

car,temp = str(temp).split(“\n”,1)

print(car,temp)

string = str(temp).strip().split(“:”)[1]

str_ID,str_data = str(string).split(“*”,1)

print(str_ID)

print(str_data)

print(type(str_ID),type(str_data))

if str_data[-1]== ‘*’:

break

else:

print(str_data[-1])

print(‘str_data[-1]!=*’)

except:

print(“读卡错误,请重试!\n”)

self.ID = str_ID

self.data = str_data[0:-1]

self.waitEnd.set()

self.alive = False

def stop(self):

self.alive = False

self.thread_read.join()

if self.l_serial.isOpen():

self.l_serial.close()

#调用串口,测试串口

def main():

rt = ComThread()

rt.sendport = ‘**1*80*’

try:

if rt.start():

print(rt.l_serial.name)

rt.waiting()

print(“The data is:%s,The Id is:%s”%(rt.data,rt.ID))

rt.stop()

else:

pass

except Exception as se:

print(str(se))

if rt.alive:

rt.stop()

print(‘’)

print (‘End OK 。’)

temp_ID=rt.ID

temp_data=rt.data

del rt

return temp_ID,temp_data

if __name__ == ‘__main__’:

#设置一个主函数,用来运行窗口,便于若其他地方下需要调用串口是可以直接调用main函数

ID,data = main()

print(“******”)

print(ID,data)

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分