diff --git a/.python-version b/.python-version index 2c07333..9b1d4ba 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.11 +3.13t diff --git a/pyproject.toml b/pyproject.toml index 46433b4..ab75933 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,8 +22,9 @@ requires-python = ">=3.11" version = "0.2.0" [project.optional-dependencies] -pyside6 = ["PySide6"] -pyqt6 = ["PyQt6"] +pyside6 = ["qtpy", "PySide6"] +pyqt6 = ["qtpy", "PyQt6"] +pyqt5 = ["qtpy", "PyQt5"] [[tool.uv.index]] default = true diff --git a/src/queue_sqlite/scheduler/__init__.py b/src/queue_sqlite/scheduler/__init__.py index 1a6a4e3..d5d4bd8 100644 --- a/src/queue_sqlite/scheduler/__init__.py +++ b/src/queue_sqlite/scheduler/__init__.py @@ -13,27 +13,21 @@ from ._async import AsyncQueueScheduler from .base import BaseScheduler from ..model import MessageItem, SchedulerConfig from typing import Callable -import multiprocessing +import logging SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler} try: - from .pyside6 import PySide6QueueScheduler + from .qt import QtQueueScheduler print(SCHEDULER_TYPES) - if PySide6QueueScheduler.is_available(): - SCHEDULER_TYPES["pyside6"] = PySide6QueueScheduler + if QtQueueScheduler.is_available(): + SCHEDULER_TYPES["qt"] = QtQueueScheduler except ImportError: - pass - -try: - from .pyqt6 import PyQt6QueueScheduler - - if PyQt6QueueScheduler.is_available(): - SCHEDULER_TYPES["pyqt6"] = PyQt6QueueScheduler -except ImportError: - pass + logging.warning( + "Qt is not available, if you want to use it, please install PyQt5/PyQt6/PySide6" + ) class QueueScheduler(BaseScheduler): diff --git a/src/queue_sqlite/scheduler/pyside6/__init__.py b/src/queue_sqlite/scheduler/pyside6/__init__.py deleted file mode 100644 index d601686..0000000 --- a/src/queue_sqlite/scheduler/pyside6/__init__.py +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : __init__.py -@Time : 2025-10-26 10:26:16 -@Author : chakcy -@Email : 947105045@qq.com -@description : pyside6调度器 - 独立实现 -""" - -import logging -import os -from ..base import BaseScheduler -from ...model import MessageItem, SchedulerConfig -from queue_sqlite_core import ShardedQueueOperation -from ...queue_operation.listen_operation import ListenOperation -from .receive_scheduler import PySide6ReceiveScheduler -from .listen_scheduler import PySide6ListenScheduler -from .task_scheduler import PySide6TaskScheduler -from ..cleanup_scheduler import CleanupScheduler as PySide6CleanupScheduler -from PySide6.QtCore import QThreadPool -from PySide6.QtWidgets import QApplication -from typing import Callable, Dict, Any, List - - -class PySide6QueueScheduler(BaseScheduler): - """完全独立的 PySide6 队列调度器 - 修复版""" - - def __init__( - self, - config: SchedulerConfig = SchedulerConfig(), - ): - # 确保有 QApplication 实例(对于 GUI 应用) - self._ensure_qapplication() - - # 初始化队列操作 - self.queue_operation = ShardedQueueOperation( - config.shard_num, config.queue_name - ) - - # 初始化监听操作 - db_dir = f"cache/{config.queue_name}" - os.makedirs(db_dir, exist_ok=True) - self.listen_operation = ListenOperation(f"{db_dir}/listen.db") - - # 确保监听表被创建 - self._ensure_listen_operation_tables() - - # 初始化各个独立的调度器组件 - self.receive_scheduler = PySide6ReceiveScheduler( - self.queue_operation, config.receive_thread_num - ) - self.task_scheduler = PySide6TaskScheduler( - self.queue_operation, config.task_thread_num - ) - self.listen_scheduler = PySide6ListenScheduler(self.listen_operation) - self.cleanup_scheduler = PySide6CleanupScheduler(self.queue_operation) - - @staticmethod - def is_available() -> bool: - try: - import PySide6 - - # 检查是否在主线程(GUI 调度器通常要求) - return True - except ImportError: - return False # 依赖库未安装时不可用 - - def _ensure_qapplication(self): - """确保 QApplication 实例存在""" - try: - if not QApplication.instance(): - # 对于非GUI应用,创建无窗口的QApplication - import sys - - self.app = QApplication(sys.argv if hasattr(sys, "argv") else []) - logging.info("已创建 QApplication 实例") - except Exception as e: - logging.warning(f"创建 QApplication 失败: {str(e)}") - - def _ensure_listen_operation_tables(self): - """确保监听操作的表被正确创建""" - try: - # 强制重新创建表 - self.listen_operation.create_table() - - # 额外检查 change_log 表 - conn = self.listen_operation._get_connection() - cursor = conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'" - ) - if cursor.fetchone() is None: - logging.warning("change_log 表不存在,尝试手动创建") - conn.execute( - """ - CREATE TABLE change_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - table_name TEXT, - row_id INTEGER, - column_name TEXT, - old_value TEXT, - new_value TEXT, - is_delete integer DEFAULT 0, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - conn.commit() - - except Exception as e: - logging.error(f"确保监听表存在失败: {str(e)}") - - def send_message(self, message: MessageItem, callback: Callable): - """发送消息到队列""" - self.receive_scheduler.send_message(message, callback) - - def update_listen_data(self, key: str, value: str): - """更新监听数据""" - self.listen_operation.update_listen_data(key, value) - - def get_listen_datas(self) -> List: - """获取所有监听数据""" - return self.listen_operation.get_values() - - def get_listen_data(self, key: str): - """获取单个监听数据""" - return self.listen_operation.get_value(key) - - def start(self): - """启动所有调度器组件""" - self.receive_scheduler.start() - self.task_scheduler.start() - self.cleanup_scheduler.start_cleanup() - self.listen_scheduler.start() - logging.info("PySide6 队列调度器已完全启动") - - def stop(self): - """停止所有调度器组件""" - self.listen_scheduler.stop() - self.cleanup_scheduler.stop_cleanup() - self.task_scheduler.stop() - self.receive_scheduler.stop() - logging.info("PySide6 队列调度器已完全停止") - - def get_queue_info(self) -> Dict[str, Any]: - """获取队列信息""" - try: - return { - "queue_length": self.queue_operation.get_queue_length(), - "shard_num": self.queue_operation.shard_num, - "db_dir": self.queue_operation.db_dir, - "active_threads": QThreadPool.globalInstance().activeThreadCount(), - "max_threads": QThreadPool.globalInstance().maxThreadCount(), - } - except Exception as e: - logging.error(f"获取队列信息失败: {str(e)}") - return {} diff --git a/src/queue_sqlite/scheduler/pyside6/callback_task.py b/src/queue_sqlite/scheduler/pyside6/callback_task.py deleted file mode 100644 index 8a46489..0000000 --- a/src/queue_sqlite/scheduler/pyside6/callback_task.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : callback_task.py -@Time : 2025-10-28 11:40:02 -@Author : chakcy -@Email : 947105045@qq.com -@description : 回调任务 -""" -import asyncio -import logging -from queue_sqlite_core import ShardedQueueOperation -from ...model import MessageItem -from typing import Callable -from PySide6.QtCore import QRunnable - - -class PySide6CallbackTask(QRunnable): - """PySide6 回调任务""" - - def __init__( - self, - callback: Callable, - message: MessageItem, - queue_operation: ShardedQueueOperation, - ): - super().__init__() - self.callback = callback - self.message = message - self.queue_operation = queue_operation - self.setAutoDelete(True) - - def run(self): - """执行回调函数""" - try: - # 检查是否是协程函数 - if asyncio.iscoroutinefunction(self.callback): - # 对于异步回调,在当前线程中运行事件循环 - asyncio.run(self._run_async_callback()) - else: - self.callback(self.message) - except Exception as e: - logging.error(f"回调执行错误: {str(e)}") - finally: - # 删除消息 - try: - self.queue_operation.delete_message(self.message.id) - except Exception as e: - logging.error(f"删除消息失败 {self.message.id}: {str(e)}") - - async def _run_async_callback(self): - """运行异步回调""" - try: - await self.callback(self.message) - except Exception as e: - logging.error(f"异步回调执行错误: {str(e)}") diff --git a/src/queue_sqlite/scheduler/pyside6/listen_scheduler.py b/src/queue_sqlite/scheduler/pyside6/listen_scheduler.py deleted file mode 100644 index 2d4c77a..0000000 --- a/src/queue_sqlite/scheduler/pyside6/listen_scheduler.py +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : listen_scheduler.py -@Time : 2025-10-28 14:22:38 -@Author : chakcy -@Email : 947105045@qq.com -@description : 监听调度器 -""" -import logging -from .listen_task import PySide6ListenTask -from ...queue_operation.listen_operation import ListenOperation -from PySide6.QtCore import QThreadPool -import threading -import time - - -class PySide6ListenScheduler: - """PySide6 监听调度器 - 修复版""" - - def __init__(self, listen_operation: ListenOperation): - self.listen_operation = listen_operation - self.is_running = False - self.listen_thread = None - self.thread_pool = QThreadPool.globalInstance() - - # 确保监听表存在 - self._ensure_listen_tables() - - def _ensure_listen_tables(self): - """确保监听相关的表存在""" - try: - # 检查表是否存在,如果不存在则创建 - conn = self.listen_operation._get_connection() - - # 检查 change_log 表是否存在 - cursor = conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'" - ) - if cursor.fetchone() is None: - logging.info("创建缺失的 change_log 表") - conn.execute( - """ - CREATE TABLE change_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - table_name TEXT, - row_id INTEGER, - column_name TEXT, - old_value TEXT, - new_value TEXT, - is_delete integer DEFAULT 0, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - conn.commit() - - except Exception as e: - logging.error(f"确保监听表存在失败: {str(e)}") - - def _listen_loop(self): - """监听数据变化循环""" - while self.is_running: - try: - status, change_data_items = self.listen_operation.listen_data() - if status: - for data in change_data_items: - try: - # 修正字段索引 - if len(data) >= 8: - delete_id = data[0] # id - table_name = data[1] # table_name - row_id = data[2] # row_id - column_name = data[3] # column_name - old_value = data[4] # old_value - new_value = data[5] # new_value - - # 使用线程池执行监听任务 - task = PySide6ListenTask( - column_name, - new_value, - int(delete_id), - self.listen_operation, - ) - self.thread_pool.start(task) - - except Exception as e: - logging.error(f"处理监听数据失败: {str(e)}") - else: - time.sleep(0.05) - - except Exception as e: - logging.error(f"监听循环错误: {str(e)}") - time.sleep(0.1) - - def start(self): - """启动监听调度器""" - if self.is_running: - return - - self.is_running = True - self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True) - self.listen_thread.start() - logging.info("PySide6 监听调度器已启动") - - def stop(self): - """停止监听调度器""" - if not self.is_running: - return - - self.is_running = False - if self.listen_thread and self.listen_thread.is_alive(): - self.listen_thread.join(timeout=3.0) - logging.info("PySide6 监听调度器已停止") diff --git a/src/queue_sqlite/scheduler/pyside6/listen_task.py b/src/queue_sqlite/scheduler/pyside6/listen_task.py deleted file mode 100644 index fe27fc9..0000000 --- a/src/queue_sqlite/scheduler/pyside6/listen_task.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : listen_task.py -@Time : 2025-10-28 14:30:41 -@Author : chakcy -@Email : 947105045@qq.com -@description : 监听任务 -""" - -from queue_sqlite.queue_operation.listen_operation import ListenOperation -from ...mounter.listen_mounter import ListenMounter -import logging -from PySide6.QtCore import QRunnable -import asyncio -from typing import Callable - - -class PySide6ListenTask(QRunnable): - """PySide6 监听任务 - 修复版""" - - def __init__( - self, key: str, value: str, delete_id: int, listen_operation: ListenOperation - ): - super().__init__() - self.key = key - self.value = value - self.delete_id = delete_id - self.listen_operation = listen_operation - self.setAutoDelete(True) - - def run(self): - """执行监听回调""" - try: - listen_function = ListenMounter.get_Listener_function(self.key) - if listen_function: - if asyncio.iscoroutinefunction(listen_function): - # 异步监听函数 - asyncio.run(self._run_async_listener(listen_function)) - else: - # 同步监听函数 - listen_function(self.value) - except Exception as e: - logging.error(f"监听函数执行错误 {self.key}: {str(e)}") - finally: - # 删除变更日志 - try: - self.listen_operation.delete_change_log(self.delete_id) - except Exception as e: - logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}") - - async def _run_async_listener(self, listen_function: Callable): - """执行异步监听函数""" - try: - await listen_function(self.value) - except Exception as e: - logging.error(f"异步监听函数执行错误 {self.key}: {str(e)}") diff --git a/src/queue_sqlite/scheduler/pyside6/receive_scheduler.py b/src/queue_sqlite/scheduler/pyside6/receive_scheduler.py deleted file mode 100644 index 3359723..0000000 --- a/src/queue_sqlite/scheduler/pyside6/receive_scheduler.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : receive_scheduler.py -@Time : 2025-10-28 14:31:06 -@Author : chakcy -@Email : 947105045@qq.com -@description : 接收调度器 -""" - - -from typing import Callable, Optional -from queue_sqlite_core import ShardedQueueOperation -from PySide6.QtCore import QThreadPool -import threading -from ...model import MessageItem -import time -import logging -from .callback_task import PySide6CallbackTask - - -class PySide6ReceiveScheduler: - """PySide6 接收调度器 - 独立实现""" - - def __init__( - self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1 - ): - self.callbacks = {} - self.is_running = False - self.lock = threading.Lock() - self.queue_operation = queue_operation - self.receive_thread = None - self.thread_pool = QThreadPool.globalInstance() - - def send_message(self, message: MessageItem, callback: Optional[Callable] = None): - """发送消息到队列""" - if callback is None: - callback = lambda message: logging.info(f"收到消息: {message.id}") - - # 入队消息 - self.queue_operation.enqueue(message.to_dict_by_core()) - with self.lock: - self.callbacks[message.id] = callback - - def _receive_loop(self): - """接收消息循环""" - while self.is_running: - try: - # 获取已完成的消息 - message_list = self.queue_operation.get_completed_messages() - if message_list: - for message_data in message_list: - try: - message = MessageItem.from_dict(message_data) - - with self.lock: - callback = self.callbacks.pop(message.id, None) - if callback is None: - callback = lambda msg: None - - # 使用线程池执行回调 - task = PySide6CallbackTask( - callback, message, self.queue_operation - ) - self.thread_pool.start(task) - - except Exception as e: - logging.error(f"处理完成消息失败: {str(e)}") - # 即使处理失败也尝试删除消息 - try: - self.queue_operation.delete_message(message_data["id"]) - except: - pass - else: - time.sleep(0.05) # 短暂休眠避免CPU空转 - - except Exception as e: - logging.error(f"接收消息循环错误: {str(e)}") - time.sleep(0.1) # 出错时稍长休眠 - - def start(self): - """启动接收调度器""" - if self.is_running: - return - - self.is_running = True - self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True) - self.receive_thread.start() - logging.info("PySide6 接收调度器已启动") - - def stop(self): - """停止接收调度器""" - if not self.is_running: - return - - self.is_running = False - if self.receive_thread and self.receive_thread.is_alive(): - self.receive_thread.join(timeout=3.0) - logging.info("PySide6 接收调度器已停止") diff --git a/src/queue_sqlite/scheduler/pyside6/task_executor.py b/src/queue_sqlite/scheduler/pyside6/task_executor.py deleted file mode 100644 index 1ef742e..0000000 --- a/src/queue_sqlite/scheduler/pyside6/task_executor.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : task_executor.py -@Time : 2025-10-28 14:31:25 -@Author : chakcy -@Email : 947105045@qq.com -@description : 任务执行器 -""" - - -import asyncio -import logging -from typing import Any, Callable, Dict -from datetime import datetime -from ...constant import MessageStatus -from ...mounter.task_mounter import TaskMounter -from queue_sqlite_core import ShardedQueueOperation -from ...model.message_item import MessageItem -from PySide6.QtCore import QRunnable - - -class PySide6TaskExecutor(QRunnable): - """PySide6 任务执行器 - 修复版""" - - def __init__( - self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation - ): - super().__init__() - self.message_data = message_data - self.queue_operation = queue_operation - self.setAutoDelete(True) - - def run(self): - """执行任务""" - try: - message = MessageItem.from_dict(self.message_data) - - if message.destination == "client": - self._process_client_message(message) - return - - task_function = TaskMounter.get_task_function(message.destination) - if task_function is None: - raise ValueError(f"任务函数未找到: {message.destination}") - - # 执行任务 - if asyncio.iscoroutinefunction(task_function): - # 异步任务 - 在当前线程运行事件循环 - asyncio.run(self._execute_async_task(message, task_function)) - else: - # 同步任务 - self._execute_sync_task(message, task_function) - - except Exception as e: - logging.error(f"任务执行错误: {str(e)}") - # 更新任务状态为失败 - try: - self.queue_operation.update_status( - self.message_data["id"], MessageStatus.FAILED.value - ) - self.queue_operation.update_result( - self.message_data["id"], f'{{"error": "{str(e)}"}}' - ) - except Exception as update_error: - logging.error(f"更新任务状态失败: {str(update_error)}") - - def _process_client_message(self, message: MessageItem): - """处理客户端消息""" - message.status = MessageStatus.COMPLETED - message.result = {"result": "success"} - message.updatetime = datetime.now() - self._update_task_result(message) - - def _execute_sync_task(self, message: MessageItem, task_function: Callable): - """执行同步任务""" - from ...cycle.task_cycle import TaskCycle - - try: - task_cycle = TaskCycle(message, task_function) - task_cycle.run() - - status = task_cycle.get_task_status() - if not status: - raise ValueError("任务未完成") - message.status = status - if message.status == MessageStatus.FAILED: - message.result = {"error": task_cycle.get_task_error()} - else: - # 获取序列化后的结果 - result_str = task_cycle.get_task_result() - try: - import json - - message.result = json.loads(result_str) - except: - message.result = {"result": result_str} - - self._update_task_result(message) - - except Exception as e: - logging.error(f"同步任务执行失败 {message.id}: {str(e)}") - message.status = MessageStatus.FAILED - message.result = {"error": str(e)} - self._update_task_result(message) - - async def _execute_async_task(self, message: MessageItem, task_function: Callable): - """执行异步任务""" - from ...cycle.async_task_cycle import AsyncTaskCycle - - try: - task_cycle = AsyncTaskCycle(message, task_function) - await task_cycle.run() - status = task_cycle.get_task_status() - if not status: - raise ValueError("任务未完成") - message.status = status - if message.status == MessageStatus.FAILED: - message.result = {"error": task_cycle.get_task_error()} - else: - # 获取序列化后的结果 - result_str = task_cycle.get_task_result() - try: - import json - - message.result = json.loads(result_str) - except: - message.result = {"result": result_str} - - self._update_task_result(message) - - except Exception as e: - logging.error(f"异步任务执行失败 {message.id}: {str(e)}") - message.status = MessageStatus.FAILED - message.result = {"error": str(e)} - self._update_task_result(message) - - def _update_task_result(self, message: MessageItem): - """更新任务结果到数据库""" - import json - - try: - # 序列化结果 - result_str = json.dumps(message.result) if message.result else "{}" - self.queue_operation.update_result(message.id, result_str) - self.queue_operation.update_status(message.id, message.status.value) - except Exception as e: - logging.error(f"更新任务结果失败 {message.id}: {str(e)}") diff --git a/src/queue_sqlite/scheduler/pyside6/task_scheduler.py b/src/queue_sqlite/scheduler/pyside6/task_scheduler.py deleted file mode 100644 index 7e6c683..0000000 --- a/src/queue_sqlite/scheduler/pyside6/task_scheduler.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : task_scheduler.py -@Time : 2025-10-28 14:19:39 -@Author : chakcy -@Email : 947105045@qq.com -@description : PySide6 任务调度器 -""" -import logging -from PySide6.QtCore import QThreadPool -from .task_executor import PySide6TaskExecutor -from queue_sqlite_core import ShardedQueueOperation -import time -import threading - - -class PySide6TaskScheduler: - """PySide6 任务调度器 - 独立实现""" - - def __init__( - self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4 - ): - self.is_running = False - self.queue_operation = queue_operation - self.task_thread = None - self.thread_pool = QThreadPool.globalInstance() - # 设置最大线程数 - self.thread_pool.setMaxThreadCount(max(1, task_thread_num)) - - def _task_loop(self): - """任务处理循环""" - while self.is_running: - try: - # 出队消息进行处理 - message_list = self.queue_operation.dequeue( - size=self.thread_pool.maxThreadCount() * 2 - ) - if message_list: - for message_data in message_list: - # 使用线程池执行任务 - task = PySide6TaskExecutor(message_data, self.queue_operation) - self.thread_pool.start(task) - else: - time.sleep(0.05) # 短暂休眠 - - except Exception as e: - logging.error(f"任务调度循环错误: {str(e)}") - time.sleep(0.1) - - def start(self): - """启动任务调度器""" - if self.is_running: - return - - self.is_running = True - self.task_thread = threading.Thread(target=self._task_loop, daemon=True) - self.task_thread.start() - logging.info("PySide6 任务调度器已启动") - - def stop(self): - """停止任务调度器""" - if not self.is_running: - return - - self.is_running = False - if self.task_thread and self.task_thread.is_alive(): - self.task_thread.join(timeout=3.0) - logging.info("PySide6 任务调度器已停止") diff --git a/src/queue_sqlite/scheduler/pyqt6/__init__.py b/src/queue_sqlite/scheduler/qt/__init__.py similarity index 76% rename from src/queue_sqlite/scheduler/pyqt6/__init__.py rename to src/queue_sqlite/scheduler/qt/__init__.py index 27c9870..9f2a2ff 100644 --- a/src/queue_sqlite/scheduler/pyqt6/__init__.py +++ b/src/queue_sqlite/scheduler/qt/__init__.py @@ -5,7 +5,7 @@ @Time : 2025-10-26 10:26:16 @Author : chakcy @Email : 947105045@qq.com -@description : pyqt6调度器 - 独立实现 +@description : Qt调度器 - 独立实现 """ import logging @@ -14,17 +14,19 @@ from ..base import BaseScheduler from ...model import MessageItem, SchedulerConfig from queue_sqlite_core import ShardedQueueOperation from ...queue_operation.listen_operation import ListenOperation -from .receive_scheduler import PyQt6ReceiveScheduler -from .listen_scheduler import PyQt6ListenScheduler -from .task_scheduler import PyQt6TaskScheduler -from ..cleanup_scheduler import CleanupScheduler as PyQt6CleanupScheduler -from PyQt6.QtCore import QThreadPool -from PyQt6.QtWidgets import QApplication +from .receive_scheduler import QtReceiveScheduler +from .listen_scheduler import QtListenScheduler +from .task_scheduler import QtTaskScheduler +from ..cleanup_scheduler import CleanupScheduler as QtCleanupScheduler +from qtpy.QtCore import QThreadPool +from qtpy.QtWidgets import QApplication from typing import Callable, Dict, Any, List -class PyQt6QueueScheduler(BaseScheduler): - """完全独立的 PyQt6 队列调度器 - 修复版""" +class QtQueueScheduler(BaseScheduler): + """完全独立的 Qt 队列调度器 - 修复版""" + + qt_type = None def __init__( self, @@ -47,24 +49,31 @@ class PyQt6QueueScheduler(BaseScheduler): self._ensure_listen_operation_tables() # 初始化各个独立的调度器组件 - self.receive_scheduler = PyQt6ReceiveScheduler( + self.receive_scheduler = QtReceiveScheduler( self.queue_operation, config.receive_thread_num ) - self.task_scheduler = PyQt6TaskScheduler( + self.task_scheduler = QtTaskScheduler( self.queue_operation, config.task_thread_num ) - self.listen_scheduler = PyQt6ListenScheduler(self.listen_operation) - self.cleanup_scheduler = PyQt6CleanupScheduler(self.queue_operation) + self.listen_scheduler = QtListenScheduler(self.listen_operation) + self.cleanup_scheduler = QtCleanupScheduler(self.queue_operation) - @staticmethod - def is_available() -> bool: - try: - import PyQt6 + @classmethod + def is_available(cls) -> bool: + # 检查 PySide6/PyQt6/PyQt5/PySide2 模块中的一个 + import importlib - # 检查是否在主线程(GUI 调度器通常要求) - return True - except ImportError: - return False # 依赖库未安装时不可用 + qt_modules = ["PySide6", "PyQt6", "PyQt5", "PySide2"] + for module in qt_modules: + try: + importlib.import_module(module) + cls.qt_type = module + logging.info(f"已找到 Qt 模块: {module}") + print(f"已找到 Qt 模块: {module}") + return True + except ImportError: # 模块未找到 + continue + return False def _ensure_qapplication(self): """确保 QApplication 实例存在""" @@ -132,7 +141,7 @@ class PyQt6QueueScheduler(BaseScheduler): self.task_scheduler.start() self.cleanup_scheduler.start_cleanup() self.listen_scheduler.start() - logging.info("PyQt6 队列调度器已完全启动") + logging.info("Qt 队列调度器已完全启动") def stop(self): """停止所有调度器组件""" @@ -140,7 +149,7 @@ class PyQt6QueueScheduler(BaseScheduler): self.cleanup_scheduler.stop_cleanup() self.task_scheduler.stop() self.receive_scheduler.stop() - logging.info("PyQt6 队列调度器已完全停止") + logging.info("Qt 队列调度器已完全停止") def get_queue_info(self) -> Dict[str, Any]: """获取队列信息""" diff --git a/src/queue_sqlite/scheduler/pyqt6/callback_task.py b/src/queue_sqlite/scheduler/qt/callback_task.py similarity index 94% rename from src/queue_sqlite/scheduler/pyqt6/callback_task.py rename to src/queue_sqlite/scheduler/qt/callback_task.py index 3410481..38dc4ad 100644 --- a/src/queue_sqlite/scheduler/pyqt6/callback_task.py +++ b/src/queue_sqlite/scheduler/qt/callback_task.py @@ -12,11 +12,11 @@ import logging from queue_sqlite_core import ShardedQueueOperation from ...model import MessageItem from typing import Callable -from PyQt6.QtCore import QRunnable +from qtpy.QtCore import QRunnable -class PyQt6CallbackTask(QRunnable): - """PyQt6 回调任务""" +class QtCallbackTask(QRunnable): + """Qt 回调任务""" def __init__( self, diff --git a/src/queue_sqlite/scheduler/pyqt6/listen_scheduler.py b/src/queue_sqlite/scheduler/qt/listen_scheduler.py similarity index 92% rename from src/queue_sqlite/scheduler/pyqt6/listen_scheduler.py rename to src/queue_sqlite/scheduler/qt/listen_scheduler.py index d2efd17..3f4b0f0 100644 --- a/src/queue_sqlite/scheduler/pyqt6/listen_scheduler.py +++ b/src/queue_sqlite/scheduler/qt/listen_scheduler.py @@ -8,15 +8,15 @@ @description : 监听调度器 """ import logging -from .listen_task import PyQt6ListenTask +from .listen_task import QtListenTask from ...queue_operation.listen_operation import ListenOperation -from PyQt6.QtCore import QThreadPool +from qtpy.QtCore import QThreadPool import threading import time -class PyQt6ListenScheduler: - """PyQt6 监听调度器 - 修复版""" +class QtListenScheduler: + """Qt 监听调度器 - 修复版""" def __init__(self, listen_operation: ListenOperation): self.listen_operation = listen_operation @@ -76,7 +76,7 @@ class PyQt6ListenScheduler: new_value = data[5] # new_value # 使用线程池执行监听任务 - task = PyQt6ListenTask( + task = QtListenTask( column_name, new_value, int(delete_id), @@ -101,7 +101,7 @@ class PyQt6ListenScheduler: self.is_running = True self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listen_thread.start() - logging.info("PyQt6 监听调度器已启动") + logging.info("Qt 监听调度器已启动") def stop(self): """停止监听调度器""" @@ -111,4 +111,4 @@ class PyQt6ListenScheduler: self.is_running = False if self.listen_thread and self.listen_thread.is_alive(): self.listen_thread.join(timeout=3.0) - logging.info("PyQt6 监听调度器已停止") + logging.info("Qt 监听调度器已停止") diff --git a/src/queue_sqlite/scheduler/pyqt6/listen_task.py b/src/queue_sqlite/scheduler/qt/listen_task.py similarity index 94% rename from src/queue_sqlite/scheduler/pyqt6/listen_task.py rename to src/queue_sqlite/scheduler/qt/listen_task.py index 5043021..dd61a3b 100644 --- a/src/queue_sqlite/scheduler/pyqt6/listen_task.py +++ b/src/queue_sqlite/scheduler/qt/listen_task.py @@ -11,13 +11,13 @@ from queue_sqlite.queue_operation.listen_operation import ListenOperation from ...mounter.listen_mounter import ListenMounter import logging -from PyQt6.QtCore import QRunnable +from qtpy.QtCore import QRunnable import asyncio from typing import Callable -class PyQt6ListenTask(QRunnable): - """PyQt6 监听任务 - 修复版""" +class QtListenTask(QRunnable): + """Qt 监听任务 - 修复版""" def __init__( self, key: str, value: str, delete_id: int, listen_operation: ListenOperation diff --git a/src/queue_sqlite/scheduler/pyqt6/receive_scheduler.py b/src/queue_sqlite/scheduler/qt/receive_scheduler.py similarity index 90% rename from src/queue_sqlite/scheduler/pyqt6/receive_scheduler.py rename to src/queue_sqlite/scheduler/qt/receive_scheduler.py index b2b1914..497929e 100644 --- a/src/queue_sqlite/scheduler/pyqt6/receive_scheduler.py +++ b/src/queue_sqlite/scheduler/qt/receive_scheduler.py @@ -11,16 +11,16 @@ from typing import Callable, Optional from queue_sqlite_core import ShardedQueueOperation -from PyQt6.QtCore import QThreadPool +from qtpy.QtCore import QThreadPool import threading from ...model import MessageItem import time import logging -from .callback_task import PyQt6CallbackTask +from .callback_task import QtCallbackTask -class PyQt6ReceiveScheduler: - """PyQt6 接收调度器 - 独立实现""" +class QtReceiveScheduler: + """Qt 接收调度器 - 独立实现""" def __init__( self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1 @@ -59,7 +59,7 @@ class PyQt6ReceiveScheduler: callback = lambda msg: None # 使用线程池执行回调 - task = PyQt6CallbackTask( + task = QtCallbackTask( callback, message, self.queue_operation ) self.thread_pool.start(task) # type: ignore @@ -86,7 +86,7 @@ class PyQt6ReceiveScheduler: self.is_running = True self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True) self.receive_thread.start() - logging.info("PySide6 接收调度器已启动") + logging.info("Qt 接收调度器已启动") def stop(self): """停止接收调度器""" @@ -96,4 +96,4 @@ class PyQt6ReceiveScheduler: self.is_running = False if self.receive_thread and self.receive_thread.is_alive(): self.receive_thread.join(timeout=3.0) - logging.info("PySide6 接收调度器已停止") + logging.info("Qt 接收调度器已停止") diff --git a/src/queue_sqlite/scheduler/pyqt6/task_executor.py b/src/queue_sqlite/scheduler/qt/task_executor.py similarity index 97% rename from src/queue_sqlite/scheduler/pyqt6/task_executor.py rename to src/queue_sqlite/scheduler/qt/task_executor.py index 52808a3..6dab597 100644 --- a/src/queue_sqlite/scheduler/pyqt6/task_executor.py +++ b/src/queue_sqlite/scheduler/qt/task_executor.py @@ -17,11 +17,11 @@ from ...constant import MessageStatus from ...mounter.task_mounter import TaskMounter from queue_sqlite_core import ShardedQueueOperation from ...model.message_item import MessageItem -from PyQt6.QtCore import QRunnable +from qtpy.QtCore import QRunnable -class PyQt6TaskExecutor(QRunnable): - """PyQt6 任务执行器 - 修复版""" +class QtTaskExecutor(QRunnable): + """Qt 任务执行器 - 修复版""" def __init__( self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation diff --git a/src/queue_sqlite/scheduler/pyqt6/task_scheduler.py b/src/queue_sqlite/scheduler/qt/task_scheduler.py similarity index 82% rename from src/queue_sqlite/scheduler/pyqt6/task_scheduler.py rename to src/queue_sqlite/scheduler/qt/task_scheduler.py index 0accd20..7cd266a 100644 --- a/src/queue_sqlite/scheduler/pyqt6/task_scheduler.py +++ b/src/queue_sqlite/scheduler/qt/task_scheduler.py @@ -5,18 +5,18 @@ @Time : 2025-10-28 14:19:39 @Author : chakcy @Email : 947105045@qq.com -@description : PyQt6 任务调度器 +@description : Qt 任务调度器 """ import logging -from PyQt6.QtCore import QThreadPool -from .task_executor import PyQt6TaskExecutor +from qtpy.QtCore import QThreadPool +from .task_executor import QtTaskExecutor from queue_sqlite_core import ShardedQueueOperation import time import threading -class PyQt6TaskScheduler: - """PyQt6 任务调度器 - 独立实现""" +class QtTaskScheduler: + """Qt 任务调度器 - 独立实现""" def __init__( self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4 @@ -39,7 +39,7 @@ class PyQt6TaskScheduler: if message_list: for message_data in message_list: # 使用线程池执行任务 - task = PyQt6TaskExecutor(message_data, self.queue_operation) + task = QtTaskExecutor(message_data, self.queue_operation) self.thread_pool.start(task) # type: ignore else: time.sleep(0.05) # 短暂休眠 @@ -56,7 +56,7 @@ class PyQt6TaskScheduler: self.is_running = True self.task_thread = threading.Thread(target=self._task_loop, daemon=True) self.task_thread.start() - logging.info("PySide6 任务调度器已启动") + logging.info("Qt 任务调度器已启动") def stop(self): """停止任务调度器""" @@ -66,4 +66,4 @@ class PyQt6TaskScheduler: self.is_running = False if self.task_thread and self.task_thread.is_alive(): self.task_thread.join(timeout=3.0) - logging.info("PySide6 任务调度器已停止") + logging.info("Qt 任务调度器已停止") diff --git a/tests/test_pyqt6_scheduler.py b/tests/test_pyqt6_scheduler.py deleted file mode 100644 index ce9eebf..0000000 --- a/tests/test_pyqt6_scheduler.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -""" -@File : test_pyqt6_scheduler.py -@Time : 2025-10-26 10:40:00 -@Author : chakcy -@description : 独立 pyqt6 调度器测试 -""" - -import time -from queue_sqlite.scheduler import QueueScheduler -from queue_sqlite.model import MessageItem -from queue_sqlite.mounter import task - - -@task(meta={"task_name": "pyqt6_example"}) -def pyqt6_example_task(message_item: MessageItem): - """pyqt6 调度器测试任务""" - print(f"处理任务: {message_item.id}") - # 模拟一些工作 - result = sum(i * i for i in range(1000)) - message_item.result = { - "status": "completed", - "result": result, - "task_id": message_item.id, - } - return message_item - - -@task(meta={"task_name": "async_pyqt6_task"}) -async def async_pyqt6_task(message_item: MessageItem): - """异步任务测试""" - import asyncio - - print(f"处理异步任务: {message_item.id}") - await asyncio.sleep(0.1) # 模拟异步操作 - message_item.result = {"status": "async_completed", "task_id": message_item.id} - return message_item - - -def task_callback(message_item: MessageItem): - """任务完成回调""" - print(f"任务完成回调: {message_item.id}, 结果: {message_item.result}") - - -def test_pyqt6_scheduler(): - """测试独立的 pyqt6 调度器""" - - # 创建独立的 pyqt6 调度器 - scheduler = QueueScheduler( - scheduler_type="pyqt6", # 指定使用 pyqt6 调度器 - ) - - print("启动 pyqt6 调度器...") - scheduler.start() - - try: - # 发送同步任务 - for i in range(5): - message = MessageItem( - content={"task_index": i, "type": "sync"}, - destination="pyqt6_example_task", - ) - scheduler.send_message(message, task_callback) - print(f"发送同步消息: {message.id}") - - # 发送异步任务 - for i in range(5): - message = MessageItem( - content={"task_index": i, "type": "async"}, - destination="async_pyqt6_task", - ) - scheduler.send_message(message, task_callback) - print(f"发送异步消息: {message.id}") - - # 等待任务处理 - print("等待任务处理...") - for i in range(10): - queue_info = scheduler.scheduler.get_queue_info() - print(f"队列状态: {queue_info}") - time.sleep(1) - - if queue_info.get("queue_length", 0) == 0: - break - - finally: - print("停止 pyqt6 调度器...") - scheduler.stop() - print("测试完成") - - -if __name__ == "__main__": - test_pyqt6_scheduler() diff --git a/tests/test_pyside6_scheduler.py b/tests/test_qt_scheduler.py similarity index 95% rename from tests/test_pyside6_scheduler.py rename to tests/test_qt_scheduler.py index 722e5a2..3c30fc3 100644 --- a/tests/test_pyside6_scheduler.py +++ b/tests/test_qt_scheduler.py @@ -48,10 +48,10 @@ def test_pyside6_scheduler(): # 创建独立的 PySide6 调度器 scheduler = QueueScheduler( - scheduler_type="pyside6", # 指定使用 PySide6 调度器 + scheduler_type="qt", # 指定使用 PySide6 调度器 ) - print("启动 PySide6 调度器...") + print("启动 qt 调度器...") scheduler.start() try: