`./main.py` ```python import sys import os from PySide6.QtWidgets import QApplication from app.view import Window from app.scheduler_manager import scheduler def resource_path(relative_path): """获取资源的绝对路径。用于PyInstaller/Nuitka打包后定位资源文件。""" if hasattr(sys, '_MEIPASS'): return os.path.join(sys._MEIPASS, relative_path) # type: ignore return os.path.join(os.path.abspath("."), relative_path) def main(): # 设置基础路径 base_dir = os.path.abspath(".") # 启动队列调度器 scheduler.start_queue_scheduler() # 创建Qt应用 app = QApplication(sys.argv) window = Window() window.show() window.setMicaEffectEnabled(True) # 启动事件循环 exit_code = app.exec() # 停止队列调度器 scheduler.stop_queue_scheduler() sys.exit(exit_code) if __name__ == "__main__": main() ``` `./app\__init__.py` ```python ``` `./app\config\__init__.py` ```python import toml try: config = toml.load("pyproject.toml") except: config = {} __all__ = ["config"] ``` `./app\scheduler_manager\__init__.py` ```python from queue_sqlite.scheduler import QueueScheduler from .students.add_student import add_student from .students.students_listen import students scheduler = QueueScheduler() __all__ = [ "scheduler", "add_student", "students" ] ``` `./app\scheduler_manager\students\add_student.py` ```python from queue_sqlite.mounter.task_mounter import TaskMounter from queue_sqlite.model import MessageItem import time @TaskMounter.task(meta={"task_name": "add_student"}) def add_student(message_item: MessageItem): # create a new instance of the AddStudentDilalog dialog # time.sleep(3) print("点击了添加按钮") return {"message": "学生添加成功"} @TaskMounter.task(meta={"task_name": "test"}) def test(message_item: MessageItem): # print(f"测试任务: {message_item}") return {"message": "测试成功"} ``` `./app\scheduler_manager\students\students_listen.py` ```python from queue_sqlite.mounter.listen_mounter import ListenMounter from ...signal import listen_signals import json @ListenMounter.listener() def students(str_students_list: str): print("students signal received") 
listen_signals.listening_students_signal.emit(json.loads(str_students_list)) ``` `./app\signal\global_signals.py` ```python from PySide6.QtCore import Signal, QObject class GlobalSignals(QObject): show_add_dialog_signal = Signal(dict) update_queue_state = Signal(dict, str) __all__ = ["GlobalSignals"] ``` `./app\signal\listen_signals.py` ```python from PySide6.QtCore import Signal, QObject class ListenSignals(QObject): listening_students_signal = Signal(list) __all__ = ["ListenSignals"] ``` `./app\signal\__init__.py` ```python from .global_signals import GlobalSignals from .listen_signals import ListenSignals global_signals = GlobalSignals() listen_signals = ListenSignals() __all__ = ["global_signals"] ``` `./app\style\button_style.py` ```python BUTTON_STYLE = """ QPushButton { border: none; padding: 5px 10px; font-family: 'Segoe UI', 'Microsoft YaHei'; font-size: 14px; color: white; border-radius: 5px; } QPushButton:hover { background-color: rgba(255, 255, 255, 0.1); } QPushButton:pressed{ background-color: rgba(255, 255, 255, 0.2); } """ ADD_BUTTON_STYLE = ( BUTTON_STYLE + """ QPushButton { background-color: #0d6efd; } QPushButton:hover { background-color: #0b5ed7; } QPushButton:pressed{ background-color: #0a58ca; } """ ) DELETE_BUTTON_STYLE = ( BUTTON_STYLE + """ QPushButton { background-color: #dc3545; } QPushButton:hover { background-color: #bb2d3b; } QPushButton:pressed{ background-color: #b02a37; """ ) BATCH_DELETE_BUTTON_STYLE = ( BUTTON_STYLE + """ QPushButton { background-color: #fd7e14; } QPushButton:hover { background-color: #e96b10; } QPushButton:pressed{ background-color: #dc680f; } """ ) UPDATE_BUTTON_STYLE = ( BUTTON_STYLE + """ QPushButton { background-color: #198754; } QPushButton:hover { background-color: #157347; } QPushButton:pressed{ background-color: #146c43; } """ ) IMPORT_BUTTON_STYLE = ( BUTTON_STYLE + """ QPushButton { background-color: #6f42c1; } QPushButton:hover { background-color: #5936a2; } QPushButton:pressed{ background-color: 
#4a2d8e; } """ ) EXPORT_BUTTON_STYLE = ( BUTTON_STYLE + """ QPushButton { background-color: #20c997; } QPushButton:hover { background-color: #1aa179; } QPushButton:pressed{ background-color: #198b6d; } """ ) ``` `./app\style\__init__.py` ```python ``` `./app\view\__init__.py` ```python # coding:utf-8 import os import sys from PySide6.QtCore import QUrl, QSize, QEventLoop, QTimer from PySide6.QtGui import QIcon, QDesktopServices from PySide6.QtWidgets import QApplication from qfluentwidgets import ( NavigationItemPosition, MessageBox, NavigationAvatarWidget, SplitFluentWindow, SplashScreen, ) from qfluentwidgets import FluentIcon as FIF from .students.student_interface import StudentInterface from .video.video_interface import VideoInterface from .camera.camera_interface import CameraInterface from .web_view.web_view_interface import WebViewInterface def resource_path(relative_path): """获取资源的绝对路径""" if hasattr(sys, "_MEIPASS"): return os.path.join(sys._MEIPASS, relative_path) # type: ignore return os.path.join(os.path.abspath("."), relative_path) class Window(SplitFluentWindow): def __init__(self): super().__init__() self.studemt_interface = StudentInterface() self.video_interface = VideoInterface() self.camera_interface = CameraInterface() self.web_view_interface = WebViewInterface(self) self.initNavigation() self.resize(900, 700) # 使用资源路径函数 self.setWindowIcon(QIcon(resource_path("./resources/images/logo.png"))) desktop = QApplication.screens()[0].availableGeometry() w, h = desktop.width(), desktop.height() self.move(w // 2 - self.width() // 2, h // 2 - self.height() // 2) self.splashScreen = SplashScreen(self.windowIcon(), self) self.splashScreen.setIconSize(QSize(102, 102)) self.show() self.createSubInterface() self.splashScreen.finish() def createSubInterface(self): loop = QEventLoop(self) QTimer.singleShot(2000, loop.quit) loop.exec() def initNavigation(self): self.addSubInterface(self.studemt_interface, FIF.RINGER, "Students") 
self.addSubInterface(self.video_interface, FIF.VIDEO, "Video") self.addSubInterface(self.camera_interface, FIF.CAMERA, "Camera") self.addSubInterface(self.web_view_interface, FIF.FIT_PAGE, "Web View") # 使用资源路径函数 self.navigationInterface.addWidget( routeKey="Info", widget=NavigationAvatarWidget( "Info", resource_path("./resource/images/logo.png") ), onClick=self.showMessageBox, position=NavigationItemPosition.BOTTOM, ) self.navigationInterface.addItem( routeKey="settingInterface", icon=FIF.SETTING, text="设置", position=NavigationItemPosition.BOTTOM, ) self.navigationInterface.setExpandWidth(280) def showMessageBox(self): w = MessageBox( "支持作者🥰", "个人开发不易,如果这个项目帮助到了您,可以考虑请作者喝一瓶快乐水🥤。您的支持就是作者开发和维护项目的动力🚀", self, ) w.yesButton.setText("来啦老弟") w.cancelButton.setText("下次一定") if w.exec(): QDesktopServices.openUrl(QUrl("https://afdian.net/a/zhiyiYo")) ``` `./app\view\camera\camera_interface.py` ```python import sys import cv2 from PySide6.QtCore import Qt, QTimer, Signal, QObject from PySide6.QtGui import QImage, QPixmap from PySide6.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget, QMessageBox from qfluentwidgets import PushButton class VideoStream(QObject): new_frame = Signal(QImage) error_occurred = Signal(str) def __init__(self, rtsp_url): super().__init__() self.stream_url = rtsp_url # 直接使用RTSP URL self.cap = None self.timer = QTimer() self.timer.timeout.connect(self.update_frame) self.retry_count = 0 self.max_retry = 5 self.open_stream() def open_stream(self): """尝试打开RTSP视频流""" if self.cap: self.cap.release() # 创建VideoCapture对象并设置RTSP参数 self.cap = cv2.VideoCapture(self.stream_url) # 关键设置:使用TCP传输(避免UDP丢包问题),设置超时和缓冲区 # self.cap.set(cv2.CAP_PROP_RTSP_TRANSPORT, 1) # 1 = TCP传输模式 self.cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) # 5秒超时 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 最小化缓冲区 # 打开RTSP流 self.cap.open(self.stream_url) if not self.cap.isOpened(): self.retry_count += 1 if self.retry_count <= self.max_retry: QTimer.singleShot(2000, self.open_stream) # 2秒后重试 
else: self.error_occurred.emit(f"无法打开RTSP视频流: {self.stream_url}") else: self.retry_count = 0 self.timer.start(30) # 约30fps def update_frame(self): if not self.cap or not self.cap.isOpened(): return ret, frame = self.cap.read() if ret: # 转换为黑白图像 # gray_image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # h, w = gray_image.shape # qt_image = QImage(gray_image.data, w, h, w, QImage.Format.Format_Grayscale8) # 转换为RGB图像 rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb_image.shape qt_image = QImage(rgb_image.data, w, h, w * ch, QImage.Format.Format_RGB888) self.new_frame.emit(qt_image) else: # 读取失败时重新连接 self.open_stream() def stop(self): self.timer.stop() if self.cap: self.cap.release() class CameraInterface(QWidget): def __init__(self, rtsp_url="rtsp://127.0.0.1:8554/stream"): super().__init__() self.setObjectName("camera_interface") self.setWindowTitle("RTSP视频流接收器") self.resize(800, 600) # 创建UI self.label = QLabel("等待RTSP视频流...") self.label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.label.setStyleSheet("font-size: 20px; color: #888;") # 创建按钮 self.listen_button = PushButton("开始监听RTSP") self.listen_button.clicked.connect(lambda: self.start_listen(rtsp_url)) layout = QVBoxLayout() layout.setContentsMargins(20, 50, 20, 20) layout.addWidget(self.label) layout.addWidget(self.listen_button) self.setLayout(layout) def start_listen(self, rtsp_url): self.listen_button.setEnabled(False) self.label.setText(f"正在连接: {rtsp_url}...") # 创建视频流 self.stream = VideoStream(rtsp_url) self.stream.new_frame.connect(self.display_frame) self.stream.error_occurred.connect(self.display_error) def display_frame(self, image): pixmap = QPixmap.fromImage(image) self.label.setPixmap( pixmap.scaled( self.label.width(), self.label.height(), Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation, ) ) def display_error(self, message): QMessageBox.critical(self, "RTSP视频流错误", message) self.label.setText(f"错误: {message}") self.listen_button.setEnabled(True) # 
出错时重新启用按钮 def closeEvent(self, event): if hasattr(self, "stream") and self.stream: self.stream.stop() super().closeEvent(event) if __name__ == "__main__": app = QApplication(sys.argv) # 使用命令行参数指定RTSP URL rtsp_url = "rtsp://127.0.0.1:8554/stream" # 默认URL if len(sys.argv) > 1: rtsp_url = sys.argv[1] player = CameraInterface(rtsp_url) player.show() sys.exit(app.exec()) ``` `./app\view\camera\__init__.py` ```python ``` `./app\view\students\student_dilalog.py` ```python from qfluentwidgets import MessageBoxBase, LineEdit, ComboBox, SubtitleLabel from PySide6.QtWidgets import QGridLayout, QLabel from PySide6.QtCore import Qt from PySide6.QtGui import QFont class BaseStudentDilalog(MessageBoxBase): def __init__(self, title, parent=None): super().__init__(parent) self.title = title self.setup_ui() def setup_ui(self): self.titleLabel = SubtitleLabel(self.title, self) self.viewLayout.addWidget(self.titleLabel) self.viewLayout.setAlignment(self.titleLabel, Qt.AlignmentFlag.AlignCenter) grid_layout = QGridLayout() self.viewLayout.addLayout(grid_layout) self.nameInput = LineEdit(self) self.numberInput = LineEdit(self) self.genderCombo = ComboBox(self) self.genderCombo.addItems(["男", "女"]) self.classCombo = ComboBox(self) self.chineseInput = LineEdit(self) self.mathInput = LineEdit(self) self.englishInput = LineEdit(self) fields = [ ("姓名: ", self.nameInput), ("学号: ", self.numberInput), ("性别: ", self.genderCombo), ("班级: ", self.classCombo), ("语文: ", self.chineseInput), ("数学: ", self.mathInput), ("英语: ", self.englishInput), ] for row, (label_text, widget) in enumerate(fields): label_widget = QLabel(label_text) font = QFont("微软雅黑") font.setBold(True) label_widget.setFont(font) grid_layout.addWidget(label_widget, row, 0) grid_layout.addWidget(widget, row, 1) grid_layout.setColumnStretch(1, 1) grid_layout.setAlignment(Qt.AlignmentFlag.AlignLeft) self.yesButton.setText("确定") self.cancelButton.setText("取消") for widget in [field[1] for field in fields]: widget.setMinimumWidth(200) 
self.nameInput.setFocus() self.yesButton.setDefault(False) self.yesButton.setAutoDefault(False) class AddStudentDilalog(BaseStudentDilalog): def __init__(self, parent=None): super().__init__("添加学生", parent) self.student_id = None self.yesButton.setText("添加") ``` `./app\view\students\student_interface.py` ```python import json import os import sys from PySide6.QtGui import QIcon from PySide6.QtCore import Slot from PySide6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QHeaderView, QCheckBox, QTableWidgetItem, ) from queue_sqlite.model import MessageItem from qfluentwidgets import ( CardWidget, PushButton, SearchLineEdit, TableWidget, setCustomStyleSheet, ) from .student_dilalog import AddStudentDilalog from ...style.button_style import ADD_BUTTON_STYLE, BATCH_DELETE_BUTTON_STYLE from ...signal import global_signals from ...signal import listen_signals from ...scheduler_manager import scheduler def resource_path(relative_path): """获取资源的绝对路径""" if hasattr(sys, "_MEIPASS"): return os.path.join(sys._MEIPASS, relative_path) # type: ignore return os.path.join(os.path.abspath("."), relative_path) class StudentInterface(QWidget): def __init__(self): super().__init__() self.setObjectName("student_interface") self.students = [] self.setup_ui() self.create_signal_slot() self.load_data() self.populate_table(self.students) def create_signal_slot(self): global_signals.show_add_dialog_signal.connect(self.show_add_student_dialog) listen_signals.listening_students_signal.connect(self.populate_table) def setup_ui(self): # 设置 title self.setWindowTitle("学生管理") layout = QVBoxLayout(self) layout.setContentsMargins(20, 50, 20, 20) # (左, 上, 右, 下) card_widget = CardWidget(self) buttons_layout = QHBoxLayout(card_widget) self.addButton = PushButton("新增", self) setCustomStyleSheet(self.addButton, ADD_BUTTON_STYLE, ADD_BUTTON_STYLE) self.addButton.clicked.connect(self.add_student) self.searchInput = SearchLineEdit(self) self.searchInput.setPlaceholderText("搜索学生姓名或学号...") 
self.searchInput.setFixedWidth(500) self.batchDeleteButton = PushButton("批量删除", self) setCustomStyleSheet( self.batchDeleteButton, BATCH_DELETE_BUTTON_STYLE, BATCH_DELETE_BUTTON_STYLE ) buttons_layout.addWidget(self.addButton) buttons_layout.addWidget(self.searchInput) buttons_layout.addStretch(2) buttons_layout.addWidget(self.batchDeleteButton) layout.addWidget(card_widget) self.table_widget = TableWidget(self) self.table_widget.setBorderRadius(8) # 设置边框圆角 self.table_widget.setBorderVisible(True) # 设置表格边框可见 table_labels = [ "", "学生ID", "姓名", "学号", "性别", "班级", "语文", "数学", "英语", "总分", "操作", ] self.table_widget.setColumnCount(len(table_labels)) self.table_widget.setHorizontalHeaderLabels(table_labels) self.table_widget.horizontalHeader().setSectionResizeMode( QHeaderView.ResizeMode.Stretch ) layout.addWidget(self.table_widget) self.setStyleSheet("StudentInterface {background: white}") self.setWindowIcon(QIcon(resource_path("./resources/images/logo.png"))) def load_data(self): self.students = [ { "student_id": 1, "student_name": "张三", "student_number": "2024010101", "gender": 1, "class_name": "1班", "chinese_score": 80, "math_score": 85, "english_score": 90, "total_score": 265, }, { "student_id": 1, "student_name": "张三", "student_number": "2024010101", "gender": 1, "class_name": "1班", "chinese_score": 80, "math_score": 85, "english_score": 90, "total_score": 265, }, { "student_id": 1, "student_name": "张三", "student_number": "2024010101", "gender": 1, "class_name": "1班", "chinese_score": 80, "math_score": 85, "english_score": 90, "total_score": 265, }, { "student_id": 1, "student_name": "张三", "student_number": "2024010101", "gender": 1, "class_name": "1班", "chinese_score": 80, "math_score": 85, "english_score": 90, "total_score": 265, }, { "student_id": 1, "student_name": "张三", "student_number": "2024010101", "gender": 1, "class_name": "1班", "chinese_score": 80, "math_score": 85, "english_score": 90, "total_score": 265, }, ] scheduler.update_listen_data("students", 
json.dumps(self.students)) @Slot(list) def populate_table(self, students: list): print(f"Received students: {students}") self.table_widget.setRowCount(len(students)) for row, student_info in enumerate(students): self.setup_table_row(row, student_info) def setup_table_row(self, row, student_info): checkbox = QCheckBox() checkbox.setStyleSheet("margin: 10px") self.table_widget.setCellWidget(row, 0, checkbox) for col, key in enumerate(self.students[0].keys()): value = student_info.get(key, "") if key == "gender": value = "男" if value == 1 else "女" if value == 2 else "未知" item = QTableWidgetItem(str(value)) self.table_widget.setItem(row, col + 1, item) @Slot(dict) def show_add_student_dialog(self, message_dict: dict): message = MessageItem.from_dict(message_dict) print(f"Received message: {message}") print("Creating and showing AddStudentDialog") self.addButton.setEnabled(True) add_student_dialog = AddStudentDilalog(self) if add_student_dialog.exec(): print("用户点击了添加按钮") else: print("用户点击了取消按钮") def add_student_callback(self, message: MessageItem): global_signals.show_add_dialog_signal.emit(message.to_dict()) def add_student(self): self.addButton.setEnabled(False) scheduler.send_message( MessageItem(content={}, destination="add_student"), self.add_student_callback, ) ``` `./app\view\students\__init__.py` ```python ``` `./app\view\video\video_interface.py` ```python from PySide6.QtCore import QUrl, Qt from PySide6.QtGui import QFont # 导入QFont from PySide6.QtWidgets import QWidget, QVBoxLayout, QLabel, QFileDialog, QHBoxLayout from qfluentwidgets import setTheme, Theme, FluentIcon, PushButton, PrimaryPushButton from qfluentwidgets.multimedia import VideoWidget class VideoInterface(QWidget): def __init__(self): super().__init__() from PySide6.QtMultimedia import QMediaPlayer self.player = QMediaPlayer() self.setObjectName("video_interface") self.setWindowTitle("Fluent Video Player") self.resize(900, 600) # 设置主题 # setTheme(Theme.DARK) # 创建主布局 self.main_layout = 
QVBoxLayout(self) self.main_layout.setContentsMargins(20, 50, 20, 20) self.main_layout.setSpacing(15) # 创建标题标签 - 修复字体设置 self.title_label = QLabel("Fluent Video Player") self.title_label.setAlignment(Qt.AlignmentFlag.AlignCenter) # 正确设置字体:使用QFont.Bold表示粗体 title_font = QFont() title_font.setPointSize(24) title_font.setBold(True) # 使用setBold方法设置粗体 self.title_label.setFont(title_font) self.main_layout.addWidget(self.title_label) # 创建视频播放器 self.video_widget = VideoWidget() self.video_widget.setMinimumHeight(400) self.main_layout.addWidget(self.video_widget, 1) # 创建控制面板 self.control_layout = QHBoxLayout() self.control_layout.setContentsMargins(10, 10, 10, 10) self.control_layout.setSpacing(15) self.main_layout.addLayout(self.control_layout) # 添加控制按钮 self.open_button = PushButton(FluentIcon.FOLDER_ADD, "打开视频") self.open_button.setFixedWidth(120) self.open_button.clicked.connect(self.open_video) self.control_layout.addWidget(self.open_button) # 添加状态标签 - 修复字体设置 self.status_label = QLabel("准备就绪") self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter) status_font = QFont() status_font.setPointSize(12) self.status_label.setFont(status_font) self.control_layout.addWidget(self.status_label) # 设置初始状态 self.video_loaded = False self.is_fullscreen = False # 设置样式 # self.setStyleSheet(""" # QWidget { # background-color: #2b2b2b; # color: #f0f0f0; # } # QLabel { # padding: 5px; # } # """) def open_video(self): """打开视频文件""" file_path, _ = QFileDialog.getOpenFileName( self, "选择视频文件", "", "视频文件 (*.mp4 *.avi *.mkv *.mov *.flv *.wmv);;所有文件 (*.*)", ) if file_path: # 设置视频URL self.video_widget.setVideo(QUrl.fromLocalFile(file_path)) self.status_label.setText(f"已加载视频: {file_path.split('/')[-1]}") self.video_loaded = True def play_video(self): """播放视频""" if self.video_loaded: self.video_widget.play() self.status_label.setText("正在播放...") def pause_video(self): """暂停视频""" if self.video_loaded: self.video_widget.pause() self.status_label.setText("已暂停") def stop_video(self): """停止视频""" if 
self.video_loaded: self.video_widget.stop() self.status_label.setText("已停止") def toggle_fullscreen(self): """切换全屏模式""" if self.is_fullscreen: self.showNormal() else: self.showFullScreen() self.is_fullscreen = True # if __name__ == "__main__": # app = QApplication(sys.argv) # # 设置应用程序样式 # from qfluentwidgets import setThemeColor # setThemeColor('#28a9e0') # player = VideoPlayer() # player.show() # sys.exit(app.exec()) ``` `./app\view\video\__init__.py` ```python ``` `./app\view\web_view\entrance.py` ```python import os import time import zipfile import json import shutil import base64 import tempfile from pathlib import Path from PySide6.QtCore import QObject, Slot, QUrl from PySide6.QtWebChannel import QWebChannel from PySide6.QtWebEngineCore import QWebEngineSettings from qframelesswindow.webengine import FramelessWebEngineView from queue_sqlite.model import MessageItem from ...scheduler_manager import scheduler from ...signal import global_signals class WebViewQueueSqliteOperations(QObject): """ 用于前端调用QueueSqlite的队列操作 """ def __init__(self, webview: FramelessWebEngineView, parent=None): super().__init__(parent) self.webview = webview self.page = self.webview.page() global_signals.update_queue_state.connect(self.update_queue_state) @Slot(str, str, result=str) def send_message(self, message: str, store_key: str): def callback(result: MessageItem): print(f"send_message callback: {result}") global_signals.update_queue_state.emit(result.to_dict(), store_key) scheduler.send_message(MessageItem.from_dict(json.loads(message)), callback) def update_queue_state(self, result: dict, store_key: str): json_str = json.dumps(result) print(f"result: {result}") print(f"json_str: {json_str}") js_code = f""" try {{ if (typeof window.queueStore.updateQueueData === 'function') {{ window.queueStore.updateQueueData('{store_key}', {json_str}); }} else {{ console.log('updateQueueData function not found'); }} }} catch (error) {{ console.error('Error in updateQueueData:', error); }} """ 
print(f"执行的JavaScript代码: {js_code}") self.page.runJavaScript(js_code) class WebviewBridge(QObject): """Webview桥接类,用于处理前端和Python后端之间的通信""" def __init__(self, webview, parent=None): super().__init__(parent) self.webview = webview # 启用本地文件访问 self.webview.settings().setAttribute( QWebEngineSettings.WebAttribute.LocalContentCanAccessFileUrls, True ) self.webview.settings().setAttribute( QWebEngineSettings.WebAttribute.LocalContentCanAccessRemoteUrls, True ) self.webview.settings().setAttribute( QWebEngineSettings.WebAttribute.AllowRunningInsecureContent, True ) self.webview.settings().setAttribute( QWebEngineSettings.WebAttribute.AllowGeolocationOnInsecureOrigins, True ) # 设置Web安全策略 self.webview.page().settings().setAttribute( QWebEngineSettings.WebAttribute.LocalContentCanAccessFileUrls, True ) self.webview.page().settings().setAttribute( QWebEngineSettings.WebAttribute.LocalContentCanAccessRemoteUrls, True ) self.webview.page().settings().setAttribute( QWebEngineSettings.WebAttribute.AllowRunningInsecureContent, True ) # 创建插件目录 self.plugins_dir = Path("plugins") os.makedirs(self.plugins_dir, exist_ok=True) # 插件信息文件 self.plugins_info_file = self.plugins_dir / "plugins.json" if not self.plugins_info_file.exists(): with open(self.plugins_info_file, "w") as f: json.dump([], f) @Slot(str, str, str, result=str) def upload_plugin(self, name, description, file_content): """处理插件上传请求""" try: # 将base64编码的文件内容解码并保存为临时文件 file_data = base64.b64decode(file_content) # 创建临时文件来保存上传的ZIP文件 with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_file: tmp_file.write(file_data) tmp_file_path = tmp_file.name # 判断插件是否已存在 if os.path.exists(self.plugins_dir / name): os.unlink(tmp_file_path) return json.dumps({"success": False, "error": "插件名称已存在"}) # 解压插件 success, msg = self.extract_plugin(tmp_file_path, name) if not success: # 清理临时文件 os.unlink(tmp_file_path) return json.dumps({"success": False, "error": msg}) # 读取 plugin.config.json 文件 plugin_config_file = 
msg.replace("index.html", "plugin.config.json") if not os.path.exists(plugin_config_file): os.unlink(tmp_file_path) return json.dumps({"success": False, "error": "插件配置文件不存在"}) with open(plugin_config_file, "r") as f: plugin_config = json.load(f) plugin_conofig_keys = [ "pluginName", "pluginId", "pluginDescription", "pluginVersion", "pluginAuthor", ] for key in plugin_conofig_keys: if key not in plugin_config: os.unlink(tmp_file_path) return json.dumps( {"success": False, "error": "插件配置文件缺少必要字段"} ) plugin_info = { "id": plugin_config["pluginId"], "name": name, "pluginName": plugin_config["pluginName"], "description": description, "version": plugin_config["pluginVersion"], "author": plugin_config["pluginAuthor"], "uploadDate": time.strftime("%Y-%m-%d", time.localtime()), "path": str(self.plugins_dir / name / plugin_config["pluginVersion"]), } # 创建版本路径并移动插件文件 version_path = self.plugins_dir / name / plugin_info["version"] version_path.mkdir(exist_ok=True) for file in os.listdir(os.path.dirname(msg)): shutil.move(os.path.join(os.path.dirname(msg), file), version_path) # 读取现有插件信息 if os.path.exists(self.plugins_info_file): with open(self.plugins_info_file, "r") as f: plugins = json.load(f) else: plugins = [] # 判断pluginId是否存在 if plugin_info["id"] in [p["id"] for p in plugins]: os.unlink(tmp_file_path) return json.dumps({"success": False, "error": "插件ID已存在"}) # 添加新插件 plugins.append(plugin_info) # 保存插件信息 with open(self.plugins_info_file, "w") as f: json.dump(plugins, f) # 清理临时文件 os.unlink(tmp_file_path) return json.dumps({"success": True}) except Exception as e: # 确保任何异常都返回有效的JSON return json.dumps({"success": False, "error": str(e)}) @Slot(str, result=str) def update_plugin(self, plugin_id, name, description, file_content): """处理插件更新请求""" try: file_data = base64.b64decode(file_content) with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_file: tmp_file.write(file_data) tmp_filepath = tmp_file.name success, msg = self.extract_plugin(tmp_filepath, name) if 
not success: os.unlink(tmp_filepath) return json.dumps({"success": False, "error": msg}) plugin_config_file = msg.replace("index.html", "plugin.config.json") if not os.path.exists(plugin_config_file): os.unlink(tmp_filepath) return json.dumps({"success": False, "error": "插件配置文件不存在"}) with open(plugin_config_file, "r") as f: plugin_config = json.load(f) plugin_config_keys = [ "pluginName", "pluginId", "pluginDescription", "pluginVersion", "pluginAuthor", ] for key in plugin_config_keys: if key not in plugin_config: os.unlink(tmp_filepath) return json.dumps( {"success": False, "error": "插件配置文件缺少必要字段"} ) # 判断插件是否存在 if not os.path.exists(self.plugins_dir / name): os.unlink(tmp_filepath) return json.dumps({"success": False, "error": "插件不存在"}) # 判断版本是否已存在 version_path = self.plugins_dir / name / plugin_config["pluginVersion"] if version_path.exists(): os.unlink(tmp_filepath) return json.dumps({"success": False, "error": "版本已存在"}) # 创建版本路径并移动插件文件 version_path.mkdir(exist_ok=True) for file in os.listdir(os.path.dirname(msg)): shutil.move(os.path.join(os.path.dirname(msg), file), version_path) # 更新插件信息 with open(self.plugins_info_file, "r") as f: plugins = json.load(f) plugin_info = next((p for p in plugins if p["id"] == plugin_id), None) if plugin_info: plugin_info["name"] = name plugin_info["description"] = description plugin_info["version"] = [ p["version"] for p in plugins if p["id"] == plugin_id ][0] plugin_info["uploadDate"] = time.strftime("%Y-%m-%d", time.localtime()) plugin_info["path"] = str( self.plugins_dir / name / plugin_config["pluginVersion"] ) # 保存插件信息 with open(self.plugins_info_file, "w") as f: json.dump(plugins, f) # 清理临时文件 os.unlink(tmp_filepath) return json.dumps({"success": True}) except Exception as e: # 确保任何异常都返回有效的JSON return json.dumps({"success": False, "error": str(e)}) # 解压插件 def extract_plugin(self, plugin_file, plugin_name): """解压插件""" try: with zipfile.ZipFile(plugin_file, "r") as zf: # 获取插件名称 plugin_dir = self.plugins_dir / plugin_name # 
如果插件目录已存在,删除 if plugin_dir.exists(): shutil.rmtree(plugin_dir) # 创建插件目录并解压 plugin_dir.mkdir(exist_ok=True) zf.extractall(plugin_dir) # 检测是否有 index.html 文件 index_file = plugin_dir / "index.html" if not index_file.exists(): # 查找任何HTML文件作为入口 html_files = list(plugin_dir.glob("*.html")) if not html_files: return False, "插件中未找到HTML文件" index_file = html_files[0] return True, str(index_file.absolute()) except Exception as e: return False, str(e) @Slot(result=str) def list_plugins(self): """获取插件列表""" try: if os.path.exists(self.plugins_info_file): with open(self.plugins_info_file, "r") as f: plugins = json.load(f) return json.dumps({"success": True, "plugins": plugins}) else: return json.dumps({"success": True, "plugins": []}) except Exception as e: # 确保任何异常都返回有效的JSON return json.dumps({"success": False, "error": str(e)}) @Slot(str, result=str) def launch_plugin(self, plugin_id): """启动插件""" try: if os.path.exists(self.plugins_info_file): with open(self.plugins_info_file, "r") as f: plugins = json.load(f) # 查找插件 plugin = next((p for p in plugins if p["id"] == plugin_id), None) print(f"Launching plugin: {plugin}") if plugin: # 构建插件URL plugin_path = Path(plugin["path"]) # 查找HTML文件作为入口 html_files = list(plugin_path.glob("*.html")) if html_files: index_file = html_files[0] url = QUrl.fromLocalFile(str(index_file.absolute())) print(f"Launching plugin: {url.toString()}") print(f"Plugin path: {plugin_path}") self.webview.load(url) return json.dumps({"success": True, "url": url.toString()}) else: return json.dumps( {"success": False, "error": "插件入口文件未找到"} ) else: return json.dumps({"success": False, "error": "插件未找到"}) else: return json.dumps({"success": False, "error": "插件信息文件未找到"}) except Exception as e: # 确保任何异常都返回有效的JSON return json.dumps({"success": False, "error": str(e)}) @Slot(str, result=str) def save_settings(self, settings_json): """保存设置""" try: settings = json.loads(settings_json) # 在实际应用中,这里会保存设置到配置文件 print(f"保存设置: {settings}") return json.dumps({"success": True}) 
except Exception as e: # 确保任何异常都返回有效的JSON return json.dumps({"success": False, "error": str(e)}) class WebviewController(QObject): """Webview控制器类,负责管理Webview的逻辑""" def __init__(self, web_engine_page, web_webview, parent=None): super().__init__(parent) self.page = web_engine_page self.bridge = WebviewBridge(web_webview) self.queue_sqlite_operations = WebViewQueueSqliteOperations(web_webview) self.channel = QWebChannel() self.channel.registerObject("entrance", self.bridge) self.channel.registerObject("queueSqlite", self.queue_sqlite_operations) self.page.setWebChannel(self.channel) ``` `./app\view\web_view\web_view_interface.py` ```python # coding:utf-8 import sys import os from pathlib import Path from PySide6.QtWidgets import ( QFrame, QVBoxLayout, QMenu, QApplication, QWidget, QPushButton, QHBoxLayout, ) from PySide6.QtCore import QPoint, Qt, QUrl from qframelesswindow.webengine import FramelessWebEngineView from .entrance import WebviewController from ...config import config class DevToolsWindow(FramelessWebEngineView): def __init__(self, parent=None): super().__init__(parent=parent) self.setWindowTitle("开发者工具") self.resize(600, 600) class DevToolsView(QWidget): def __init__(self, parent=None): super().__init__(parent=parent) self.setWindowTitle("开发者工具") self.dev_tools_window = DevToolsWindow(self) self.resize(600, 600) # 设置窗口大小不可变 self.setFixedSize(self.size()) class WebViewInterface(QFrame): def __init__(self, parent=None): super().__init__(parent=parent) self.setObjectName("webViewInterface") self.dev_tools_view = None # 添加连接状态标志 self.is_local_connection = config.get("project", {}).get("is-production", True) # 插件目录 self.plugins_dir = Path(os.getcwd()) / "plugins" self.plugins_dir.mkdir(exist_ok=True) self.webView = FramelessWebEngineView(self) self.html_path = ( os.getcwd().replace("\\", "/") + "/resources/webview/entrance/dist/index.html" ) self.remote_url = config.get("project", {}).get( "remote-url", "http://localhost:8080/" ) # 创建WebviewController实例 
def apply_styles(self):
    """Apply the grey navigation style sheet to the four toolbar buttons.

    The sheet is the shared base button style followed by grey overrides
    for the normal, hover and pressed states; rules that appear later in
    the sheet take precedence over the base rules.
    """
    base_style = """
        QPushButton {
            border: none;
            padding: 5px 10px;
            font-family: 'Segoe UI', 'Microsoft YaHei';
            font-size: 14px;
            color: white;
            border-radius: 5px;
            background-color: #2c68c6;
        }
        QPushButton:hover {
            background-color: #3a75d8;
        }
        QPushButton:pressed {
            background-color: #1a56b0;
        }
        QPushButton:disabled {
            background-color: #6c757d;
            color: #adb5bd;
        }
    """
    nav_button_style = (
        base_style
        + """
        QPushButton {
            background-color: #6c757d;
        }
        QPushButton:hover {
            background-color: #5a6268;
        }
        QPushButton:pressed {
            background-color: #495057;
        }
    """
    )

    # Every navigation button shares one sheet, so apply it in a loop
    # instead of repeating the call per button.
    for button in (
        self.back_button,
        self.forward_button,
        self.reload_button,
        self.home_button,
    ):
        button.setStyleSheet(nav_button_style)


def go_home(self):
    """Load ``self.html_path`` into the web view."""
    # NOTE(review): this always loads html_path, whereas load_current_page
    # picks remote_url when is_local_connection is false - confirm that
    # "home" is meant to be the local page in both modes.
    self.webView.load(QUrl(self.html_path))


def update_navigation_buttons(self):
    """Enable the back/forward buttons according to the web view history."""
    history = self.webView.history()
    self.back_button.setEnabled(history.canGoBack())
    self.forward_button.setEnabled(history.canGoForward())


def load_current_page(self):
    """Load the page for the current connection mode.

    Uses ``self.html_path`` when ``self.is_local_connection`` is truthy and
    ``self.remote_url`` otherwise.
    """
    target = self.html_path if self.is_local_connection else self.remote_url
    self.webView.load(QUrl(target))


def on_load_finished(self, success):
    """React to the web view's ``loadFinished`` signal.

    Args:
        success: Truthy when the page loaded successfully.
    """
    if success:
        # Create the developer tools ahead of the first time they are shown.
        self.init_dev_tools()

    # Refresh the button state whether or not the load succeeded.
    self.update_navigation_buttons()


def init_dev_tools(self):
    """Create the developer-tools view once and attach it to the page."""
    if self.dev_tools_view is not None:
        return

    self.dev_tools_view = DevToolsView()
    self.page.setDevToolsPage(self.dev_tools_view.dev_tools_window.page())


def show_context_menu(self, pos: "QPoint"):
    """Show the custom context menu at ``pos``.

    Args:
        pos: Position in web-view coordinates; it is mapped to global
            coordinates before the menu is executed.
    """
    menu = QMenu(self)

    # Standard browser actions.
    menu.addAction("后退").triggered.connect(self.webView.back)
    menu.addAction("前进").triggered.connect(self.webView.forward)
    menu.addAction("重新加载").triggered.connect(self.webView.reload)

    menu.addSeparator()

    # Developer-tools action; no separator follows it because nothing
    # comes after this entry in the menu.
    menu.addAction("打开开发者工具").triggered.connect(self.toggle_dev_tools)

    menu.exec(self.webView.mapToGlobal(pos))


def toggle_dev_tools(self):
    """Toggle the visibility of the developer-tools view and its window."""
    if self.dev_tools_view is None:
        self.init_dev_tools()

    tools = self.dev_tools_view
    if tools is None:
        # Initialisation did not produce a view; nothing to toggle.
        return

    window = tools.dev_tools_window
    if window.isVisible():
        tools.hide()
        window.hide()
    else:
        tools.show()
        window.show()
        window.raise_()  # bring the window to the front


def closeEvent(self, event):
    """Close the developer-tools view, if any, then run the base handler."""
    # NOTE(review): the original carried a disabled call that stopped a timer
    # on self.controller.bridge; confirm whether it still needs stopping here.
    if self.dev_tools_view is not None:
        self.dev_tools_view.close()
    super().closeEvent(event)


if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = WebViewInterface()
    window.resize(1000, 700)
    window.show()
    sys.exit(app.exec())
import { defineStore } from 'pinia'

/**
 * Pinia store registered under the id 'queueSqlite'.
 * It keeps queue-related data in a single `queueData` map.
 */
export const useQueueStore = defineStore('queueSqlite', {
  // Initial state: an empty map of queue entries.
  state: () => {
    const queueData = {}
    return { queueData }
  },

  actions: {
    /**
     * Store `value` under `key` in `queueData`, replacing any existing entry.
     */
    updateQueueData(key, value) {
      this.queueData[key] = value
    }
  }
})