diff --git a/src/queue_sqlite/__init__.py b/src/queue_sqlite/__init__.py index 66188b6..f24fa75 100644 --- a/src/queue_sqlite/__init__.py +++ b/src/queue_sqlite/__init__.py @@ -31,7 +31,6 @@ error_log_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ) logging.getLogger().addHandler(error_log_handler) -# logger = logging.getLogger(__name__).addHandler(logging.NullHandler()) __title__ = "queue_sqlite" __version__ = "0.2.0" diff --git a/src/queue_sqlite/constant/message_type.py b/src/queue_sqlite/constant/message_type.py index f27aad2..a5884fc 100644 --- a/src/queue_sqlite/constant/message_type.py +++ b/src/queue_sqlite/constant/message_type.py @@ -8,7 +8,6 @@ @description : 消息类型枚举类 """ - from enum import Enum diff --git a/src/queue_sqlite/cycle/async_task_cycle.py b/src/queue_sqlite/cycle/async_task_cycle.py index 18199d5..48ae462 100644 --- a/src/queue_sqlite/cycle/async_task_cycle.py +++ b/src/queue_sqlite/cycle/async_task_cycle.py @@ -76,23 +76,32 @@ class AsyncTaskCycle: self.task_error = None def get_task_result(self): - if isinstance(self.task_result, (dict, list)): - try: - return json.dumps(self.task_result) - except: - return json.dumps({"result": str(self.task_result)}) + """获取任务结果 - 优化版本""" + if self.task_result is None: + return json.dumps({"result": None}) - elif isinstance(self.task_result, str): + # 如果已经是字符串,尝试解析 + if isinstance(self.task_result, str): try: + # 如果是 JSON 字符串,直接返回 json.loads(self.task_result) return self.task_result except: + # 如果不是 JSON,包装成 JSON return json.dumps({"result": self.task_result}) - elif isinstance(self.task_result, (int, float, bool)): - return json.dumps({"result": self.task_result}) - elif self.task_result is None: - return "null" - else: + + # 如果是 MessageItem 对象,提取其中的 result 字段 + if isinstance(self.task_result, MessageItem): + result_data = self.task_result.result + if isinstance(result_data, (dict, list)): + return json.dumps(result_data) + else: + return json.dumps({"result": result_data}) + + # 其他情况正常序列化 + try: + return json.dumps(self.task_result) + except: return json.dumps({"result": str(self.task_result)}) def get_task_status(self): diff --git a/src/queue_sqlite/cycle/task_cycle.py b/src/queue_sqlite/cycle/task_cycle.py index 14243ac..44ac917 100644 --- a/src/queue_sqlite/cycle/task_cycle.py +++ b/src/queue_sqlite/cycle/task_cycle.py @@ -80,23 +80,32 @@ class TaskCycle: self.task_error = None def get_task_result(self): - if isinstance(self.task_result, (dict, list)): - try: - return json.dumps(self.task_result) - except: - return json.dumps({"result": str(self.task_result)}) + """获取任务结果 - 优化版本""" + if self.task_result is None: + return json.dumps({"result": None}) - elif isinstance(self.task_result, str): + # 如果已经是字符串,尝试解析 + if isinstance(self.task_result, str): try: + # 如果是 JSON 字符串,直接返回 json.loads(self.task_result) return self.task_result except: + # 如果不是 JSON,包装成 JSON return json.dumps({"result": self.task_result}) - elif isinstance(self.task_result, (int, float, bool)): - return json.dumps({"result": self.task_result}) - elif self.task_result is None: - return "null" - else: + + # 如果是 MessageItem 对象,提取其中的 result 字段 + if isinstance(self.task_result, MessageItem): + result_data = self.task_result.result + if isinstance(result_data, (dict, list)): + return json.dumps(result_data) + else: + return json.dumps({"result": result_data}) + + # 其他情况正常序列化 + try: + return json.dumps(self.task_result) + except: return json.dumps({"result": str(self.task_result)}) def get_task_status(self): diff --git a/src/queue_sqlite/model/message_item.py b/src/queue_sqlite/model/message_item.py index 00f61af..9a42f1e 100644 --- a/src/queue_sqlite/model/message_item.py +++ b/src/queue_sqlite/model/message_item.py @@ -8,7 +8,6 @@ @description : 消息数据模型 """ - from dataclasses import dataclass, field from typing import Optional, Dict, Any from datetime import datetime @@ -22,7 +21,7 @@ import logging @dataclass class MessageItem: # 必需字段 - content: dict = field( + content: dict = field( # type ignore metadata={"description": "消息内容,包含具体的任务数据或信息"} ) @@ -82,7 +81,7 @@ class MessageItem: ) @classmethod - def from_dict(cls, data: Dict[str, Any]) -> "MessageItem": + def from_dict(cls, data: dict) -> "MessageItem": """从字典创建消息对象""" # 处理日期时间字段 datetime_fields = ["createtime", "updatetime", "expire_time"] diff --git a/src/queue_sqlite/scheduler/__init__.py b/src/queue_sqlite/scheduler/__init__.py index d54c891..a1c873c 100644 --- a/src/queue_sqlite/scheduler/__init__.py +++ b/src/queue_sqlite/scheduler/__init__.py @@ -8,7 +8,6 @@ @description : 队列调度器 """ - from .standard import StandardQueueScheduler from ._async import AsyncQueueScheduler from .base import BaseScheduler @@ -19,6 +18,15 @@ import multiprocessing SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler} +try: + from .pyside6 import PySide6QueueScheduler + + print(SCHEDULER_TYPES) + if PySide6QueueScheduler.is_available(): + SCHEDULER_TYPES["pyside6"] = PySide6QueueScheduler +except ImportError: + pass + class QueueScheduler(BaseScheduler): def __init__( diff --git a/src/queue_sqlite/scheduler/pyside6/__init__.py b/src/queue_sqlite/scheduler/pyside6/__init__.py index e69de29..6dc5127 100644 --- a/src/queue_sqlite/scheduler/pyside6/__init__.py +++ b/src/queue_sqlite/scheduler/pyside6/__init__.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +""" +@File : __init__.py +@Time : 2025-10-26 10:26:16 +@Author : chakcy +@Email : 947105045@qq.com +@description : pyside6调度器 - 独立实现 +""" + +from .pyside6_scheduler import PySide6QueueScheduler + +__all__ = ["PySide6QueueScheduler"] diff --git a/src/queue_sqlite/scheduler/pyside6/pyside6_scheduler.py b/src/queue_sqlite/scheduler/pyside6/pyside6_scheduler.py new file mode 100644 index 0000000..e990306 --- /dev/null +++ b/src/queue_sqlite/scheduler/pyside6/pyside6_scheduler.py @@ -0,0 +1,669 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +""" +@File : pyside6_scheduler.py +@Time : 2025-10-26 10:30:00 +@Author : chakcy +@Email : 947105045@qq.com +@description : 完全独立的 PySide6 调度器实现 - 修复版 +""" + +from PySide6.QtCore import QThreadPool, QRunnable, QObject, Signal, QTimer +from PySide6.QtWidgets import QApplication +import threading +import time +import logging +import os +import asyncio +from typing import Callable, List, Dict, Any, Optional +from datetime import datetime + +from ...model import MessageItem +from ...constant import MessageStatus, MessagePriority, MessageType +from ...queue_operation.listen_operation import ListenOperation +from ...mounter.listen_mounter import ListenMounter +from ...mounter.task_mounter import TaskMounter +from queue_sqlite_core import ShardedQueueOperation +from ..base import BaseScheduler +from ..cleanup_scheduler import CleanupScheduler + + +class PySide6CallbackTask(QRunnable): + """PySide6 回调任务""" + + def __init__( + self, + callback: Callable, + message: MessageItem, + queue_operation: ShardedQueueOperation, + ): + super().__init__() + self.callback = callback + self.message = message + self.queue_operation = queue_operation + self.setAutoDelete(True) + + def run(self): + """执行回调函数""" + try: + # 检查是否是协程函数 + if asyncio.iscoroutinefunction(self.callback): + # 对于异步回调,在当前线程中运行事件循环 + asyncio.run(self._run_async_callback()) + else: + self.callback(self.message) + except Exception as e: + logging.error(f"回调执行错误: {str(e)}") + finally: + # 删除消息 + try: + self.queue_operation.delete_message(self.message.id) + except Exception as e: + logging.error(f"删除消息失败 {self.message.id}: {str(e)}") + + async def _run_async_callback(self): + """运行异步回调""" + try: + await self.callback(self.message) + except Exception as e: + logging.error(f"异步回调执行错误: {str(e)}") + + +class PySide6TaskExecutor(QRunnable): + """PySide6 任务执行器 - 修复版""" + + def __init__( + self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation + ): + super().__init__() + self.message_data = message_data + self.queue_operation = queue_operation + self.setAutoDelete(True) + + def run(self): + """执行任务""" + try: + message = MessageItem.from_dict(self.message_data) + + if message.destination == "client": + self._process_client_message(message) + return + + task_function = TaskMounter.get_task_function(message.destination) + if task_function is None: + raise ValueError(f"任务函数未找到: {message.destination}") + + # 执行任务 + if asyncio.iscoroutinefunction(task_function): + # 异步任务 - 在当前线程运行事件循环 + asyncio.run(self._execute_async_task(message, task_function)) + else: + # 同步任务 + self._execute_sync_task(message, task_function) + + except Exception as e: + logging.error(f"任务执行错误: {str(e)}") + # 更新任务状态为失败 + try: + self.queue_operation.update_status( + self.message_data["id"], MessageStatus.FAILED.value + ) + self.queue_operation.update_result( + self.message_data["id"], f'{{"error": "{str(e)}"}}' + ) + except Exception as update_error: + logging.error(f"更新任务状态失败: {str(update_error)}") + + def _process_client_message(self, message: MessageItem): + """处理客户端消息""" + message.status = MessageStatus.COMPLETED + message.result = {"result": "success"} + message.updatetime = datetime.now() + self._update_task_result(message) + + def _execute_sync_task(self, message: MessageItem, task_function: Callable): + """执行同步任务""" + from ...cycle.task_cycle import TaskCycle + + try: + task_cycle = TaskCycle(message, task_function) + task_cycle.run() + + status = task_cycle.get_task_status() + if not status: + raise ValueError("任务未完成") + message.status = status + if message.status == MessageStatus.FAILED: + message.result = {"error": task_cycle.get_task_error()} + else: + # 获取序列化后的结果 + result_str = task_cycle.get_task_result() + try: + import json + + message.result = json.loads(result_str) + except: + message.result = {"result": result_str} + + self._update_task_result(message) + + except Exception as e: + logging.error(f"同步任务执行失败 {message.id}: {str(e)}") + message.status = MessageStatus.FAILED + message.result = {"error": str(e)} + self._update_task_result(message) + + async def _execute_async_task(self, message: MessageItem, task_function: Callable): + """执行异步任务""" + from ...cycle.async_task_cycle import AsyncTaskCycle + + try: + task_cycle = AsyncTaskCycle(message, task_function) + await task_cycle.run() + status = task_cycle.get_task_status() + if not status: + raise ValueError("任务未完成") + message.status = status + if message.status == MessageStatus.FAILED: + message.result = {"error": task_cycle.get_task_error()} + else: + # 获取序列化后的结果 + result_str = task_cycle.get_task_result() + try: + import json + + message.result = json.loads(result_str) + except: + message.result = {"result": result_str} + + self._update_task_result(message) + + except Exception as e: + logging.error(f"异步任务执行失败 {message.id}: {str(e)}") + message.status = MessageStatus.FAILED + message.result = {"error": str(e)} + self._update_task_result(message) + + def _update_task_result(self, message: MessageItem): + """更新任务结果到数据库""" + import json + + try: + # 序列化结果 + result_str = json.dumps(message.result) if message.result else "{}" + self.queue_operation.update_result(message.id, result_str) + self.queue_operation.update_status(message.id, message.status.value) + except Exception as e: + logging.error(f"更新任务结果失败 {message.id}: {str(e)}") + + +class PySide6ListenTask(QRunnable): + """PySide6 监听任务 - 修复版""" + + def __init__( + self, key: str, value: str, delete_id: int, listen_operation: ListenOperation + ): + super().__init__() + self.key = key + self.value = value + self.delete_id = delete_id + self.listen_operation = listen_operation + self.setAutoDelete(True) + + def run(self): + """执行监听回调""" + try: + listen_function = ListenMounter.get_Listener_function(self.key) + if listen_function: + if asyncio.iscoroutinefunction(listen_function): + # 异步监听函数 + asyncio.run(self._run_async_listener(listen_function)) + else: + # 同步监听函数 + listen_function(self.value) + except Exception as e: + logging.error(f"监听函数执行错误 {self.key}: {str(e)}") + finally: + # 删除变更日志 + try: + self.listen_operation.delete_change_log(self.delete_id) + except Exception as e: + logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}") + + async def _run_async_listener(self, listen_function: Callable): + """执行异步监听函数""" + try: + await listen_function(self.value) + except Exception as e: + logging.error(f"异步监听函数执行错误 {self.key}: {str(e)}") + + +class PySide6ReceiveScheduler: + """PySide6 接收调度器 - 独立实现""" + + def __init__( + self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1 + ): + self.callbacks = {} + self.is_running = False + self.lock = threading.Lock() + self.queue_operation = queue_operation + self.receive_thread = None + self.thread_pool = QThreadPool.globalInstance() + + def send_message(self, message: MessageItem, callback: Optional[Callable] = None): + """发送消息到队列""" + if callback is None: + callback = lambda message: logging.info(f"收到消息: {message.id}") + + # 入队消息 + self.queue_operation.enqueue(message.to_dict_by_core()) + with self.lock: + self.callbacks[message.id] = callback + + def _receive_loop(self): + """接收消息循环""" + while self.is_running: + try: + # 获取已完成的消息 + message_list = self.queue_operation.get_completed_messages() + if message_list: + for message_data in message_list: + try: + message = MessageItem.from_dict(message_data) + + with self.lock: + callback = self.callbacks.pop(message.id, None) + if callback is None: + callback = lambda msg: None + + # 使用线程池执行回调 + task = PySide6CallbackTask( + callback, message, self.queue_operation + ) + self.thread_pool.start(task) + + except Exception as e: + logging.error(f"处理完成消息失败: {str(e)}") + # 即使处理失败也尝试删除消息 + try: + self.queue_operation.delete_message(message_data["id"]) + except: + pass + else: + time.sleep(0.05) # 短暂休眠避免CPU空转 + + except Exception as e: + logging.error(f"接收消息循环错误: {str(e)}") + time.sleep(0.1) # 出错时稍长休眠 + + def start(self): + """启动接收调度器""" + if self.is_running: + return + + self.is_running = True + self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True) + self.receive_thread.start() + logging.info("PySide6 接收调度器已启动") + + def stop(self): + """停止接收调度器""" + if not self.is_running: + return + + self.is_running = False + if self.receive_thread and self.receive_thread.is_alive(): + self.receive_thread.join(timeout=3.0) + logging.info("PySide6 接收调度器已停止") + + +class PySide6TaskScheduler: + """PySide6 任务调度器 - 独立实现""" + + def __init__( + self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4 + ): + self.is_running = False + self.queue_operation = queue_operation + self.task_thread = None + self.thread_pool = QThreadPool.globalInstance() + # 设置最大线程数 + self.thread_pool.setMaxThreadCount(max(1, task_thread_num)) + + def _task_loop(self): + """任务处理循环""" + while self.is_running: + try: + # 出队消息进行处理 + message_list = self.queue_operation.dequeue( + size=self.thread_pool.maxThreadCount() * 2 + ) + if message_list: + for message_data in message_list: + # 使用线程池执行任务 + task = PySide6TaskExecutor(message_data, self.queue_operation) + self.thread_pool.start(task) + else: + time.sleep(0.05) # 短暂休眠 + + except Exception as e: + logging.error(f"任务调度循环错误: {str(e)}") + time.sleep(0.1) + + def start(self): + """启动任务调度器""" + if self.is_running: + return + + self.is_running = True + self.task_thread = threading.Thread(target=self._task_loop, daemon=True) + self.task_thread.start() + logging.info("PySide6 任务调度器已启动") + + def stop(self): + """停止任务调度器""" + if not self.is_running: + return + + self.is_running = False + if self.task_thread and self.task_thread.is_alive(): + self.task_thread.join(timeout=3.0) + logging.info("PySide6 任务调度器已停止") + + +class PySide6ListenScheduler: + """PySide6 监听调度器 - 修复版""" + + def __init__(self, listen_operation: ListenOperation): + self.listen_operation = listen_operation + self.is_running = False + self.listen_thread = None + self.thread_pool = QThreadPool.globalInstance() + + # 确保监听表存在 + self._ensure_listen_tables() + + def _ensure_listen_tables(self): + """确保监听相关的表存在""" + try: + # 检查表是否存在,如果不存在则创建 + conn = self.listen_operation._get_connection() + + # 检查 change_log 表是否存在 + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'" + ) + if cursor.fetchone() is None: + logging.info("创建缺失的 change_log 表") + conn.execute( + """ + CREATE TABLE change_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT, + row_id INTEGER, + column_name TEXT, + old_value TEXT, + new_value TEXT, + is_delete integer DEFAULT 0, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + conn.commit() + + except Exception as e: + logging.error(f"确保监听表存在失败: {str(e)}") + + def _listen_loop(self): + """监听数据变化循环""" + while self.is_running: + try: + status, change_data_items = self.listen_operation.listen_data() + if status: + for data in change_data_items: + try: + # 修正字段索引 + if len(data) >= 8: + delete_id = data[0] # id + table_name = data[1] # table_name + row_id = data[2] # row_id + column_name = data[3] # column_name + old_value = data[4] # old_value + new_value = data[5] # new_value + + # 使用线程池执行监听任务 + task = PySide6ListenTask( + column_name, + new_value, + int(delete_id), + self.listen_operation, + ) + self.thread_pool.start(task) + + except Exception as e: + logging.error(f"处理监听数据失败: {str(e)}") + else: + time.sleep(0.05) + + except Exception as e: + logging.error(f"监听循环错误: {str(e)}") + time.sleep(0.1) + + def start(self): + """启动监听调度器""" + if self.is_running: + return + + self.is_running = True + self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True) + self.listen_thread.start() + logging.info("PySide6 监听调度器已启动") + + def stop(self): + """停止监听调度器""" + if not self.is_running: + return + + self.is_running = False + if self.listen_thread and self.listen_thread.is_alive(): + self.listen_thread.join(timeout=3.0) + logging.info("PySide6 监听调度器已停止") + + +class PySide6CleanupScheduler: + """PySide6 清理调度器 - 独立实现""" + + def __init__( + self, + queue_operation: ShardedQueueOperation, + interval_minutes: int = 60, + remove_days: int = 30, + ): + self.queue_operation = queue_operation + self.interval = interval_minutes * 60 # 转换为秒 + self.remove_days = remove_days + self.is_running = False + self.cleanup_thread = None + + # 立即执行一次清理 + self._perform_cleanup() + + def _cleanup_loop(self): + """清理循环""" + while self.is_running: + try: + self._perform_cleanup() + except Exception as e: + logging.error(f"清理操作错误: {str(e)}") + + # 休眠等待下次清理 + for _ in range(self.interval): + if not self.is_running: + break + time.sleep(1) + + def _perform_cleanup(self): + """执行清理操作""" + try: + # 清理过期但未处理的消息 + self.queue_operation.clean_expired_messages() + # 彻底删除指定天数前的消息 + self.queue_operation.remove_expired_messages(self.remove_days) + except Exception as e: + logging.error(f"执行清理失败: {str(e)}") + + def start(self): + """启动清理调度器""" + if self.is_running: + return + + self.is_running = True + self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) + self.cleanup_thread.start() + logging.info("PySide6 清理调度器已启动") + + def stop(self): + """停止清理调度器""" + if not self.is_running: + return + + self.is_running = False + if self.cleanup_thread and self.cleanup_thread.is_alive(): + self.cleanup_thread.join(timeout=3.0) + logging.info("PySide6 清理调度器已停止") + + +class PySide6QueueScheduler(BaseScheduler): + """完全独立的 PySide6 队列调度器 - 修复版""" + + def __init__( + self, + receive_thread_num: int = 1, + task_thread_num: int = 4, + shard_num: int = 4, + queue_name: str = "pyside6_queue", + ): + # 确保有 QApplication 实例(对于 GUI 应用) + self._ensure_qapplication() + + # 初始化队列操作 + self.queue_operation = ShardedQueueOperation(shard_num, queue_name) + + # 初始化监听操作 + db_dir = f"cache/{queue_name}" + os.makedirs(db_dir, exist_ok=True) + self.listen_operation = ListenOperation(f"{db_dir}/listen.db") + + # 确保监听表被创建 + self._ensure_listen_operation_tables() + + # 初始化各个独立的调度器组件 + self.receive_scheduler = PySide6ReceiveScheduler( + self.queue_operation, receive_thread_num + ) + self.task_scheduler = PySide6TaskScheduler( + self.queue_operation, task_thread_num + ) + self.listen_scheduler = PySide6ListenScheduler(self.listen_operation) + self.cleanup_scheduler = PySide6CleanupScheduler(self.queue_operation) + + @staticmethod + def is_available() -> bool: + try: + import PySide6 + + # 检查是否在主线程(GUI 调度器通常要求) + return True + except ImportError: + return False # 依赖库未安装时不可用 + + def _ensure_qapplication(self): + """确保 QApplication 实例存在""" + try: + if not QApplication.instance(): + # 对于非GUI应用,创建无窗口的QApplication + import sys + + self.app = QApplication(sys.argv if hasattr(sys, "argv") else []) + logging.info("已创建 QApplication 实例") + except Exception as e: + logging.warning(f"创建 QApplication 失败: {str(e)}") + + def _ensure_listen_operation_tables(self): + """确保监听操作的表被正确创建""" + try: + # 强制重新创建表 + self.listen_operation.create_table() + + # 额外检查 change_log 表 + conn = self.listen_operation._get_connection() + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'" + ) + if cursor.fetchone() is None: + logging.warning("change_log 表不存在,尝试手动创建") + conn.execute( + """ + CREATE TABLE change_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT, + row_id INTEGER, + column_name TEXT, + old_value TEXT, + new_value TEXT, + is_delete integer DEFAULT 0, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + conn.commit() + + except Exception as e: + logging.error(f"确保监听表存在失败: {str(e)}") + + def send_message(self, message: MessageItem, callback: Callable): + """发送消息到队列""" + self.receive_scheduler.send_message(message, callback) + + def update_listen_data(self, key: str, value: str): + """更新监听数据""" + self.listen_operation.update_listen_data(key, value) + + def get_listen_datas(self) -> List: + """获取所有监听数据""" + return self.listen_operation.get_values() + + def get_listen_data(self, key: str): + """获取单个监听数据""" + return self.listen_operation.get_value(key) + + def start(self): + """启动所有调度器组件""" + self.receive_scheduler.start() + self.task_scheduler.start() + self.cleanup_scheduler.start() + self.listen_scheduler.start() + logging.info("PySide6 队列调度器已完全启动") + + def stop(self): + """停止所有调度器组件""" + self.listen_scheduler.stop() + self.cleanup_scheduler.stop() + self.task_scheduler.stop() + self.receive_scheduler.stop() + logging.info("PySide6 队列调度器已完全停止") + + def get_queue_info(self) -> Dict[str, Any]: + """获取队列信息""" + try: + return { + "queue_length": self.queue_operation.get_queue_length(), + "shard_num": self.queue_operation.shard_num, + "db_dir": self.queue_operation.db_dir, + "active_threads": QThreadPool.globalInstance().activeThreadCount(), + "max_threads": QThreadPool.globalInstance().maxThreadCount(), + } + except Exception as e: + logging.error(f"获取队列信息失败: {str(e)}") + return {} diff --git a/tests/test_pyside6_scheduler.py b/tests/test_pyside6_scheduler.py new file mode 100644 index 0000000..9578b35 --- /dev/null +++ b/tests/test_pyside6_scheduler.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +""" +@File : test_pyside6_scheduler.py +@Time : 2025-10-26 10:40:00 +@Author : chakcy +@description : 独立 PySide6 调度器测试 +""" + +import time +from queue_sqlite.scheduler import QueueScheduler +from queue_sqlite.model import MessageItem +from queue_sqlite.mounter import task + + +@task(meta={"task_name": "pyside6_example"}) +def pyside6_example_task(message_item: MessageItem): + """PySide6 调度器测试任务""" + print(f"处理任务: {message_item.id}") + # 模拟一些工作 + result = sum(i * i for i in range(1000)) + message_item.result = { + "status": "completed", + "result": result, + "task_id": message_item.id, + } + return message_item + + +@task(meta={"task_name": "async_pyside6_task"}) +async def async_pyside6_task(message_item: MessageItem): + """异步任务测试""" + import asyncio + + print(f"处理异步任务: {message_item.id}") + await asyncio.sleep(0.1) # 模拟异步操作 + message_item.result = {"status": "async_completed", "task_id": message_item.id} + return message_item + + +def task_callback(message_item: MessageItem): + """任务完成回调""" + print(f"任务完成回调: {message_item.id}, 结果: {message_item.result}") + + +def test_pyside6_scheduler(): + """测试独立的 PySide6 调度器""" + + # 创建独立的 PySide6 调度器 + scheduler = QueueScheduler( + receive_thread_num=2, + task_thread_num=6, # 使用 PySide6 线程池 + shard_num=4, + scheduler_type="pyside6", # 指定使用 PySide6 调度器 + ) + + print("启动 PySide6 调度器...") + scheduler.start() + + try: + # 发送同步任务 + for i in range(5): + message = MessageItem( + content={"task_index": i, "type": "sync"}, + destination="pyside6_example_task", + ) + scheduler.send_message(message, task_callback) + print(f"发送同步消息: {message.id}") + + # 发送异步任务 + for i in range(5): + message = MessageItem( + content={"task_index": i, "type": "async"}, + destination="async_pyside6_task", + ) + scheduler.send_message(message, task_callback) + print(f"发送异步消息: {message.id}") + + # 等待任务处理 + print("等待任务处理...") + for i in range(10): + queue_info = scheduler.scheduler.get_queue_info() + print(f"队列状态: {queue_info}") + time.sleep(1) + + if queue_info.get("queue_length", 0) == 0: + break + + finally: + print("停止 PySide6 调度器...") + scheduler.stop() + print("测试完成") + + +if __name__ == "__main__": + test_pyside6_scheduler()