本文来源电子发烧友社区,作者:HonestQiao, 帖子地址:https://bbs.elecfans.com/jishu_2303715_1_1.html
之前分享的文章中,在米尔MYD-YT507开发板上进行了摄像头流媒体的尝试,在此基础上,进一步对之前的评测计划进行了实现。
经过充分的学习,最终应用Fluter+Django+OpenCV,实现了一款米尔行车记录仪,现将实现的具体内容,与大家分享。
目录:
一、行车记录仪业务逻辑规划
经过详细的分析,规划了如下的基本业务逻辑结构:
整体分为三个部分:
为了又快又好的开发行车记录仪的实际界面,以及后续进行各移动平台的App开发,选择了Flutter。事实证明,坑太多了。不过,跨平台特性,确实好。
二、硬件设备准备:
开发这款行车记录仪,实际使用到的硬件设备如下:
路由器没有拍照,用普通无线路由器即可,当然带宽越高越好。
开发板上有两个USB3.0接口,选一个接上路由器即可。
然后,将开发板使用网线连接到路由器,再上电,就可以进行实际的操作了。
我这边实际使用中,电源接口有点松,容易突然断电,所以使用胶带进行了加固。
三、摄像头实时画面播放服务开发
在之前尝试MJPEG视频流直播的时候,使用了mjpeg_streamer,但不清楚如何进行视频的分割。
因为行车记录仪,一般都是按照一定的时间进行视频的分割存放,避免单个视频过大。
经过仔细的学习了解,OpenCV也可以获取摄像头的信息,并按照需要写入文件。
最后,采用了Python+OpenCV的方案,有Python负责具体的逻辑,Python-OpenCV负责摄像头视频数据的采集。
视频采集部分,包含的具体功能为:
采集摄像头的数据,Python-opencv搞定。
写入视频数据到文件,Python简单搞定。
提供实时视频预览,这个花了不少功夫。
因为同时要写入到文件,还要提供预览,数据需要复用。
经过学习了解,可以将Python-opencv采集的画面,按帧在HTTP以JPEG数据发送,那么播放端,就能收到MJPEG数据流,进行播放了。
因此,第一版,参考资料,实现了一个Python版的MJPEG播放服务,读取帧,写入临时文件,然后从临时文件读取数据返回。
为了提高效率,还进行了优化,不写入临时文件,直接在内存中进行转换。
最终形成的代码如下:
# http服务器请求处理:网页、MJPEG数据流
class CamHandler(BaseHTTPRequestHandler):
def do_GET(self):
# mjpeg推流
if self.path.endswith('.mjpg'):
self.send_response(200)
self.send_header('Content-type','multipart/x-mixed-replace; boundary=--jpgboundary')
self.end_headers()
while True:
if is_stop:
break
try:
# rc,img = cameraCapture.read()
rc,img = success,frame
if not rc:
continue
if True:
imgRGB=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
jpg = Image.fromarray(imgRGB)
tmpFile = BytesIO()
jpg.save(tmpFile,'JPEG')
self.wfile.write(b"--jpgboundary")
self.send_header(b'Content-type','image/jpeg')
self.send_header(b'Content-length',str(tmpFile.getbuffer().nbytes))
self.end_headers()
jpg.save(self.wfile,'JPEG')
else:
img_fps = JPEG_QUALITY_VALUE
img_param = [int(cv2.IMWRITE_JPEG_QUALITY), img_fps]
img_str = cv2.imencode('.jpg', img, img_param)[1].tobytes() # change image to jpeg format
self.send_header('Content-type','image/jpeg')
self.end_headers()
self.wfile.write(img_str)
self.wfile.write(b"
--jpgboundary
") # end of this part
time.sleep(0.033)
except KeyboardInterrupt:
self.wfile.write(b"
--jpgboundary--
")
break
except BrokenPipeError:
continue
return
# 网页
if self.path == '/' or self.path.endswith('.html'):
self.send_response(200)
self.send_header('Content-type','text/html')
self.end_headers()
self.wfile.write(b'
'
) self.wfile.write(('' % self.headers.get('Host')).encode()) self.wfile.write(b'') return
这段代码,提供了两个功能:
在开发过程中,运行该服务后,随时可以通过浏览器查看效果。
其中涉及到opencv相关的知识,以及webserver相关的知识,大家可以了解相关的资料做基础,这里就不详细说了。
本来以为提供了MJPEG服务,就能够在Flutter开发的Web界面中调用了。然而,实际使用时,发现坑来了。
Flutter的公共库里面,有MJPEG的库,但是在目前的版本中,已经不能使用了。且官方认为用的人不多,在可预见的将来,不会修复。悲催啊!!!
条条大道通罗马,此处不通开新路。
经过再次的学习了解,Flutter的Video功能,支持Stream模式,其可以采用WebSocket的方式来获取数据,然后进行播放。
那么,只要能够在服务端,将获取的帧数据,使用WebSocket提供,就能够正常播放了。
最终,使用Python开发了能够提供实时视频数据的WebSocket服务,具体代码如下:
# websocket服务请求处理
async def CamTransmitHandler(websocket, path):
print("Client Connected !")
try :
while True:
# rc,img = cameraCapture.read()
rc,img = success,frame
if not rc:
continue
img_fps = JPEG_QUALITY_VALUE
img_param = [int(cv2.IMWRITE_JPEG_QUALITY), img_fps]
encoded = cv2.imencode('.jpg', img, img_param)[1]
data = str(base64.b64encode(encoded))
data = data[2:len(data)-1]
await websocket.send(data)
# cv2.imshow("Transimission", frame)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
# cap.release()
except EXCEPTION_CONNECTION_CLOSE as e:
print("Client Disconnected !")
# cap.release()
except:
print("Someting went Wrong !")
这个部分比之前的更简单,就是简单的转换数据,喂数据给WebSocket即可。
上述的两部分代码中,都没有包含完整的逻辑处理过程,只有关键代码部分。
各部分分别讲完以后,将提供完整的代码以供学习。
到这里,实时流媒体功能就实现了。
四、摄像头视频信息记录
实际上,上一步的实时视频功能,也依赖于这一步,因为其需要共享实际获取的摄像头信息。
其基本逻辑也比较简单,步骤如下:
具体代码如下:
# 捕获摄像头
cameraCapture = cv2.VideoCapture(CAMERA_NO)
# 摄像头参数设置
cameraCapture.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
cameraCapture.set(cv2.CAP_PROP_FRAME_WIDTH, 240)
cameraCapture.set(cv2.CAP_PROP_SATURATION, 135)
fps = 30
size=(int(cameraCapture.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cameraCapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
# 读取捕获的数据
success,frame = cameraCapture.read()
...
while True:
if is_stop:
success = False
break;
success,frame = cameraCapture.read()
if not success:
continue
time_now = get_current_time()
if time_now["time"] - time_record["time"] >= ROTATE_TIME:
if time_record_prev:
thubm_file = get_file_name(time_record_prev, 'thumbs', 'jpg')
print("[Info] write to thumb: %s" % thubm_file)
if not os.path.isfile(thubm_file):
cv2.imwrite(thubm_file, frame)
time_record = time_now
time_record_prev = get_current_time()
video_file = get_file_name(time_record_prev, 'videos', MEDIA_EXT)
print("[Info] write to video: %s" % video_file)
# encode = cv2.VideoWriter_fourcc(*"mp4v")
encode = cv2.VideoWriter_fourcc(*'X264')
# encode = cv2.VideoWriter_fourcc(*'AVC1')
# encode = cv2.VideoWriter_fourcc(*'XVID')
# encode = cv2.VideoWriter_fourcc(*'H264')
videoWriter=cv2.VideoWriter(video_file, encode,fps,size) # mp4
numFrameRemaining = ROTATE_TIME * fps #摄像头捕获持续时间
while success and numFrameRemaining > 0:
videoWriter.write(frame)
success,frame = cameraCapture.read()
numFrameRemaining -= 1
cameraCapture.release()
上述代码的逻辑其实很清晰,有opencv的基础,一看就懂。
有一个关键点需要注意的就是 encode = cv2.VideoWriter_fourcc(*'X264')
,在不同的环境下面,提供的编码方式不完全相同。
在米尔MYD-YT507开发板的Ubuntu环境中,可以使用X264编码。
上述代码,会持续不断的读取摄像头的数据帧,存放到frame变量中,然后写入到视频文件中。并进行时间判断,以确定是否需要写入到新的视频文件中。
frame变量,在之前实时视频服务中,也会使用,相当于是共享了。
五、摄像头服务的完整代码
经过上面的两个部分,就完成了摄像头部分的服务代码。
整体的代码如下:
# -*- coding: utf-8 -*-
import signal
import cv2
import time
from PIL import Image
from threading import Thread
from http.server import BaseHTTPRequestHandler,HTTPServer
from socketserver import ThreadingMixIn
from io import BytesIO
import os
import sys
import websockets
import asyncio
import base64
import ctypes
import inspect
CAMERA_NO = 2
ROTATE_TIME = 120
MJPEG_ENABLE = 1
WEBSOCKET_ENABLE = 1
MJPEG_SERVER_PORT = 28888
WEBSOCKET_PORT = 28889
JPEG_QUALITY_VALUE = 65
STORE_DIR = "./data/" if os.uname()[0] == 'Darwin' else "/sdcard/data/"
MEDIA_EXT = "mkv"
EXCEPTION_CONNECTION_CLOSE = websockets.exceptions.ConnectionClosed if sys.version[:3] == '3.6' else websockets.ConnectionClosed
def _async_raise(tid, exctype):
"""raises the exception, performs cleanup if needed"""
try:
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
# pass
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
except Exception as err:
print(err)
def stop_thread(thread):
"""终止线程"""
_async_raise(thread.ident, SystemExit)
# 信号处理回调
def signal_handler(signum, frame):
# global cameraCapture
# global thread
# global server
# global is_stop
# global success
print('signal_handler: caught signal ' + str(signum))
if signum == signal.SIGINT.value:
print('stop server:')
is_stop = True
success = False
print("mjpeg server.socket.close...")
server.socket.close()
print("mjpeg server.shutdown...")
server.shutdown()
print("ws server.socket.close...")
server_ws.ws_server.close()
time.sleep(1)
# print("ws server.shutdown...")
# await server_ws.ws_server.wait_closed()
print("mjpeg thread.shutdown...")
thread_mjpeg.join()
print("ws loop.shutdown...")
# event_loop_ws.stop()
event_loop_ws.call_soon_threadsafe(event_loop_ws.stop)
time.sleep(1)
# print("ws thread.shutdown...")
# stop_thread(thread_ws)
# time.sleep(1)
# print(server)
# print(server_ws)
print(thread_mjpeg.is_alive())
print(thread_ws.is_alive())
print(event_loop_ws.is_running())
# thread_ws.join()
print("cameraCapture.release...")
cameraCapture.release()
print("quit...")
# print(server_ws)
sys.exit(0)
# http服务器请求处理:网页、MJPEG数据流
class CamHandler(BaseHTTPRequestHandler):
def do_GET(self):
# mjpeg推流
if self.path.endswith('.mjpg'):
self.send_response(200)
self.send_header('Content-type','multipart/x-mixed-replace; boundary=--jpgboundary')
self.end_headers()
while True:
if is_stop:
break
try:
# rc,img = cameraCapture.read()
rc,img = success,frame
if not rc:
continue
if True:
imgRGB=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
jpg = Image.fromarray(imgRGB)
tmpFile = BytesIO()
jpg.save(tmpFile,'JPEG')
self.wfile.write(b"--jpgboundary")
self.send_header(b'Content-type','image/jpeg')
self.send_header(b'Content-length',str(tmpFile.getbuffer().nbytes))
self.end_headers()
jpg.save(self.wfile,'JPEG')
else:
img_fps = JPEG_QUALITY_VALUE
img_param = [int(cv2.IMWRITE_JPEG_QUALITY), img_fps]
img_str = cv2.imencode('.jpg', img, img_param)[1].tobytes() # change image to jpeg format
self.send_header('Content-type','image/jpeg')
self.end_headers()
self.wfile.write(img_str)
self.wfile.write(b"
--jpgboundary
") # end of this part
time.sleep(0.033)
except KeyboardInterrupt:
self.wfile.write(b"
--jpgboundary--
")
break
except BrokenPipeError:
continue
return
# 网页
if self.path == '/' or self.path.endswith('.html'):
self.send_response(200)
self.send_header('Content-type','text/html')
self.end_headers()
self.wfile.write(b'
'
) self.wfile.write(('' % self.headers.get('Host')).encode()) self.wfile.write(b'') return class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): """Handle requests in a separate thread.""" # 启动MJPEG服务 def mjpeg_server_star(): global success global server global thread_mjpeg try: server = ThreadedHTTPServer(('0.0.0.0', MJPEG_SERVER_PORT), CamHandler) print("mjpeg server started: http://0.0.0.0:%d" % MJPEG_SERVER_PORT) # server.serve_forever() thread_mjpeg = Thread(target=server.serve_forever); thread_mjpeg.start() except KeyboardInterrupt: print("mjpeg server stoping...") server.socket.close() server.shutdown() print("mjpeg server stoped") # websocket服务请求处理 async def CamTransmitHandler(websocket, path): print("Client Connected !") try : while True: # rc,img = cameraCapture.read() rc,img = success,frame if not rc: continue img_fps = JPEG_QUALITY_VALUE img_param = [int(cv2.IMWRITE_JPEG_QUALITY), img_fps] encoded = cv2.imencode('.jpg', img, img_param)[1] data = str(base64.b64encode(encoded)) data = data[2:len(data)-1] await websocket.send(data) # cv2.imshow("Transimission", frame) # if cv2.waitKey(1) & 0xFF == ord('q'): # break # cap.release() except EXCEPTION_CONNECTION_CLOSE as e: print("Client Disconnected !") # cap.release() except: print("Someting went Wrong !") # websocket服务器启动 def websocket_server_start(): global thread_ws global server_ws global event_loop_ws event_loop_ws = asyncio.new_event_loop() def run_server(): global server_ws print("websocket server started: ws://0.0.0.0:%d" % WEBSOCKET_PORT) server_ws = websockets.serve(CamTransmitHandler, port=WEBSOCKET_PORT, loop=event_loop_ws) event_loop_ws.run_until_complete(server_ws) event_loop_ws.run_forever() thread_ws = Thread(target=run_server) thread_ws.start() # try: # yield # except e: # print("An exception occurred") # finally: # event_loop.call_soon_threadsafe(event_loop.stop) # 获取存储的文件名 def get_file_name(time_obj, path, ext): file_name_time = "%04d-%02d-%02d_%02d-%02d-%02d" % (time_obj["year"], time_obj["month"], time_obj["day"], time_obj["hour"], time_obj["min"], 0) return '%s/%s/%s.%s' % (STORE_DIR, path, file_name_time, ext) # 获取当前整分时间 def get_current_time(): time_now = time.localtime() time_int = int(time.time()) return { "year": time_now.tm_year, "month": time_now.tm_mon, "day": time_now.tm_mday, "hour": time_now.tm_hour, "min": time_now.tm_min, "sec": time_now.tm_sec, "time": time_int - time_now.tm_sec } # 设置信号回调 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 捕获摄像头 cameraCapture = cv2.VideoCapture(CAMERA_NO) # 摄像头参数设置 cameraCapture.set(cv2.CAP_PROP_FRAME_WIDTH, 320) cameraCapture.set(cv2.CAP_PROP_FRAME_WIDTH, 240) cameraCapture.set(cv2.CAP_PROP_SATURATION, 135) fps = 30 size=(int(cameraCapture.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cameraCapture.get(cv2.CAP_PROP_FRAME_HEIGHT))) # 读取捕获的数据 success,frame = cameraCapture.read() if not success: print("camera start failed.") quit() is_stop = False server = None server_ws = None event_loop_ws = None thread_mjpeg = None thread_ws = None mjpeg_server_star() websocket_server_start() print("record server star:") thubm_file = None video_file = None time_start = int(time.time()) time_record = {"time":0} time_record_prev = None while True: if is_stop: success = False break; success,frame = cameraCapture.read() if not success: continue time_now = get_current_time() if time_now["time"] - time_record["time"] >= ROTATE_TIME: if time_record_prev: thubm_file = get_file_name(time_record_prev, 'thumbs', 'jpg') print("[Info] write to thumb: %s" % thubm_file) if not os.path.isfile(thubm_file): cv2.imwrite(thubm_file, frame) time_record = time_now time_record_prev = get_current_time() video_file = get_file_name(time_record_prev, 'videos', MEDIA_EXT) print("[Info] write to video: %s" % video_file) # encode = cv2.VideoWriter_fourcc(*"mp4v") encode = cv2.VideoWriter_fourcc(*'X264') # encode = cv2.VideoWriter_fourcc(*'AVC1') # encode = cv2.VideoWriter_fourcc(*'XVID') # encode = cv2.VideoWriter_fourcc(*'H264') videoWriter=cv2.VideoWriter(video_file, encode,fps,size) # mp4 numFrameRemaining = ROTATE_TIME * fps #摄像头捕获持续时间 while success and numFrameRemaining > 0: videoWriter.write(frame) success,frame = cameraCapture.read() numFrameRemaining -= 1 cameraCapture.release()
在上述代码中,除了前面说过的三个部分,还包括启动web和websocket线程的部分。因为核心逻辑为读取视频数据并写入文件,所以其他部分,以线程的模式启动,以便同时进行处理。
将上述代码保存为DrivingRecorderAndMjpegServer.py,然后运行即可。(依赖包,见代码库中requirements.txt)
实际访问效果如下:
六、历史数据RestFul服务开发
历史数据服务,本来也可以使用Python直接手写,但考虑到可扩展性,使用Django来进行了编写。
Djano服务,需要提供如下的功能:
2和3本质都是一个问题,通过Django的static功能,就能实现。
也就是在settings.py配置中,提供下面的配置即可:
STATIC_URL = 'static/'
STATICFILES_DIRS = [
BASE_DIR / "static"
]
1对外提供api服务,则需要设置对应的url接口,以及读取历史文件信息,生成前端需要的json数据结构,这部分的具体代码如下:
# 媒体文件存放目录,以及缩略图和视频文件的后缀
THUMB_HOME_DIR = "%s/%s/data/thumbs/" % (BASE_DIR, STATIC_URL)
VIDEO_HOME_DIR = "%s/%s/data/videos/" % (BASE_DIR, STATIC_URL)
IMG_FILTER = [".jpg"]
MEDIA_FILTER = [ ".mkv"]
import json
from django.shortcuts import render, HttpResponse
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from rest_framework.decorators import api_view, permission_classes
import os
from django.conf import settings
THUMB_HOME_DIR = settings.THUMB_HOME_DIR
VIDEO_HOME_DIR = settings.VIDEO_HOME_DIR
IMG_FILTER = settings.IMG_FILTER
MEDIA_FILTER = settings.MEDIA_FILTER
# Create your views here.
@api_view(['GET'],)
@permission_classes([AllowAny],)
def hello_django(request):
str = '''[
{
"id": 1,
"time": "2022-07-28 21:00",
"title": "2022-07-28 21:00",
"body": "videos/2022-07-28_2100.mp4"
},
{
"id": 2,
"time": "2022-07-28 23:00",
"title": "2022-07-28 23:00",
"body": "videos/2022-07-28_2300.mp4"
},
{
"id": 3,
"time": "2022-07-28 25:00",
"title": "2022-07-28 25:00",
"body": "videos/2022-07-28_2500.mp4"
}
]'''
_json = json.loads(str)
return HttpResponse(json.dumps(_json), content_type='application/json')
@api_view(['GET'],)
@permission_classes([AllowAny],)
def history_list(request):
next = request.GET.get("next", '')
print(f"thumb next = {next}")
path = "/".join(request.path.split("/")[3:])
print(f"thumb request.path= {request.path}")
print(f"thumb path = {path}")
#print os.listdir(FILE_HOME_DIR+".none/")
data = {"files":[], "dirs":[]}
print(data)
child_path = THUMB_HOME_DIR+next
print(f"child_path = {child_path}")
data['cur_dir'] = path+next
print(data)
for dir in os.listdir(child_path):
if os.path.isfile(child_path+"/"+dir):
if os.path.splitext(dir)[1] in IMG_FILTER:
data['files'].append(dir)
else:
data['dirs'].append(dir)
print(data)
data['files']=sorted(data['files'])
data['files'].reverse()
data['infos'] = []
for i in range(0,len(data['files'])):
thumb_name = data['files'][i]
video_name = thumb_name.replace('.jpg', MEDIA_FILTER[0])
file_time = thumb_name.replace('.jpg', '').replace('_', ' ')
data['infos'].append(
{
"id": i,
"time": file_time,
"title": file_time,
"body": thumb_name,
'thumb': thumb_name,
'video': video_name
}
)
return Response(data['infos'], status = 200)
其中有两个接口:
hello_django
是最开始学习使用的,返回写死的json数据。
history_list
,则是自动遍历缩略图文件夹,获取缩略图文件信息,并生成所需要的json数据格式。
在对应的代码库文件中,也包含了requirements.txt,其中标明了实际需要的依赖库。
下载代码,进入manage.py所在的目录后,执行下面的命令即可启动:
访问 192.168.1.15:8000/app/hellodjango :
访问:History List – Django REST framework
可以看到 history_list接口,已经可以提供实际需要的数据了。
七、Flutter Web界面开发
这个部分设计的代码比较多,所以只对关键部分的代码进行说明。
开发的实际代码,位于lib目录,具体为:
整个界面,使用了Scaffold
来模拟手机/Pad的操作界面,具体界面如下:
在实时画面界面中,使用了WebSocket监听,获取到信息,就使用Stream模式,推送给视频播放。
在历史记录界面中,则通过RestFul请求列表数据,然后呈现。
八、整体运行效果
实际的运行效果,不用多说,看界面就成:
九、车试:
经过反复的测试验证,确保各项功能完整后,进行了上车实测。
因为最近的疫情原因,所以只在村里转了一圈,进行了实际测试,可以查看最后的视频。后续有机会,再找个晴朗的天气,去环境优美的地方实际拍摄录制。
十、实际代码说明:
完整的代码,请通过 米尔行车记录仪: 米尔行车记录仪 (https://gitee.com/honestqiao/MYiR-Driving-Recorder) 获取。
代码目录说明如下:
在以上仓库中,包含了详细的代码使用说明。
在实际应用中,将记录视频的data目录与后端static/data目录关联,以便两者统一。
十一、感谢
在研究学习的过程中,参考了数十篇各类资料,先将部分列出如下。对所有学习过的资料的作者,表示深深的感谢。
fetch
is widely supported enough to use · Issue #595 · dart-lang/http (github.com)十二、总结
在研究学习的过程中,对Linux系统下的UVC框架有了进一步的了解,对Flutter进行应用开发有了实际的了解,对OpenCV的实际应用也有了具体的了解。
在实际开发的过程中,遇到的最大的坑来自Flutter,因为变化太快,有一些功能可能兼容性没有跟上。不过更多是自己学艺不精导致的。
另外,目前还只是V1.0版本,后续还存在较大的优化空间。例如对于OpenCV的应用,可以调整参数,优化获取的视频数据的指令和大小等。这些有待于进一步学习后进行。
最主要的,对米尔MYD-YT507开发板有了深入的了解,进行了实际的应用。作为一款车规级处理器T507的开发板,名不虚传!
全部0条评论
快来发表一下你的评论吧 !