上传文件至 /
This commit is contained in:
69
README.md
69
README.md
@@ -1,2 +1,69 @@
|
||||
# opencv-pedestrian-recognition
|
||||
# 行人识别与闯红灯检测系统
|
||||
|
||||
这是一个基于OpenCV的行人识别与闯红灯检测系统,提供了用户友好的可视化界面。系统能够通过摄像头或视频文件实时检测行人,并在开启红绿灯检测功能后识别行人闯红灯行为。
|
||||
|
||||
## 功能特性
|
||||
|
||||
1. **行人检测**:使用基于OpenCV dnn模块的YOLOv3-tiny模型检测画面中的行人
|
||||
2. **红绿灯识别**:通过颜色识别检测红绿灯状态
|
||||
3. **闯红灯检测**:当检测到红灯且有行人在道路区域时,标记为闯红灯
|
||||
4. **可视化界面**:提供了PyQt5实现的图形界面,支持摄像头和视频文件输入
|
||||
5. **实时统计**:显示检测到的行人数量和闯红灯人数
|
||||
6. **自动模型下载**:首次运行时自动下载YOLOv3-tiny模型文件
|
||||
|
||||
## 安装要求
|
||||
|
||||
- Python 3.6+
|
||||
- 所需依赖包见requirements.txt
|
||||
|
||||
## 安装步骤
|
||||
|
||||
1. 克隆或下载本项目到本地
|
||||
2. 安装所需依赖:
|
||||
```
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## 使用方法
|
||||
|
||||
1. 运行主程序:
|
||||
```
|
||||
python pedestrian_recognition.py
|
||||
```
|
||||
|
||||
2. 在界面中可以:
|
||||
- 点击"使用摄像头"按钮启动摄像头实时检测
|
||||
- 点击"打开视频文件"按钮选择本地视频文件进行检测
|
||||
- 勾选"开启闯红灯检测"复选框启用红绿灯识别和闯红灯检测功能
|
||||
- 点击"停止"按钮停止当前视频流
|
||||
|
||||
## 技术说明
|
||||
|
||||
- **行人检测**:使用基于OpenCV dnn模块的YOLOv3-tiny模型进行行人检测,比传统HOG+SVM方法具有更高的检测精度和抗干扰能力
|
||||
- **红绿灯识别**:通过HSV颜色空间分析识别红色和绿色交通灯
|
||||
- **界面实现**:使用PyQt5构建图形界面,实现视频显示和用户交互
|
||||
- **视频处理**:使用OpenCV进行视频帧处理,PyQt5进行界面渲染
|
||||
- **模型管理**:自动下载和管理YOLOv3-tiny模型文件,包括配置文件、权重文件和类别名称文件
|
||||
|
||||
## 注意事项
|
||||
|
||||
1. 行人检测在光线充足的环境下效果更好,YOLOv3-tiny模型相比传统方法具有更好的抗干扰能力
|
||||
2. 红绿灯识别依赖于清晰可见的交通灯颜色,请确保画面中红绿灯区域足够明显
|
||||
3. 当前实现的闯红灯检测基于简单的位置判断,可能需要根据实际场景调整参数以提高准确性
|
||||
4. 首次运行时会自动下载YOLOv3-tiny模型文件,需要网络连接
|
||||
5. 模型文件默认保存在项目目录下的yolo_models文件夹中
|
||||
|
||||
## 扩展可能性
|
||||
|
||||
1. 添加深度学习模型以提高行人检测和红绿灯识别的准确性
|
||||
2. 实现多目标跟踪功能
|
||||
3. 添加违规行为记录和报警功能
|
||||
4. 集成声音提示功能
|
||||
5. 支持多摄像头监控
|
||||
|
||||
## 许可证
|
||||
|
||||
本项目采用MIT许可证。
|
||||
|
||||
## 源代码和编译的可执行文件(可能是老版本,)
|
||||
通过网盘分享的文件:opencv-pedestrian-recognition.zip 链接: https://pan.baidu.com/s/1D4dYF_k6PXVrZgEzGA7XVg?pwd=mfxh 提取码: mfxh --来自百度网盘超级会员v9的分享
|
||||
|
||||
911
pedestrian_recognition.py
Normal file
911
pedestrian_recognition.py
Normal file
@@ -0,0 +1,911 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
import sys
|
||||
import os
|
||||
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
|
||||
QHBoxLayout, QPushButton, QLabel, QCheckBox,
|
||||
QFileDialog, QMessageBox, QListWidget, QSplitter)
|
||||
from PyQt5.QtGui import QImage, QPixmap
|
||||
from PyQt5.QtCore import QTimer, Qt
|
||||
import imutils
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
class PedestrianRecognitionApp(QMainWindow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
# 初始化变量
|
||||
self.camera = None
|
||||
self.video_path = 'video'
|
||||
self.timer = QTimer()
|
||||
self.timer.timeout.connect(self.update_frame)
|
||||
self.is_running = False
|
||||
self.is_paused = False
|
||||
self.traffic_light_detection_enabled = False
|
||||
self.traffic_light_state = None # 'red', 'green', or None
|
||||
self.traffic_light_class_id = 9 # 红绿灯在COCO数据集中的类别ID
|
||||
|
||||
# 处理PyInstaller打包后的路径问题
|
||||
if hasattr(sys, '_MEIPASS'):
|
||||
# 在打包后的环境中
|
||||
self.base_dir = sys._MEIPASS
|
||||
|
||||
# 优先使用exe文件所在目录下的video文件夹,方便用户添加自己的视频
|
||||
exe_dir = os.path.dirname(os.path.abspath(sys.executable))
|
||||
self.default_video_folder = os.path.join(exe_dir, "video")
|
||||
else:
|
||||
# 在开发环境中
|
||||
self.base_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
self.default_video_folder = os.path.join(self.base_dir, "video")
|
||||
|
||||
# 确保video文件夹存在
|
||||
os.makedirs(self.default_video_folder, exist_ok=True)
|
||||
# 选中的视频文件路径
|
||||
self.selected_video_path = None
|
||||
|
||||
# 加载YOLOv3模型 (使用OpenCV dnn模块,避免PyTorch依赖)
|
||||
self.net = None
|
||||
self.output_layers = []
|
||||
self.classes = []
|
||||
self.person_class_id = 0
|
||||
self.load_yolo_model()
|
||||
|
||||
# 初始化界面
|
||||
self.init_ui()
|
||||
|
||||
# 尝试加载中文字体,使用默认字体作为备选
|
||||
try:
|
||||
# 尝试使用系统中的SimHei字体
|
||||
self.font = ImageFont.truetype("simhei.ttf", 20)
|
||||
except:
|
||||
# 如果找不到指定字体,使用PIL默认字体
|
||||
self.font = ImageFont.load_default()
|
||||
|
||||
|
||||
|
||||
def load_default_video(self):
|
||||
"""从默认视频文件夹加载第一个视频文件"""
|
||||
try:
|
||||
# 确保默认视频文件夹存在
|
||||
os.makedirs(self.default_video_folder, exist_ok=True)
|
||||
|
||||
# 获取默认文件夹中的所有视频文件
|
||||
video_extensions = ['.mp4', '.avi', '.mov', '.mkv']
|
||||
video_files = []
|
||||
|
||||
for file in os.listdir(self.default_video_folder):
|
||||
if any(file.lower().endswith(ext) for ext in video_extensions):
|
||||
video_files.append(os.path.join(self.default_video_folder, file))
|
||||
|
||||
# 如果找到视频文件,自动加载第一个
|
||||
if video_files:
|
||||
# 按文件名排序,确保加载的是确定性的第一个文件
|
||||
video_files.sort()
|
||||
self.video_path = video_files[0]
|
||||
self.status_label.setText(f'已从默认文件夹加载视频: {os.path.basename(self.video_path)}')
|
||||
except Exception as e:
|
||||
# 如果自动加载失败,不影响程序运行
|
||||
print(f"自动加载默认视频失败: {e}")
|
||||
|
||||
|
||||
|
||||
def load_yolo_model(self):
|
||||
"""加载YOLOv3模型配置和权重"""
|
||||
# 模型配置文件和权重文件的URL
|
||||
cfg_url = "https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3-tiny.cfg"
|
||||
weights_url = "https://pjreddie.com/media/files/yolov3-tiny.weights"
|
||||
names_url = "https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names"
|
||||
|
||||
# 确定模型保存路径
|
||||
if hasattr(sys, '_MEIPASS'):
|
||||
# 在打包后的环境中,优先使用exe文件所在目录
|
||||
exe_dir = os.path.dirname(os.path.abspath(sys.executable))
|
||||
model_dir = os.path.join(exe_dir, "yolo_models")
|
||||
else:
|
||||
# 在开发环境中
|
||||
model_dir = os.path.join(self.base_dir, "yolo_models")
|
||||
|
||||
# 确保文件夹存在
|
||||
os.makedirs(model_dir, exist_ok=True)
|
||||
|
||||
cfg_path = os.path.join(model_dir, "yolov3-tiny.cfg")
|
||||
weights_path = os.path.join(model_dir, "yolov3-tiny.weights")
|
||||
names_path = os.path.join(model_dir, "coco.names")
|
||||
|
||||
# 如果文件不存在,则下载
|
||||
if not os.path.exists(cfg_path):
|
||||
print(f"正在下载YOLO配置文件到 {cfg_path}")
|
||||
self.download_file(cfg_url, cfg_path)
|
||||
|
||||
if not os.path.exists(weights_path):
|
||||
print(f"正在下载YOLO权重文件到 {weights_path}")
|
||||
print("注意:这可能需要一些时间,请耐心等待...")
|
||||
self.download_file(weights_url, weights_path)
|
||||
|
||||
if not os.path.exists(names_path):
|
||||
print(f"正在下载类别名称文件到 {names_path}")
|
||||
self.download_file(names_url, names_path)
|
||||
|
||||
# 加载类别名称
|
||||
with open(names_path, 'r') as f:
|
||||
self.classes = [line.strip() for line in f.readlines()]
|
||||
|
||||
# 加载模型
|
||||
try:
|
||||
self.net = cv2.dnn.readNet(weights_path, cfg_path)
|
||||
# 设置计算后端
|
||||
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_DEFAULT)
|
||||
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
|
||||
|
||||
# 获取输出层
|
||||
layer_names = self.net.getLayerNames()
|
||||
# OpenCV 4.x和3.x获取输出层的方式不同
|
||||
try:
|
||||
self.output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()]
|
||||
except:
|
||||
self.output_layers = [layer_names[i[0] - 1] for i in self.net.getUnconnectedOutLayers()]
|
||||
|
||||
print("YOLO模型加载成功")
|
||||
except Exception as e:
|
||||
print(f"加载YOLO模型时出错: {e}")
|
||||
print("将使用默认的行人检测方法")
|
||||
|
||||
def download_file(self, url, save_path):
|
||||
"""下载文件的辅助方法,带进度条、重试机制和多线程下载支持"""
|
||||
import urllib.request
|
||||
import time
|
||||
import sys
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import shutil
|
||||
import os
|
||||
|
||||
# 替代下载源列表
|
||||
alternative_urls = {
|
||||
"https://github.com/pjreddie/darknet/blob/master/cfg/yolov3-tiny.cfg?raw=true": [
|
||||
"https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3-tiny.cfg",
|
||||
"https://pjreddie.com/media/files/yolov3-tiny.cfg"
|
||||
],
|
||||
"https://pjreddie.com/media/files/yolov3-tiny.weights": [
|
||||
"https://github.com/AlexeyAB/darknet/releases/download/darknet_yolo_v3_optimal/yolov3-tiny.weights"
|
||||
],
|
||||
"https://github.com/pjreddie/darknet/blob/master/data/coco.names?raw=true": [
|
||||
"https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names"
|
||||
]
|
||||
}
|
||||
|
||||
# 获取所有可能的下载源
|
||||
all_urls = [url] + alternative_urls.get(url, [])
|
||||
max_retries = 3
|
||||
retry_count = 0
|
||||
current_url_index = 0
|
||||
|
||||
# 尝试所有下载源
|
||||
while current_url_index < len(all_urls) and retry_count < max_retries:
|
||||
current_url = all_urls[current_url_index]
|
||||
try:
|
||||
print(f"正在下载 {os.path.basename(save_path)} (尝试 {retry_count + 1}/{max_retries}, 源 {current_url_index + 1}/{len(all_urls)})...")
|
||||
print(f"下载地址: {current_url}")
|
||||
|
||||
# 检查是否支持范围请求
|
||||
req = urllib.request.Request(current_url, method='HEAD')
|
||||
with urllib.request.urlopen(req) as response:
|
||||
content_length = response.getheader('Content-Length')
|
||||
accept_ranges = response.getheader('Accept-Ranges')
|
||||
|
||||
total_size = int(content_length) if content_length else 0
|
||||
|
||||
# 回调函数用于显示下载进度
|
||||
class ProgressTracker:
|
||||
def __init__(self):
|
||||
self.total_downloaded = 0
|
||||
self.lock = threading.Lock()
|
||||
self.start_time = time.time()
|
||||
|
||||
def update(self, downloaded):
|
||||
with self.lock:
|
||||
self.total_downloaded += downloaded
|
||||
if total_size > 0:
|
||||
percent = min(100, int(self.total_downloaded * 100 / total_size))
|
||||
elapsed = time.time() - self.start_time
|
||||
if elapsed > 0:
|
||||
speed = self.total_downloaded / elapsed / 1024 # KB/s
|
||||
else:
|
||||
speed = 0
|
||||
sys.stdout.write(f"\r[{percent:3d}%] 已下载: {self.total_downloaded / 1024 / 1024:.2f} MB / {total_size / 1024 / 1024:.2f} MB | 速度: {speed:.2f} KB/s")
|
||||
sys.stdout.flush()
|
||||
|
||||
tracker = ProgressTracker()
|
||||
|
||||
# 如果服务器支持范围请求且文件较大,使用多线程下载
|
||||
if accept_ranges == 'bytes' and total_size > 10 * 1024 * 1024: # 文件大于10MB
|
||||
print(f"文件较大,启用多线程下载模式")
|
||||
self._download_multithreaded(current_url, save_path, total_size, tracker)
|
||||
else:
|
||||
# 使用单线程下载
|
||||
def progress_hook(count, block_size, total_size):
|
||||
tracker.update(block_size)
|
||||
|
||||
urllib.request.urlretrieve(current_url, save_path, reporthook=progress_hook)
|
||||
|
||||
print(f"\n下载完成: {save_path}")
|
||||
return # 下载成功,退出函数
|
||||
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
print(f"\n下载失败: {e}")
|
||||
if retry_count >= max_retries:
|
||||
retry_count = 0
|
||||
current_url_index += 1
|
||||
else:
|
||||
wait_time = 2 * retry_count # 指数退避
|
||||
print(f"{wait_time}秒后重试...")
|
||||
time.sleep(wait_time)
|
||||
|
||||
print(f"所有下载源均失败,请手动下载 {url} 并保存为 {save_path}")
|
||||
|
||||
def _download_multithreaded(self, url, save_path, total_size, tracker):
|
||||
"""多线程下载函数"""
|
||||
import urllib.request
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
num_threads = 4 # 使用4个线程下载
|
||||
chunk_size = total_size // num_threads
|
||||
temp_files = []
|
||||
|
||||
def download_chunk(start, end, temp_file):
|
||||
"""下载文件的一个块"""
|
||||
headers = {'Range': f'bytes={start}-{end}'}
|
||||
req = urllib.request.Request(url, headers=headers)
|
||||
with urllib.request.urlopen(req) as response, open(temp_file, 'wb') as f:
|
||||
while True:
|
||||
data = response.read(8192) # 8KB chunks
|
||||
if not data:
|
||||
break
|
||||
f.write(data)
|
||||
tracker.update(len(data))
|
||||
|
||||
try:
|
||||
# 创建临时目录
|
||||
temp_dir = tempfile.mkdtemp()
|
||||
|
||||
# 创建线程池
|
||||
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
||||
futures = []
|
||||
# 提交下载任务
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size
|
||||
# 最后一个块下载到文件末尾
|
||||
end = total_size - 1 if i == num_threads - 1 else (i + 1) * chunk_size - 1
|
||||
temp_file = os.path.join(temp_dir, f'chunk_{i}.part')
|
||||
temp_files.append(temp_file)
|
||||
futures.append(executor.submit(download_chunk, start, end, temp_file))
|
||||
|
||||
# 等待所有线程完成
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
# 合并所有块
|
||||
with open(save_path, 'wb') as outfile:
|
||||
for temp_file in temp_files:
|
||||
with open(temp_file, 'rb') as infile:
|
||||
shutil.copyfileobj(infile, outfile)
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
for temp_file in temp_files:
|
||||
if os.path.exists(temp_file):
|
||||
try:
|
||||
os.remove(temp_file)
|
||||
except:
|
||||
pass
|
||||
# 清理临时目录
|
||||
if os.path.exists(temp_dir):
|
||||
try:
|
||||
os.rmdir(temp_dir)
|
||||
except:
|
||||
pass
|
||||
|
||||
def create_video_list_widget(self):
|
||||
# 创建视频文件列表组件
|
||||
video_list_container = QWidget()
|
||||
video_list_layout = QVBoxLayout(video_list_container)
|
||||
|
||||
# 标题
|
||||
title_label = QLabel("可用视频文件")
|
||||
title_label.setStyleSheet("font-weight: bold; color: #333;")
|
||||
video_list_layout.addWidget(title_label)
|
||||
|
||||
# 视频文件列表
|
||||
self.video_list_widget = QListWidget()
|
||||
self.video_list_widget.setSelectionMode(QListWidget.SingleSelection)
|
||||
self.video_list_widget.itemClicked.connect(self.on_video_item_clicked)
|
||||
video_list_layout.addWidget(self.video_list_widget)
|
||||
|
||||
# 刷新按钮
|
||||
refresh_button = QPushButton("刷新列表")
|
||||
refresh_button.clicked.connect(self.refresh_video_list)
|
||||
video_list_layout.addWidget(refresh_button)
|
||||
|
||||
return video_list_container
|
||||
|
||||
def refresh_video_list(self):
|
||||
"""加载并显示默认视频文件夹中的视频文件"""
|
||||
try:
|
||||
# 清空当前列表
|
||||
self.video_list_widget.clear()
|
||||
|
||||
# 确保默认视频文件夹存在
|
||||
os.makedirs(self.default_video_folder, exist_ok=True)
|
||||
|
||||
# 获取文件夹中的所有视频文件
|
||||
video_extensions = ['.mp4', '.avi', '.mov', '.mkv']
|
||||
video_files = []
|
||||
|
||||
for file in os.listdir(self.default_video_folder):
|
||||
if any(file.lower().endswith(ext) for ext in video_extensions):
|
||||
video_files.append(file)
|
||||
|
||||
# 按名称排序
|
||||
video_files.sort()
|
||||
|
||||
# 添加到列表中
|
||||
for file in video_files:
|
||||
self.video_list_widget.addItem(file)
|
||||
|
||||
self.status_label.setText(f'发现 {len(video_files)} 个视频文件')
|
||||
except Exception as e:
|
||||
print(f"刷新视频列表失败: {e}")
|
||||
self.status_label.setText('刷新视频列表失败')
|
||||
|
||||
def on_video_item_clicked(self, item):
|
||||
"""处理视频列表项点击事件"""
|
||||
# 获取选中的视频文件名
|
||||
video_filename = item.text()
|
||||
# 构建完整路径
|
||||
self.selected_video_path = os.path.join(self.default_video_folder, video_filename)
|
||||
# 更新状态显示
|
||||
self.status_label.setText(f'已选择: {video_filename},请点击"播放视频文件"按钮开始识别')
|
||||
# 如果视频正在播放,停止它
|
||||
if self.is_running:
|
||||
self.stop_video()
|
||||
|
||||
def init_ui(self):
|
||||
# 设置窗口标题和大小
|
||||
self.setWindowTitle('行人识别与闯红灯检测系统')
|
||||
self.setGeometry(100, 100, 1000, 700)
|
||||
|
||||
# 创建主布局
|
||||
central_widget = QWidget()
|
||||
main_layout = QVBoxLayout(central_widget)
|
||||
self.setCentralWidget(central_widget)
|
||||
|
||||
# 创建水平分割器
|
||||
splitter = QSplitter(Qt.Horizontal)
|
||||
|
||||
# 创建左侧视频文件列表并获取返回的widget
|
||||
video_list_widget = self.create_video_list_widget()
|
||||
|
||||
# 创建右侧视频显示区域
|
||||
right_widget = QWidget()
|
||||
right_layout = QVBoxLayout(right_widget)
|
||||
|
||||
# 创建视频显示标签
|
||||
self.video_label = QLabel()
|
||||
self.video_label.setAlignment(Qt.AlignCenter)
|
||||
self.video_label.setMinimumSize(640, 480)
|
||||
right_layout.addWidget(self.video_label)
|
||||
|
||||
# 将左侧列表和右侧视频区域添加到分割器
|
||||
splitter.addWidget(video_list_widget)
|
||||
splitter.addWidget(right_widget)
|
||||
|
||||
# 设置分割器的初始大小比例
|
||||
splitter.setSizes([200, 800])
|
||||
|
||||
# 将分割器添加到主布局
|
||||
main_layout.addWidget(splitter)
|
||||
|
||||
# 创建控制按钮布局
|
||||
control_layout = QHBoxLayout()
|
||||
|
||||
# 摄像头按钮
|
||||
self.camera_button = QPushButton('使用摄像头')
|
||||
self.camera_button.clicked.connect(self.start_camera)
|
||||
control_layout.addWidget(self.camera_button)
|
||||
|
||||
# 视频文件按钮
|
||||
self.video_button = QPushButton('播放视频文件')
|
||||
self.video_button.clicked.connect(self.open_video_file)
|
||||
control_layout.addWidget(self.video_button)
|
||||
|
||||
# 停止按钮
|
||||
self.stop_button = QPushButton('停止')
|
||||
self.stop_button.clicked.connect(self.stop_video)
|
||||
self.stop_button.setEnabled(False)
|
||||
control_layout.addWidget(self.stop_button)
|
||||
|
||||
# 暂停/继续按钮
|
||||
self.pause_button = QPushButton('暂停')
|
||||
self.pause_button.clicked.connect(self.toggle_pause)
|
||||
self.pause_button.setEnabled(False)
|
||||
control_layout.addWidget(self.pause_button)
|
||||
|
||||
# 红绿灯检测复选框
|
||||
self.traffic_light_checkbox = QCheckBox('开启闯红灯检测')
|
||||
self.traffic_light_checkbox.stateChanged.connect(self.toggle_traffic_light_detection)
|
||||
control_layout.addWidget(self.traffic_light_checkbox)
|
||||
|
||||
main_layout.addLayout(control_layout)
|
||||
|
||||
# 添加状态标签
|
||||
self.status_label = QLabel('就绪')
|
||||
main_layout.addWidget(self.status_label)
|
||||
|
||||
# 在status_label初始化后再调用refresh_video_list
|
||||
self.refresh_video_list()
|
||||
|
||||
def start_camera(self):
|
||||
# 停止之前的视频流
|
||||
self.stop_video()
|
||||
|
||||
# 尝试打开摄像头
|
||||
self.camera = cv2.VideoCapture(0)
|
||||
if not self.camera.isOpened():
|
||||
QMessageBox.critical(self, '错误', '无法打开摄像头')
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
self.is_paused = False
|
||||
self.update_ui_state()
|
||||
self.timer.start(30) # 约33fps
|
||||
self.status_label.setText('摄像头运行中...')
|
||||
|
||||
def open_video_file(self):
|
||||
# 停止之前的视频流
|
||||
self.stop_video()
|
||||
|
||||
# 如果已经从列表中选择了视频文件,直接使用它
|
||||
if self.selected_video_path and os.path.exists(self.selected_video_path):
|
||||
file_path = self.selected_video_path
|
||||
else:
|
||||
# 确保默认视频文件夹存在
|
||||
os.makedirs(self.default_video_folder, exist_ok=True)
|
||||
|
||||
# 打开文件选择对话框,默认路径设为default_video_folder
|
||||
file_path, _ = QFileDialog.getOpenFileName(self, '选择视频文件', self.default_video_folder, 'Video Files (*.mp4 *.avi *.mov *.mkv)')
|
||||
if not file_path:
|
||||
return
|
||||
|
||||
self.video_path = file_path
|
||||
self.camera = cv2.VideoCapture(file_path)
|
||||
if not self.camera.isOpened():
|
||||
QMessageBox.critical(self, '错误', '无法打开视频文件')
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
self.is_paused = False
|
||||
self.update_ui_state()
|
||||
self.timer.start(30)
|
||||
# 显示更简洁的状态信息
|
||||
self.status_label.setText(f'视频播放中: {os.path.basename(file_path)} - 正在进行行人识别...')
|
||||
|
||||
def stop_video(self):
|
||||
if self.timer.isActive():
|
||||
self.timer.stop()
|
||||
if self.camera is not None:
|
||||
self.camera.release()
|
||||
self.camera = None
|
||||
self.is_running = False
|
||||
self.is_paused = False
|
||||
self.update_ui_state()
|
||||
self.status_label.setText('已停止')
|
||||
|
||||
def toggle_traffic_light_detection(self, state):
|
||||
self.traffic_light_detection_enabled = state == Qt.Checked
|
||||
if self.traffic_light_detection_enabled:
|
||||
self.status_label.setText('已开启闯红灯检测')
|
||||
else:
|
||||
self.status_label.setText('已关闭闯红灯检测')
|
||||
|
||||
def update_ui_state(self):
|
||||
self.camera_button.setEnabled(not self.is_running)
|
||||
self.video_button.setEnabled(not self.is_running)
|
||||
self.stop_button.setEnabled(self.is_running)
|
||||
self.pause_button.setEnabled(self.is_running)
|
||||
|
||||
def detect_pedestrians(self, frame):
|
||||
"""使用YOLOv3模型检测行人"""
|
||||
# 调整帧大小以平衡性能和精度
|
||||
frame = imutils.resize(frame, width=min(800, frame.shape[1]))
|
||||
|
||||
# 如果YOLO模型没有加载成功,返回空列表
|
||||
if self.net is None:
|
||||
# 这里可以添加回退到HOG+SVM的逻辑,但暂时返回空
|
||||
return frame, []
|
||||
|
||||
# 获取图像尺寸
|
||||
height, width, channels = frame.shape
|
||||
|
||||
# 创建blob对象 (缩放为416x416,归一化,交换通道)
|
||||
blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
|
||||
|
||||
# 将blob输入到网络
|
||||
self.net.setInput(blob)
|
||||
|
||||
# 获取输出层的结果
|
||||
outs = self.net.forward(self.output_layers)
|
||||
|
||||
# 存储检测到的行人框
|
||||
pedestrian_boxes = []
|
||||
|
||||
# 解析输出结果
|
||||
for out in outs:
|
||||
for detection in out:
|
||||
# 获取所有类别的分数
|
||||
scores = detection[5:]
|
||||
# 获取最高分数的类别ID
|
||||
class_id = np.argmax(scores)
|
||||
# 获取置信度
|
||||
confidence = scores[class_id]
|
||||
|
||||
# 如果是人类且置信度大于阈值
|
||||
if class_id == self.person_class_id and confidence > 0.6:
|
||||
# 计算边界框坐标 (YOLO输出的是中心点和宽高)
|
||||
center_x = int(detection[0] * width)
|
||||
center_y = int(detection[1] * height)
|
||||
w = int(detection[2] * width)
|
||||
h = int(detection[3] * height)
|
||||
|
||||
# 计算左上角坐标
|
||||
x = int(center_x - w / 2)
|
||||
y = int(center_y - h / 2)
|
||||
|
||||
# 确保坐标在图像范围内
|
||||
x = max(0, x)
|
||||
y = max(0, y)
|
||||
w = min(width - x, w)
|
||||
h = min(height - y, h)
|
||||
|
||||
# 过滤掉过小的检测框,避免误检
|
||||
if w > 30 and h > 60:
|
||||
pedestrian_boxes.append((x, y, w, h))
|
||||
# 绘制绿色矩形框
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
|
||||
|
||||
# 对重叠的检测框应用非最大值抑制,提高检测精度
|
||||
if len(pedestrian_boxes) > 0:
|
||||
# 转换为OpenCV需要的格式
|
||||
boxes = np.array(pedestrian_boxes)
|
||||
# 创建置信度数组 (这里简化处理,全部设为1)
|
||||
confidences = np.ones(len(boxes))
|
||||
# 应用非最大值抑制
|
||||
indices = cv2.dnn.NMSBoxes(boxes.tolist(), confidences.tolist(), 0.4, 0.4)
|
||||
# 只保留非最大值抑制后的框
|
||||
if len(indices) > 0:
|
||||
# 根据OpenCV版本处理不同格式的indices
|
||||
filtered_boxes = []
|
||||
if isinstance(indices, np.ndarray):
|
||||
# 处理不同版本的OpenCV返回格式
|
||||
if indices.ndim == 2:
|
||||
# 格式为 [[0], [1], [2]]
|
||||
for i in indices:
|
||||
idx = i[0]
|
||||
filtered_boxes.append(pedestrian_boxes[idx])
|
||||
# 重新绘制保留的矩形框
|
||||
x, y, w, h = pedestrian_boxes[idx]
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
|
||||
else:
|
||||
# 格式为 [0, 1, 2]
|
||||
for i in indices:
|
||||
idx = int(i)
|
||||
filtered_boxes.append(pedestrian_boxes[idx])
|
||||
# 重新绘制保留的矩形框
|
||||
x, y, w, h = pedestrian_boxes[idx]
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
|
||||
else:
|
||||
# 兼容列表格式
|
||||
for i in indices:
|
||||
if isinstance(i, list) or isinstance(i, tuple):
|
||||
idx = i[0]
|
||||
else:
|
||||
idx = i
|
||||
filtered_boxes.append(pedestrian_boxes[idx])
|
||||
# 重新绘制保留的矩形框
|
||||
x, y, w, h = pedestrian_boxes[idx]
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
|
||||
|
||||
pedestrian_boxes = filtered_boxes
|
||||
|
||||
return frame, pedestrian_boxes
|
||||
|
||||
def detect_traffic_light(self, frame):
|
||||
"""使用YOLOv3检测红绿灯位置,然后通过颜色分析判断状态"""
|
||||
# 首先使用YOLO检测红绿灯位置
|
||||
traffic_light_boxes = []
|
||||
traffic_light_state = "未知"
|
||||
|
||||
# 如果YOLO模型没有加载成功,直接返回原帧和未知状态
|
||||
if self.net is None:
|
||||
frame = self.draw_chinese_text(frame, "红绿灯: 检测模型未加载", (10, 30), font_size=20, color=(255, 255, 255))
|
||||
return frame, traffic_light_state
|
||||
|
||||
# 获取图像尺寸
|
||||
height, width, channels = frame.shape
|
||||
|
||||
# 创建blob对象
|
||||
blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
|
||||
|
||||
# 将blob输入到网络
|
||||
self.net.setInput(blob)
|
||||
|
||||
# 获取输出层的结果
|
||||
outs = self.net.forward(self.output_layers)
|
||||
|
||||
# 解析输出结果,寻找红绿灯
|
||||
for out in outs:
|
||||
for detection in out:
|
||||
# 获取所有类别的分数
|
||||
scores = detection[5:]
|
||||
# 获取最高分数的类别ID
|
||||
class_id = np.argmax(scores)
|
||||
# 获取置信度
|
||||
confidence = scores[class_id]
|
||||
|
||||
# 如果是红绿灯且置信度大于阈值(降低阈值以提高检测率)
|
||||
if class_id == self.traffic_light_class_id and confidence > 0.3:
|
||||
# 计算边界框坐标
|
||||
center_x = int(detection[0] * width)
|
||||
center_y = int(detection[1] * height)
|
||||
|
||||
# 增加检测框大小,确保完全包裹整个红绿灯
|
||||
w = int(detection[2] * width * 1.5) # 增加宽度50%
|
||||
h = int(detection[3] * height * 1.5) # 增加高度50%
|
||||
|
||||
# 计算左上角坐标
|
||||
x = int(center_x - w / 2)
|
||||
y = int(center_y - h / 2)
|
||||
|
||||
# 确保坐标在图像范围内
|
||||
x = max(0, x)
|
||||
y = max(0, y)
|
||||
w = min(width - x, w)
|
||||
h = min(height - y, h)
|
||||
|
||||
# 添加到红绿灯框列表
|
||||
traffic_light_boxes.append((x, y, w, h))
|
||||
# 绘制蓝色矩形框标记红绿灯位置
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
|
||||
|
||||
# 如果检测到红绿灯,分析其颜色状态
|
||||
if len(traffic_light_boxes) > 0:
|
||||
# 对每个检测到的红绿灯进行颜色分析
|
||||
for (x, y, w, h) in traffic_light_boxes:
|
||||
# 提取红绿灯区域
|
||||
roi = frame[y:y+h, x:x+w]
|
||||
|
||||
# 转换到HSV颜色空间
|
||||
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
|
||||
|
||||
# 定义红色和绿色的HSV范围(降低阈值以提高检测率)
|
||||
# 红色有两个范围
|
||||
lower_red1 = np.array([0, 80, 80])
|
||||
upper_red1 = np.array([15, 255, 255])
|
||||
lower_red2 = np.array([150, 80, 80])
|
||||
upper_red2 = np.array([180, 255, 255])
|
||||
|
||||
lower_green = np.array([30, 80, 80])
|
||||
upper_green = np.array([80, 255, 255])
|
||||
|
||||
# 创建红色和绿色的掩码
|
||||
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
|
||||
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
|
||||
mask_red = mask_red1 + mask_red2
|
||||
mask_green = cv2.inRange(hsv, lower_green, upper_green)
|
||||
|
||||
# 计算红色和绿色像素的数量
|
||||
red_pixels = cv2.countNonZero(mask_red)
|
||||
green_pixels = cv2.countNonZero(mask_green)
|
||||
|
||||
# 设置最小像素数量阈值(降低阈值以提高检测率)
|
||||
min_pixels = max(5, roi.size * 0.005) # 至少5个像素或占区域的0.5%
|
||||
|
||||
# 判断红绿灯状态
|
||||
if red_pixels > green_pixels and red_pixels > min_pixels:
|
||||
traffic_light_state = "红灯"
|
||||
# 在红绿灯区域内绘制红色标记
|
||||
frame = self.draw_chinese_text(frame, "红灯", (x, y - 20), font_size=16, color=(0, 0, 255))
|
||||
elif green_pixels > red_pixels and green_pixels > min_pixels:
|
||||
traffic_light_state = "绿灯"
|
||||
# 在红绿灯区域内绘制绿色标记
|
||||
frame = self.draw_chinese_text(frame, "绿灯", (x, y - 20), font_size=16, color=(0, 255, 0))
|
||||
else:
|
||||
# 不确定状态,可能是黄灯或未检测到明确颜色
|
||||
frame = self.draw_chinese_text(frame, "未知", (x, y - 20), font_size=16, color=(255, 255, 255))
|
||||
else:
|
||||
# 没有检测到红绿灯,直接显示未知状态
|
||||
frame = self.draw_chinese_text(frame, "红绿灯: 未检测到", (10, 30), font_size=20, color=(255, 255, 255))
|
||||
|
||||
# 显示整体红绿灯状态
|
||||
if traffic_light_state == "红灯":
|
||||
# 确保红灯状态显示为红色(RGB格式为255,0,0)
|
||||
frame = self.draw_chinese_text(frame, "红绿灯: 红灯", (10, 30), font_size=20, color=(255, 0, 0))
|
||||
elif traffic_light_state == "绿灯":
|
||||
# 确保绿灯状态显示为绿色(RGB格式为0,255,0)
|
||||
frame = self.draw_chinese_text(frame, "红绿灯: 绿灯", (10, 30), font_size=20, color=(0, 255, 0))
|
||||
else:
|
||||
frame = self.draw_chinese_text(frame, "红绿灯: 未知状态", (10, 30), font_size=20, color=(255, 255, 255))
|
||||
|
||||
return frame, traffic_light_state
|
||||
|
||||
|
||||
def detect_jaywalking(self, frame, pedestrian_boxes, traffic_light_state):
|
||||
# 简单的闯红灯检测逻辑:
|
||||
# 如果检测到红灯,且在画面下半部分有行人,则判定为闯红灯
|
||||
jaywalking_boxes = []
|
||||
|
||||
if traffic_light_state == "红灯" and pedestrian_boxes:
|
||||
# 定义道路区域(假设在画面下半部分)
|
||||
road_y_threshold = frame.shape[0] * 0.5
|
||||
|
||||
for (x, y, w, h) in pedestrian_boxes:
|
||||
# 检查行人是否在道路区域内
|
||||
if y + h > road_y_threshold:
|
||||
jaywalking_boxes.append((x, y, w, h))
|
||||
# 用红色框标记闯红灯的行人
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 3)
|
||||
frame = self.draw_chinese_text(frame, "闯红灯", (x, y - 10), font_size=16, color=(0, 0, 255))
|
||||
|
||||
# 显示闯红灯人数
|
||||
if jaywalking_boxes:
|
||||
# 红灯状态时显示红色
|
||||
text_color = (255, 0, 0) if traffic_light_state == "红灯" else (0, 0, 255)
|
||||
frame = self.draw_chinese_text(frame, f"闯红灯人数: {len(jaywalking_boxes)}", (10, 60), font_size=20, color=text_color)
|
||||
|
||||
return frame, jaywalking_boxes
|
||||
|
||||
def update_frame(self):
|
||||
"""更新视频帧"""
|
||||
if self.camera is None or not self.is_running or self.is_paused:
|
||||
return
|
||||
|
||||
ret, frame = self.camera.read()
|
||||
if not ret:
|
||||
# 如果是视频文件播放结束,重新开始播放
|
||||
if self.video_path is not None:
|
||||
self.camera.set(cv2.CAP_PROP_POS_FRAMES, 0)
|
||||
ret, frame = self.camera.read()
|
||||
if not ret:
|
||||
self.stop_video()
|
||||
QMessageBox.information(self, '提示', '视频播放完毕')
|
||||
return
|
||||
else:
|
||||
self.stop_video()
|
||||
QMessageBox.information(self, '提示', '摄像头断开')
|
||||
return
|
||||
|
||||
# 如果开启了红绿灯检测,先检测红绿灯再检测行人,避免行人框遮挡影响红绿灯识别
|
||||
traffic_light_state = "未知"
|
||||
jaywalking_boxes = []
|
||||
|
||||
if self.traffic_light_detection_enabled:
|
||||
frame, traffic_light_state = self.detect_traffic_light(frame)
|
||||
|
||||
# 检测行人
|
||||
frame, pedestrian_boxes = self.detect_pedestrians(frame)
|
||||
|
||||
# 显示检测到的行人数量
|
||||
frame = self.draw_chinese_text(frame, f"行人数量: {len(pedestrian_boxes)}", (10, 90), font_size=20, color=(255, 255, 255))
|
||||
|
||||
# 如果开启了红绿灯检测,再检测闯红灯行为
|
||||
if self.traffic_light_detection_enabled:
|
||||
frame, jaywalking_boxes = self.detect_jaywalking(frame, pedestrian_boxes, traffic_light_state)
|
||||
|
||||
# 将OpenCV的BGR格式转换为Qt的RGB格式
|
||||
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
h, w, ch = rgb_frame.shape
|
||||
bytes_per_line = ch * w
|
||||
q_image = QImage(rgb_frame.data, w, h, bytes_per_line, QImage.Format_RGB888)
|
||||
|
||||
# 调整图像大小以适应标签
|
||||
pixmap = QPixmap.fromImage(q_image)
|
||||
scaled_pixmap = pixmap.scaled(self.video_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
|
||||
|
||||
# 在标签上显示图像
|
||||
self.video_label.setPixmap(scaled_pixmap)
|
||||
|
||||
def toggle_pause(self):
|
||||
"""切换视频播放的暂停/继续状态"""
|
||||
self.is_paused = not self.is_paused
|
||||
if self.is_paused:
|
||||
self.pause_button.setText('继续')
|
||||
self.status_label.setText('已暂停')
|
||||
else:
|
||||
self.pause_button.setText('暂停')
|
||||
# 恢复状态显示
|
||||
if self.video_path:
|
||||
self.status_label.setText(f'视频播放中: {os.path.basename(self.video_path)} - 正在进行行人识别...')
|
||||
else:
|
||||
self.status_label.setText('摄像头运行中...')
|
||||
|
||||
def resizeEvent(self, event):
|
||||
# 窗口大小改变时,更新视频显示
|
||||
if hasattr(self.video_label, 'pixmap') and self.video_label.pixmap() is not None:
|
||||
scaled_pixmap = self.video_label.pixmap().scaled(
|
||||
self.video_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation)
|
||||
self.video_label.setPixmap(scaled_pixmap)
|
||||
super().resizeEvent(event)
|
||||
|
||||
def closeEvent(self, event):
|
||||
# 关闭窗口时释放资源
|
||||
self.stop_video()
|
||||
event.accept()
|
||||
|
||||
def draw_chinese_text(self, frame, text, position, font_size=20, color=(255, 255, 255)):
|
||||
"""在OpenCV图像上绘制中文文本
|
||||
|
||||
Args:
|
||||
frame: OpenCV图像 (BGR格式)
|
||||
text: 要绘制的中文文本
|
||||
position: 文本位置 (x, y)
|
||||
font_size: 字体大小
|
||||
color: 文本颜色 (RGB格式)
|
||||
|
||||
Returns:
|
||||
绘制了文本的图像
|
||||
"""
|
||||
# 将BGR转换为RGB
|
||||
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
# 转换为PIL图像
|
||||
pil_img = Image.fromarray(rgb_frame)
|
||||
draw = ImageDraw.Draw(pil_img)
|
||||
|
||||
# 尝试加载多种中文字体,增加成功率
|
||||
font = None
|
||||
fonts_to_try = [
|
||||
"simhei.ttf", # 黑体
|
||||
"msyh.ttc", # 微软雅黑
|
||||
"simsun.ttc", # 宋体
|
||||
os.path.join(self.base_dir, "fonts", "simhei.ttf"), # 程序目录下的字体
|
||||
os.path.join(os.path.dirname(os.path.abspath(sys.executable)), "fonts", "simhei.ttf") # exe目录下的字体
|
||||
]
|
||||
|
||||
# 尝试在系统字体目录中查找
|
||||
if sys.platform == 'win32':
|
||||
fonts_to_try.extend([
|
||||
os.path.join("C:", "Windows", "Fonts", "simhei.ttf"),
|
||||
os.path.join("C:", "Windows", "Fonts", "msyh.ttc"),
|
||||
os.path.join("C:", "Windows", "Fonts", "simsun.ttc")
|
||||
])
|
||||
|
||||
# 尝试加载字体
|
||||
for font_path in fonts_to_try:
|
||||
try:
|
||||
font = ImageFont.truetype(font_path, font_size)
|
||||
break
|
||||
except:
|
||||
continue
|
||||
|
||||
# 如果所有尝试都失败,使用默认字体
|
||||
if font is None:
|
||||
font = ImageFont.load_default()
|
||||
|
||||
# 绘制文本
|
||||
draw.text(position, text, font=font, fill=color)
|
||||
|
||||
# 转换回OpenCV格式
|
||||
result_img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
|
||||
|
||||
return result_img
|
||||
|
||||
if __name__ == '__main__':
|
||||
app = QApplication(sys.argv)
|
||||
# 设置中文字体
|
||||
font = app.font()
|
||||
font.setFamily("SimHei")
|
||||
app.setFont(font)
|
||||
|
||||
window = PedestrianRecognitionApp()
|
||||
window.show()
|
||||
sys.exit(app.exec_())
|
||||
5
requirements.txt
Normal file
5
requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
opencv-python
|
||||
numpy
|
||||
PyQt5
|
||||
imutils
|
||||
Pillow
|
||||
Reference in New Issue
Block a user