人工智能在牲畜和野生动物监测中的作用预计将显着增长。这个项目就是一个例子,展示了人工智能如何使用嵌入式机器学习以快速有效的方式跟踪和计数对象(动物或农作物)。该跟踪系统使用无人机飞越场地(向下扫描表面)的计算机视觉,摄像头朝下。ML 模型将能够检测和区分动物或作物的类型,并可以实时计算每种对象(动物/作物)的累积数量。这使野生动物救援队能够监测动物/作物的数量,也可以用于企业计算畜牧和农业市场的潜在收入。
本项目使用 Edge Impulse 的 FOMO(Faster Objects, More Objects)物体检测算法。野生动物/牲畜/资产跟踪环境可以通过选择灰度图像块和具有 2 个输出类(例如乌龟和鸭子)的 FOMO 对象检测来模拟和执行。该项目利用 FOMO 快速高效的算法对对象进行计数,同时使用受限微控制器或单板基于 Linux 的计算机(如 Raspberry Pi)。
Edge Impulse 模型也在我们的 Python 代码中实现,以便它可以累积计算对象。该算法将当前帧的坐标与之前的帧进行比较;查看相机上是否有新对象,或者该对象之前是否已计数。在我们的测试中,有时计算的对象数量仍然不准确,因为该模型仍处于概念验证阶段。不过我们相信这个概念可以进一步发展到现实世界的应用中。
该项目共包括 5 个步骤:
准备
数据采集和标记
使用 FOMO 对象检测训练和构建模型
在 Raspberry Pi 上部署和测试对象检测
构建 Python 应用程序以检测和计数(累积)
第 1 步:准备
使用更新的 Raspberry Pi OS(Buster 或 Bullseye)准备您的 Raspberry Pi。然后打开您的终端应用程序并ssh到您的 Pi。
从上方拍摄不同位置的物体(例如鸭子和乌龟),背景不同的照明条件,以确保模型可以在不同的条件下工作(防止过度拟合)。在这个项目中,我使用智能手机摄像头捕捉图像以进行数据收集,以方便使用。
注意:尽量保持图片中物体大小相似,物体大小的显着差异会混淆 FOMO 算法。
项目使用 Edge Impulse 作为机器学习平台,所以我们需要登录(首先创建一个帐户),然后转到Edge Impulse并创建新项目。
第 2 步:数据采集和标记
选择图像项目选项,然后分类多个对象。
在 Dashboard 》 Project Info 中,选择 Bounding Boxes 进行标记方法,选择 Raspberry Pi 4 进行延迟计算。
然后在数据采集中,单击上传数据选项卡,选择您的文件,选择自动拆分,然后单击开始上传。
现在,是时候贴标签了。单击标签队列选项卡,然后开始在对象周围拖动一个框并标记它(鸭或乌龟)并保存。重复直到标记所有图像。确保训练和测试数据之间的比率是理想的,大约为 80/20。
第 3 步:使用 FOMO 对象检测训练和构建模型
准备好数据集后,转到 Create Impulse 并将 96 x 96 设置为图像宽度 - 高度(这有助于使模型的内存大小保持较小)。然后选择拟合最短轴,并选择图像和对象检测作为学习块。
转到图像参数部分,选择颜色深度作为灰度,然后按保存参数。
最后,单击 Generate features 按钮,您应该会得到如下图所示的结果。
然后,导航到目标检测部分,并保持神经网络的训练设置不变——在我们的例子中是非常平衡的预训练模型,然后我们选择 FOMO (MobileNet V2 0.35)。通过按开始训练来训练模型,您可以看到进度。如果一切正常,您应该会看到如下内容:
之后我们可以测试模型,进入模型测试部分并单击全部分类。如果准确率结果超过 80%,那么我们可以进行下一步——部署。注意:如果准确率结果不如预期,请重新开始使用质量数据、标签,或者只是通过训练周期和学习率设置更改重新训练模型。
第 4 步:部署训练好的模型并在 Raspberry Pi 上进行测试
现在,我们可以切换到 Raspberry Pi。确保您的 Pi 已安装所有依赖项和 Edge Impulse for Linux CLI (如步骤 1 所示)并连接您的 Pi 摄像头(或 USB 网络摄像头)。然后,通过终端ssh你的 Pi 并输入:
$ edge-impulse-linux-runner
(如果您有多个项目,请添加- - clean )在此过程中,您将被要求登录您的 Edge Impulse 帐户。
这将自动下载您的模型并将其编译到您的 Pi,然后开始分类。结果将显示在终端窗口中。
您还可以在浏览器上启动视频流:http:// 你的树莓派 IP 地址:4912
Turtle 和 Duck 已通过 x、y 坐标实时成功识别(每次推理时间非常短)。
在这一步之前,我们已经取出数据并在 Edge Impulse 平台上训练了一个对象检测模型,并在我们的 Raspberry Pi 板上本地运行该模型。因此,可以得出结论,它已成功部署。
第 5 步:构建 Python 程序进行检测和计数
为了使该项目对特定用例更有意义,我们希望它计算从移动相机(通过无人机)拍摄的每种类型对象的累积计数。我们采用 Edge Impulse 的示例对象检测程序,并通过解决加权二分匹配问题将其转变为对象跟踪程序,以便可以跨不同帧跟踪同一对象。有关更多详细信息,您可以在下面的代码中查看。
因为我们使用 Python,所以我们需要安装 Python 3 Edge Impulse SDK 并从之前的 Edge Impulse 示例中克隆存储库。
您还需要下载经过训练的模型文件,以便我们正在运行的程序可以访问它。输入这个来下载它:
$ edge-impulse-linux-runner --download modelfile.eim
确保您/我们的程序 《count_moving_ducks》 放置在正确的目录中,例如:
$ cd linux-sdk-python/examples/image
然后,使用以下命令运行程序:
$ python3 count_moving_ducks.py ~/modelfile.eim
最后,我们成功实现了 Edge Impulse FOMO 对象检测模型,并在树莓派本地运行累积计数程序。以我们获得的速度和精度水平,我们有信心这个项目也可以用于微控制器,如 Arduino 的 Nicla Vision 或 ESP32 CAM,因此更容易安装到无人机上。
count_moving_ducks.py:
'''
Author: Jallson Suryo & Nicholas Patrick
Date: 2022-07-25
License: CC0
Source: Edge Impulse python SDK example file (classify.py) -- modified
Description: Program to count livestock or wildlife from a drone (moving camera) using
Edge Impulse FOMO trained model.
'''
#!/usr/bin/env python
import device_patches # Device specific patches for Jetson Nano (needs to be before importing cv2)
from math import inf, sqrt
from queue import Queue
import cv2
import os
import sys, getopt
import signal
import time
from edge_impulse_linux.image import ImageImpulseRunner
runner = None
# if you don't want to see a camera preview, set this to False
show_camera = True
if (sys.platform == 'linux' and not os.environ.get('DISPLAY')):
show_camera = False
def now():
return round(time.time() * 1000)
def get_webcams():
port_ids = []
for port in range(5):
print("Looking for a camera in port %s:" %port)
camera = cv2.VideoCapture(port)
if camera.isOpened():
ret = camera.read()[0]
if ret:
backendName =camera.getBackendName()
w = camera.get(3)
h = camera.get(4)
print("Camera %s (%s x %s) found in port %s " %(backendName,h,w, port))
port_ids.append(port)
camera.release()
return port_ids
def sigint_handler(sig, frame):
print('Interrupted')
if (runner):
runner.stop()
sys.exit(0)
signal.signal(signal.SIGINT, sigint_handler)
def help():
print('python classify.py
def main(argv):
try:
opts, args = getopt.getopt(argv, "h", ["--help"])
except getopt.GetoptError:
help()
sys.exit(2)
for opt, arg in opts:
if opt in ('-h', '--help'):
help()
sys.exit()
if len(args) == 0:
help()
sys.exit(2)
model = args[0]
dir_path = os.path.dirname(os.path.realpath(__file__))
modelfile = os.path.join(dir_path, model)
print('MODEL: ' + modelfile)
with ImageImpulseRunner(modelfile) as runner:
try:
model_info = runner.init()
print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"')
labels = model_info['model_parameters']['labels']
if len(args)>= 2:
videoCaptureDeviceId = int(args[1])
else:
port_ids = get_webcams()
if len(port_ids) == 0:
raise Exception('Cannot find any webcams')
if len(args)<= 1 and len(port_ids)> 1:
raise Exception("Multiple cameras found. Add the camera port ID as a second argument to use to this script")
videoCaptureDeviceId = int(port_ids[0])
camera = cv2.VideoCapture(videoCaptureDeviceId)
ret = camera.read()[0]
if ret:
backendName = camera.getBackendName()
w = camera.get(3)
h = camera.get(4)
print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId))
camera.release()
else:
raise Exception("Couldn't initialize selected camera.")
HEIGHT = 96
WIDTH = 96
next_frame_start_time = 0
prev_frame_objects = []
cumulative_counts = {'duck' : 0, 'turtle' : 0}
# iterate through frames
for res, img in runner.classifier(videoCaptureDeviceId):
# print('classification runner response', res)
if "classification" in res["result"].keys():
print('Result (%d ms.) ' % (res['timing']['dsp'] + res['timing']['classification']), end='')
for label in labels:
score = res['result']['classification'][label]
print('%s: %.2f\t' % (label, score), end='')
print('', flush=True)
elif "bounding_boxes" in res["result"].keys():
curr_frame_objects = res["result"]["bounding_boxes"]
m, n = len(prev_frame_objects), len(curr_frame_objects)
print('Found %d bounding boxes (%d ms.)' % (n, res['timing']['dsp'] + res['timing']['classification']))
# iterate through identified objects
for bb in curr_frame_objects:
print('\t%s (%.2f): x=%d y=%d w=%d h=%d' % (bb['label'], bb['value'], bb['x'], bb['y'], bb['width'], bb['height']))
img = cv2.rectangle(img, (bb['x'], bb['y']), (bb['x'] + bb['width'], bb['y'] + bb['height']), (255, 0, 0), 1)
# Pairs objects seen in both the previous frame and the current frame.
# To get a good pairing, each potential pair is given a cost. The problem
# then transforms into minimum cost maximum cardinality bipartite matching.
# populate table
def get_c(a0, a1):
# computes cost of pairs. A cost of inf implies no edge.
A, B = sqrt(HEIGHT ** 2 + WIDTH ** 2) / 8, 5
if a0['label'] != a1['label']: return inf
d2 = (a0['x'] - a1['x']) ** 2 + (a0['x'] - a1['x']) ** 2
dn4 = d2 ** -2 if d2 else 10**20
val = a0['value'] * a1['value'] * (((1 + B) * dn4) / (dn4 + A ** -4) - B)
return inf if val <= 0 else 1 - val
match_c = [[get_c(i, j) for j in curr_frame_objects] for i in prev_frame_objects]
# solves the matching problem in O(V^2E) by repeatedly finding augmenting paths
# using shortest path faster algorithm (SPFA).
# A modified Hungarian algorithm could also have been used.
# 0..m-1: prev, left
# m..m+n-1: this, right
# m+n: source
# m+n+1: sink
source, sink, V = m + n, m + n + 1, m + n + 2
matched = [-1] * (m + n + 2)
adjLis = [[] for i in range(m)] + [[(sink, 0)] for _ in range(n)] + [[(i, 0) for i in range(m)], []]
# left right source sink
for i in range(m):
for j in range(n):
if match_c[i][j] != inf:
adjLis[i].append((j + m, match_c[i][j]))
# finds augmenting paths until no more are found.
while True:
# SPFA
distance = [inf] * V
distance[source] = 0
parent = [-1] * V
Q, inQ = Queue(), [False] * V
Q.put(source); inQ[source] = True
while not Q.empty():
u = Q.get(); inQ[u] = False
for v, w in adjLis[u]:
if u < m and matched[u] == v: continue
if u == source and matched[v] != -1: continue
if distance[u] + w < distance[v]:
distance[v] = distance[u] + w
parent[v] = u
if not inQ[v]: Q.put(v); inQ[v] = True
aug = parent[sink]
if aug == -1: break
# augment the shortest path
while aug != source:
v = aug
aug = parent[aug]
u = aug
aug = parent[aug]
adjLis[v] = [(u, -match_c[u][v - m])]
matched[u], matched[v] = v, u
# updating cumulative_counts by the unmatched new objects
for i in range(n):
if matched[m + i] == -1:
cumulative_counts[curr_frame_objects[i]['label']] += 1
# preparing prev_frame_objects for the next frame
next_prev_frame_objects = curr_frame_objects
# considering objects that became invisible (false negative) for a few frames.
for i in range(m):
if matched[i] != -1: continue
prev_frame_objects[i]['value'] *= 0.7
if prev_frame_objects[i]['value'] >= 0.35:
next_prev_frame_objects.append(prev_frame_objects[i])
prev_frame_objects = next_prev_frame_objects
print("current cumulative_counts:\n %d ducks, %d turtles" % (cumulative_counts['duck'], cumulative_counts['turtle']))
if (show_camera):
cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
if cv2.waitKey(1) == ord('q'):
break
if (next_frame_start_time > now()):
time.sleep((next_frame_start_time - now()) / 1000)
# operates at a maximum of 5fps
next_frame_start_time = now() + 200
finally:
if (runner):
runner.stop()
if __name__ == "__main__":
main(sys.argv[1:])
全部0条评论
快来发表一下你的评论吧 !