拳力以赴!基于 RT-Thread 与瑞萨 VisionBoard 的 AIoT 猜拳系统实战 | 技术集结

描述

本项目的题目为: 石头剪刀布猜拳识别。本项目实现了基本猜拳识别、根据识别 猜拳结果,与机器内的手势对比,根据结果控制舵机的行为,并且将识别的结果同步显示在上位机。

目录


 

项目概述


 

硬件清单


 

舵机控制引脚


 

软件与运行环境


 

固件编译


 

创建数据集


 

模型训练


 

代码结构与主要函数


 

使用方法


 

常见问题


 

项目源码

1 项目概述


 

目标: 在3秒以内识别猜拳的结果,并且根据猜拳输赢控制舵机旋转,将识别结果显示在上位机

平台: 瑞萨visionboard(摄像头、GPIO)

核心思路:

通过瑞萨visionboard采集照片数据作为数据集进行模型训练

将采集的数据集通过edge impulse进行训练

使用mqttx接受来自瑞萨visionboard的识别数据

根据识别猜拳结果控制舵机行为

使用手册: RA8D1 Group User’s Manual: Hardware(https://www.renesas.cn/zh/document/mah/ra8d1-group-users-manual-hardware?r=25456556

使用烧写工具: Renesas Flash Programmer V3.12(https://en.freedownloadmanager.org/Windows-PC/Renesas-Flash-Programmer.html

openmv固件: openmv固件(https://github.com/RT-Thread-Studio/sdk-bsp-ra8d1-vision-board

2 硬件清单


 

瑞萨visionboard开发板(摄像头)

摄像头(RGB565,工作分辨率 320×240,ov5640)

360度 SG90舵机

3 舵机控制引脚


 

P008 —> 舵机

P008地址: 0x4040_0000 + 0x0020 × m(参考RA8D1 Group User’s Manual: Hardware 655页)

4 软件与运行环境


 

openmv固件(https://github.com/RT-Thread-Studio/sdk-bsp-ra8d1-vision-board

主要依赖: sensor、time、tf、network、uctypes

5 固件编译


 

从github(https://github.com/RT-Thread-Studio/sdk-bsp-ra8d1-vision-board)仓库上将代码拉取到本地

运行链接脚本后,进入到 sdk-bsp-ra8d1-vision-board-master\projects\vision_board_openmv,打开 RT-Thread Env Tool,输入 scons —target=mdk5

项目生成后在C++ 预处理加上MICROPYTHON_USING_UCTYPES 的 define

(可以在sdk-bsp-ra8d1-vision-board-master\projects\vision_board_openmv\packages\micropython-v1.13.0\port\mpconfigport.h查看定义),

编译后在objects文件夹下得到rtthread.hex的openmv固件

使用Renesas Flash Programmer V3.12 烧写固件,将 Enable address check of program file 选项去除勾选

6 创建数据集


 

使用瑞萨visionboard进行训练图片采集,这样可以保持输入输出 的一致,一定程度上提高了识别的准确率。

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

import sensor, image, time, ossensor.reset()sensor.set_pixformat(sensor.RGB565)sensor.set_framesize(sensor.QVGA)  # 320x240sensor.set_windowing((240,240))sensor.skip_frames(time=2000)img_counter = 0while True:    img = sensor.snapshot()    filename = "/dataset/scissors/scissors_img_%03d.jpg" % img_counter    img.save(filename)    print("Saved:", filename)    img_counter += 1    time.sleep_ms(500)if img_counter >= 550:  # 停止条件break

运行时需要插入sd卡,并且提前创建对应的文件夹

7 模型训练


 

使用edge impulse训练模型, 使用Transfer learning,图片大小为240*240,训练时将图片输入改为灰度,减少干扰

8 代码结构与主要函数


 

mqttx:

def publish(self, topic, msg, retain=False, qos=0): 发送消息到mqttx

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

def publish(self, topic, msg, retain=False, qos=0):      pkt = bytearray()      # MQTT publish header      header = 0x30      if retain:          header |= 0x01      if qos == 1:          header |= 0x02      elif qos == 2:          header |= 0x04      pkt.append(header)      # 计算剩余长度      remaining_length = 2 + len(topic) + len(msg)      if qos > 0:          remaining_length += 2  # 包含packet id      # 先编码剩余长度      def encode_len(length):          encoded = bytearray()          while True:              digit = length % 128              length = length // 128              if length > 0:                  digit |= 0x80              encoded.append(digit)              if length == 0:                  break          return encoded      pkt += encode_len(remaining_length)      # 主题      pkt += struct.pack("!H", len(topic)) + topic      # qos>0时需要packet id      if qos > 0:          pkt += struct.pack("!H", 1)  # packet id固定为1,可改      pkt += msg      self.sock.write(pkt)

def connect(self, clean_session=True): 连接到mqttx

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

def connect(self, clean_session=True):        addr = socket.getaddrinfo(self.server, self.port)[0][-1]        self.sock = socket.socket()        self.sock.connect(addr)        pkt = bytearray(b"\x10")  # CONNECT packet type        var_header = bytearray(b"\x00\x04MQTT\x04")  # Protocol Name + Level        flags = 0        if clean_session:            flags |= 0x02        var_header.append(flags)        var_header += struct.pack("!H", self.keepalive)        payload = struct.pack("!H", len(self.client_id)) + self.client_id        remaining_length = len(var_header) + len(payload)        # MQTT剩余长度编码(可能大于127字节,需要多字节编码)        def encode_len(length):            encoded = bytearray()            while True:                digit = length % 128                length = length // 128                if length > 0:                    digit |= 0x80                encoded.append(digit)                if length == 0:                    break            return encoded        pkt += encode_len(remaining_length)        pkt += var_header        pkt += payload        self.sock.write(pkt)        resp = self.sock.read(4)        ifnot resp or resp[0] != 0x20or resp[1] != 0x02:            raise MQTTException("Invalid CONNACK")        if resp[3] != 0:            raise MQTTException("Connection refused, code: %d" % resp[3])

WIFI

def connect_wifi(SSID, PASSWORD): WIFI连接

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

def connect_wifi(SSID, PASSWORD):    wlan = network.WLAN(network.STA_IF)    wlan.active(True)    wlan.connect(SSID, PASSWORD)    connect_times = 0    whilenot wlan.isconnected():        print('Trying to connect to "{:s}"...'.format(SSID))        time.sleep_ms(1000)        connect_times += 1        if connect_times > 5:            print(f"Connect to {SSID} failed.")            return False    print("WiFi Connected ", wlan.ifconfig())    return wlan.ifconfig()

手势识别

初始化

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

def __init__(self):    self.net = None    self.lables = None    self.WIFIConnectStatus = GestureRecoginze.connect_wifi("IQOO Neo 6", "x31415926y")    self.MqttxClient = None    self.MqttxConnectStatus = False    if self.WIFIConnectStatus:        self.MqttxClient = MQTTClient("openmv", "broker.hivemq.com", port=1883)        self.MqttxConnectStatus = True        try:            self.MqttxClient.connect()            self.MqttxClient.subscribe("openmv/test")        except:            print("connect to MQTTx failed.")            self.MqttxConnectStatus = False        self.MqttxClient.set_callback(lambda topic, msg: print(topic, msg))    self.servo = Servo360(0x40400000, 8)    """ 初始化摄像头 """    sensor.reset()                         # Reset and initialize the sensor.    sensor.set_pixformat(sensor.RGB565)    # Set pixel format to RGB565 (or GRAYSCALE)    sensor.set_framesize(sensor.QVGA)      # Set frame size to QVGA (320x240)    sensor.set_windowing((240,240))        # Set 240x240 window.    sensor.skip_frames(time=2000)          # Let the camera adjust.    """  加载模型  """    try:        # load the model, alloc the model file on the heap if we have at least 64K free after loading        self.net = tf.load("trained.tflite", load_to_fb=uos.stat('trained.tflite')[6] > (gc.mem_free() - (64*1024)))    except Exception as e:        print(e)        raise Exception('Failed to load "trained.tflite", did you copy the .tflite and labels.txt file onto the mass-storage device? (' + str(e) + ')')    try:        self.labels = [line.rstrip('\n') for line in open("labels.txt")]    except Exception as e:        raise Exception('Failed to load "labels.txt", did you copy the .tflite and labels.txt file onto the mass-storage device? (' + str(e) + ')')

识别主体

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

def MainAction(self, comparetimes):        clock = time.clock()        compare_times = 0        start_time = None        CompareResultShow = None        compare_result = None        while(compare_times            clock.tick()            img = sensor.snapshot()            results = self.net.classify(img,                                   roi=(0, 0, img.width(), img.height()),                                   scale_mul=0,                                   x_overlap=0,                                   y_overlap=0)            obj = results[0]            scores = obj[4]            predictions_list = list(zip(self.labels, scores))            predictions_max = 0            predictions_num = None            for i in range(len(predictions_list)):                label, score = predictions_list[i]                if score > predictions_max:                    predictions_max = score                    predictions_num = label                print("%s = %f" % (label, score))            img.draw_string(0, 0, "Predictions: %s" % predictions_num, mono_space=False, scale=2)            if start_time is None:                start_time = time.ticks_ms()            if time.ticks_diff(time.ticks_ms(), start_time) > 5000:                if predictions_max > 0.90:                    machines_gesture = random.randint(0, 2)                    if machines_gesture == 0: # rock                        if predictions_num == "rock":                            compare_result = "draw"                        elif predictions_num == "paper":                            compare_result = "win"                        else:                            compare_result = "lose"                    elif machines_gesture == 1: # paper                        if predictions_num == "rock":                            compare_result = "lose"                        elif predictions_num == "paper":                            compare_result = "draw"                        else:                            compare_result = "win"                    else: # scissors                        if predictions_num == "rock":                            compare_result = "win"                        elif predictions_num == "paper":                            compare_result = "lose"                        else:                            compare_result = "draw"                    if self.WIFIConnectStatus:                        self.MqttxClient.publish("openmv/test", ujson.dumps({"compare_times": compare_times,                                                                           "machine_label":self.RPS[machines_gesture],                                                                           "label": predictions_num,                                                                           "score": predictions_max,                                                                           "compare_result": compare_result}))                    if compare_result == "win":                        self.servo.run(1, 1) #正转一秒                    else:                        self.servo.run(-1, 1)                    print(compare_times)                    start_time = time.ticks_ms()                    CompareResultShow = time.ticks_ms()                    compare_times += 1            else:                print("get_ready......")            if CompareResultShow is not None and time.ticks_diff(time.ticks_ms(), CompareResultShow) < 2500:                img.draw_string(0, 20, "machines_gesture: %s" % self.RPS[machines_gesture], mono_space=False, scale=2)                img.draw_string(0, 40, "compare result: %s" % compare_result, mono_space=False, scale=2)            else:                CompareResultShow = None                img.draw_string(0, 20, "get_ready......", mono_space=False, scale=2)            print(clock.fps(), "fps")

9 使用方法


 

前置条件

在光线充足,设置一个白色的识别背景

确保wifi、mqttx可以连接

使用

将开发板置于手的正上方20-30cm处,经过5s将会识别一次,比对后会将结果显示在 屏幕上2.5s,对比5次之后识别结束

10 常见问题


 

在某些情况下识别错误: 确保光线充足,与开发板的距离适当

无法在mqttx上获取识别结果: 确保WIFI是否存在,密码是否正确

 

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分