modify: pyside6 调度器

This commit is contained in:
chakcy 2025-10-26 21:27:49 +08:00
parent c6bcb64c61
commit 6bd1e28815
9 changed files with 829 additions and 28 deletions

View File

@ -31,7 +31,6 @@ error_log_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logging.getLogger().addHandler(error_log_handler)
# logger = logging.getLogger(__name__).addHandler(logging.NullHandler())
__title__ = "queue_sqlite"
__version__ = "0.2.0"

View File

@ -8,7 +8,6 @@
@description : 消息类型枚举类
"""
from enum import Enum

View File

@ -76,23 +76,32 @@ class AsyncTaskCycle:
self.task_error = None
def get_task_result(self):
if isinstance(self.task_result, (dict, list)):
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})
"""获取任务结果 - 优化版本"""
if self.task_result is None:
return json.dumps({"result": None})
elif isinstance(self.task_result, str):
# 如果已经是字符串,尝试解析
if isinstance(self.task_result, str):
try:
# 如果是 JSON 字符串,直接返回
json.loads(self.task_result)
return self.task_result
except:
# 如果不是 JSON包装成 JSON
return json.dumps({"result": self.task_result})
elif isinstance(self.task_result, (int, float, bool)):
return json.dumps({"result": self.task_result})
elif self.task_result is None:
return "null"
else:
# 如果是 MessageItem 对象,提取其中的 result 字段
if isinstance(self.task_result, MessageItem):
result_data = self.task_result.result
if isinstance(result_data, (dict, list)):
return json.dumps(result_data)
else:
return json.dumps({"result": result_data})
# 其他情况正常序列化
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})
def get_task_status(self):

View File

@ -80,23 +80,32 @@ class TaskCycle:
self.task_error = None
def get_task_result(self):
if isinstance(self.task_result, (dict, list)):
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})
"""获取任务结果 - 优化版本"""
if self.task_result is None:
return json.dumps({"result": None})
elif isinstance(self.task_result, str):
# 如果已经是字符串,尝试解析
if isinstance(self.task_result, str):
try:
# 如果是 JSON 字符串,直接返回
json.loads(self.task_result)
return self.task_result
except:
# 如果不是 JSON包装成 JSON
return json.dumps({"result": self.task_result})
elif isinstance(self.task_result, (int, float, bool)):
return json.dumps({"result": self.task_result})
elif self.task_result is None:
return "null"
else:
# 如果是 MessageItem 对象,提取其中的 result 字段
if isinstance(self.task_result, MessageItem):
result_data = self.task_result.result
if isinstance(result_data, (dict, list)):
return json.dumps(result_data)
else:
return json.dumps({"result": result_data})
# 其他情况正常序列化
try:
return json.dumps(self.task_result)
except:
return json.dumps({"result": str(self.task_result)})
def get_task_status(self):

View File

@ -8,7 +8,6 @@
@description : 消息数据模型
"""
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from datetime import datetime
@ -22,7 +21,7 @@ import logging
@dataclass
class MessageItem:
# 必需字段
content: dict = field(
content: dict = field( # type ignore
metadata={"description": "消息内容,包含具体的任务数据或信息"}
)
@ -82,7 +81,7 @@ class MessageItem:
)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MessageItem":
def from_dict(cls, data: dict) -> "MessageItem":
"""从字典创建消息对象"""
# 处理日期时间字段
datetime_fields = ["createtime", "updatetime", "expire_time"]

View File

@ -8,7 +8,6 @@
@description : 队列调度器
"""
from .standard import StandardQueueScheduler
from ._async import AsyncQueueScheduler
from .base import BaseScheduler
@ -19,6 +18,15 @@ import multiprocessing
SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler}
try:
from .pyside6 import PySide6QueueScheduler
print(SCHEDULER_TYPES)
if PySide6QueueScheduler.is_available():
SCHEDULER_TYPES["pyside6"] = PySide6QueueScheduler
except ImportError:
pass
class QueueScheduler(BaseScheduler):
def __init__(

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : __init__.py
@Time : 2025-10-26 10:26:16
@Author : chakcy
@Email : 947105045@qq.com
@description : pyside6调度器 - 独立实现
"""
from .pyside6_scheduler import PySide6QueueScheduler
__all__ = ["PySide6QueueScheduler"]

View File

@ -0,0 +1,669 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : pyside6_scheduler.py
@Time : 2025-10-26 10:30:00
@Author : chakcy
@Email : 947105045@qq.com
@description : 完全独立的 PySide6 调度器实现 - 修复版
"""
from PySide6.QtCore import QThreadPool, QRunnable, QObject, Signal, QTimer
from PySide6.QtWidgets import QApplication
import threading
import time
import logging
import os
import asyncio
from typing import Callable, List, Dict, Any, Optional
from datetime import datetime
from ...model import MessageItem
from ...constant import MessageStatus, MessagePriority, MessageType
from ...queue_operation.listen_operation import ListenOperation
from ...mounter.listen_mounter import ListenMounter
from ...mounter.task_mounter import TaskMounter
from queue_sqlite_core import ShardedQueueOperation
from ..base import BaseScheduler
from ..cleanup_scheduler import CleanupScheduler
class PySide6CallbackTask(QRunnable):
"""PySide6 回调任务"""
def __init__(
self,
callback: Callable,
message: MessageItem,
queue_operation: ShardedQueueOperation,
):
super().__init__()
self.callback = callback
self.message = message
self.queue_operation = queue_operation
self.setAutoDelete(True)
def run(self):
"""执行回调函数"""
try:
# 检查是否是协程函数
if asyncio.iscoroutinefunction(self.callback):
# 对于异步回调,在当前线程中运行事件循环
asyncio.run(self._run_async_callback())
else:
self.callback(self.message)
except Exception as e:
logging.error(f"回调执行错误: {str(e)}")
finally:
# 删除消息
try:
self.queue_operation.delete_message(self.message.id)
except Exception as e:
logging.error(f"删除消息失败 {self.message.id}: {str(e)}")
async def _run_async_callback(self):
"""运行异步回调"""
try:
await self.callback(self.message)
except Exception as e:
logging.error(f"异步回调执行错误: {str(e)}")
class PySide6TaskExecutor(QRunnable):
"""PySide6 任务执行器 - 修复版"""
def __init__(
self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation
):
super().__init__()
self.message_data = message_data
self.queue_operation = queue_operation
self.setAutoDelete(True)
def run(self):
"""执行任务"""
try:
message = MessageItem.from_dict(self.message_data)
if message.destination == "client":
self._process_client_message(message)
return
task_function = TaskMounter.get_task_function(message.destination)
if task_function is None:
raise ValueError(f"任务函数未找到: {message.destination}")
# 执行任务
if asyncio.iscoroutinefunction(task_function):
# 异步任务 - 在当前线程运行事件循环
asyncio.run(self._execute_async_task(message, task_function))
else:
# 同步任务
self._execute_sync_task(message, task_function)
except Exception as e:
logging.error(f"任务执行错误: {str(e)}")
# 更新任务状态为失败
try:
self.queue_operation.update_status(
self.message_data["id"], MessageStatus.FAILED.value
)
self.queue_operation.update_result(
self.message_data["id"], f'{{"error": "{str(e)}"}}'
)
except Exception as update_error:
logging.error(f"更新任务状态失败: {str(update_error)}")
def _process_client_message(self, message: MessageItem):
"""处理客户端消息"""
message.status = MessageStatus.COMPLETED
message.result = {"result": "success"}
message.updatetime = datetime.now()
self._update_task_result(message)
def _execute_sync_task(self, message: MessageItem, task_function: Callable):
"""执行同步任务"""
from ...cycle.task_cycle import TaskCycle
try:
task_cycle = TaskCycle(message, task_function)
task_cycle.run()
status = task_cycle.get_task_status()
if not status:
raise ValueError("任务未完成")
message.status = status
if message.status == MessageStatus.FAILED:
message.result = {"error": task_cycle.get_task_error()}
else:
# 获取序列化后的结果
result_str = task_cycle.get_task_result()
try:
import json
message.result = json.loads(result_str)
except:
message.result = {"result": result_str}
self._update_task_result(message)
except Exception as e:
logging.error(f"同步任务执行失败 {message.id}: {str(e)}")
message.status = MessageStatus.FAILED
message.result = {"error": str(e)}
self._update_task_result(message)
async def _execute_async_task(self, message: MessageItem, task_function: Callable):
"""执行异步任务"""
from ...cycle.async_task_cycle import AsyncTaskCycle
try:
task_cycle = AsyncTaskCycle(message, task_function)
await task_cycle.run()
status = task_cycle.get_task_status()
if not status:
raise ValueError("任务未完成")
message.status = status
if message.status == MessageStatus.FAILED:
message.result = {"error": task_cycle.get_task_error()}
else:
# 获取序列化后的结果
result_str = task_cycle.get_task_result()
try:
import json
message.result = json.loads(result_str)
except:
message.result = {"result": result_str}
self._update_task_result(message)
except Exception as e:
logging.error(f"异步任务执行失败 {message.id}: {str(e)}")
message.status = MessageStatus.FAILED
message.result = {"error": str(e)}
self._update_task_result(message)
def _update_task_result(self, message: MessageItem):
"""更新任务结果到数据库"""
import json
try:
# 序列化结果
result_str = json.dumps(message.result) if message.result else "{}"
self.queue_operation.update_result(message.id, result_str)
self.queue_operation.update_status(message.id, message.status.value)
except Exception as e:
logging.error(f"更新任务结果失败 {message.id}: {str(e)}")
class PySide6ListenTask(QRunnable):
"""PySide6 监听任务 - 修复版"""
def __init__(
self, key: str, value: str, delete_id: int, listen_operation: ListenOperation
):
super().__init__()
self.key = key
self.value = value
self.delete_id = delete_id
self.listen_operation = listen_operation
self.setAutoDelete(True)
def run(self):
"""执行监听回调"""
try:
listen_function = ListenMounter.get_Listener_function(self.key)
if listen_function:
if asyncio.iscoroutinefunction(listen_function):
# 异步监听函数
asyncio.run(self._run_async_listener(listen_function))
else:
# 同步监听函数
listen_function(self.value)
except Exception as e:
logging.error(f"监听函数执行错误 {self.key}: {str(e)}")
finally:
# 删除变更日志
try:
self.listen_operation.delete_change_log(self.delete_id)
except Exception as e:
logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}")
async def _run_async_listener(self, listen_function: Callable):
"""执行异步监听函数"""
try:
await listen_function(self.value)
except Exception as e:
logging.error(f"异步监听函数执行错误 {self.key}: {str(e)}")
class PySide6ReceiveScheduler:
"""PySide6 接收调度器 - 独立实现"""
def __init__(
self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1
):
self.callbacks = {}
self.is_running = False
self.lock = threading.Lock()
self.queue_operation = queue_operation
self.receive_thread = None
self.thread_pool = QThreadPool.globalInstance()
def send_message(self, message: MessageItem, callback: Optional[Callable] = None):
"""发送消息到队列"""
if callback is None:
callback = lambda message: logging.info(f"收到消息: {message.id}")
# 入队消息
self.queue_operation.enqueue(message.to_dict_by_core())
with self.lock:
self.callbacks[message.id] = callback
def _receive_loop(self):
"""接收消息循环"""
while self.is_running:
try:
# 获取已完成的消息
message_list = self.queue_operation.get_completed_messages()
if message_list:
for message_data in message_list:
try:
message = MessageItem.from_dict(message_data)
with self.lock:
callback = self.callbacks.pop(message.id, None)
if callback is None:
callback = lambda msg: None
# 使用线程池执行回调
task = PySide6CallbackTask(
callback, message, self.queue_operation
)
self.thread_pool.start(task)
except Exception as e:
logging.error(f"处理完成消息失败: {str(e)}")
# 即使处理失败也尝试删除消息
try:
self.queue_operation.delete_message(message_data["id"])
except:
pass
else:
time.sleep(0.05) # 短暂休眠避免CPU空转
except Exception as e:
logging.error(f"接收消息循环错误: {str(e)}")
time.sleep(0.1) # 出错时稍长休眠
def start(self):
"""启动接收调度器"""
if self.is_running:
return
self.is_running = True
self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
self.receive_thread.start()
logging.info("PySide6 接收调度器已启动")
def stop(self):
"""停止接收调度器"""
if not self.is_running:
return
self.is_running = False
if self.receive_thread and self.receive_thread.is_alive():
self.receive_thread.join(timeout=3.0)
logging.info("PySide6 接收调度器已停止")
class PySide6TaskScheduler:
"""PySide6 任务调度器 - 独立实现"""
def __init__(
self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4
):
self.is_running = False
self.queue_operation = queue_operation
self.task_thread = None
self.thread_pool = QThreadPool.globalInstance()
# 设置最大线程数
self.thread_pool.setMaxThreadCount(max(1, task_thread_num))
def _task_loop(self):
"""任务处理循环"""
while self.is_running:
try:
# 出队消息进行处理
message_list = self.queue_operation.dequeue(
size=self.thread_pool.maxThreadCount() * 2
)
if message_list:
for message_data in message_list:
# 使用线程池执行任务
task = PySide6TaskExecutor(message_data, self.queue_operation)
self.thread_pool.start(task)
else:
time.sleep(0.05) # 短暂休眠
except Exception as e:
logging.error(f"任务调度循环错误: {str(e)}")
time.sleep(0.1)
def start(self):
"""启动任务调度器"""
if self.is_running:
return
self.is_running = True
self.task_thread = threading.Thread(target=self._task_loop, daemon=True)
self.task_thread.start()
logging.info("PySide6 任务调度器已启动")
def stop(self):
"""停止任务调度器"""
if not self.is_running:
return
self.is_running = False
if self.task_thread and self.task_thread.is_alive():
self.task_thread.join(timeout=3.0)
logging.info("PySide6 任务调度器已停止")
class PySide6ListenScheduler:
"""PySide6 监听调度器 - 修复版"""
def __init__(self, listen_operation: ListenOperation):
self.listen_operation = listen_operation
self.is_running = False
self.listen_thread = None
self.thread_pool = QThreadPool.globalInstance()
# 确保监听表存在
self._ensure_listen_tables()
def _ensure_listen_tables(self):
"""确保监听相关的表存在"""
try:
# 检查表是否存在,如果不存在则创建
conn = self.listen_operation._get_connection()
# 检查 change_log 表是否存在
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
)
if cursor.fetchone() is None:
logging.info("创建缺失的 change_log 表")
conn.execute(
"""
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}")
def _listen_loop(self):
"""监听数据变化循环"""
while self.is_running:
try:
status, change_data_items = self.listen_operation.listen_data()
if status:
for data in change_data_items:
try:
# 修正字段索引
if len(data) >= 8:
delete_id = data[0] # id
table_name = data[1] # table_name
row_id = data[2] # row_id
column_name = data[3] # column_name
old_value = data[4] # old_value
new_value = data[5] # new_value
# 使用线程池执行监听任务
task = PySide6ListenTask(
column_name,
new_value,
int(delete_id),
self.listen_operation,
)
self.thread_pool.start(task)
except Exception as e:
logging.error(f"处理监听数据失败: {str(e)}")
else:
time.sleep(0.05)
except Exception as e:
logging.error(f"监听循环错误: {str(e)}")
time.sleep(0.1)
def start(self):
"""启动监听调度器"""
if self.is_running:
return
self.is_running = True
self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True)
self.listen_thread.start()
logging.info("PySide6 监听调度器已启动")
def stop(self):
"""停止监听调度器"""
if not self.is_running:
return
self.is_running = False
if self.listen_thread and self.listen_thread.is_alive():
self.listen_thread.join(timeout=3.0)
logging.info("PySide6 监听调度器已停止")
class PySide6CleanupScheduler:
"""PySide6 清理调度器 - 独立实现"""
def __init__(
self,
queue_operation: ShardedQueueOperation,
interval_minutes: int = 60,
remove_days: int = 30,
):
self.queue_operation = queue_operation
self.interval = interval_minutes * 60 # 转换为秒
self.remove_days = remove_days
self.is_running = False
self.cleanup_thread = None
# 立即执行一次清理
self._perform_cleanup()
def _cleanup_loop(self):
"""清理循环"""
while self.is_running:
try:
self._perform_cleanup()
except Exception as e:
logging.error(f"清理操作错误: {str(e)}")
# 休眠等待下次清理
for _ in range(self.interval):
if not self.is_running:
break
time.sleep(1)
def _perform_cleanup(self):
"""执行清理操作"""
try:
# 清理过期但未处理的消息
self.queue_operation.clean_expired_messages()
# 彻底删除指定天数前的消息
self.queue_operation.remove_expired_messages(self.remove_days)
except Exception as e:
logging.error(f"执行清理失败: {str(e)}")
def start(self):
"""启动清理调度器"""
if self.is_running:
return
self.is_running = True
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self.cleanup_thread.start()
logging.info("PySide6 清理调度器已启动")
def stop(self):
"""停止清理调度器"""
if not self.is_running:
return
self.is_running = False
if self.cleanup_thread and self.cleanup_thread.is_alive():
self.cleanup_thread.join(timeout=3.0)
logging.info("PySide6 清理调度器已停止")
class PySide6QueueScheduler(BaseScheduler):
"""完全独立的 PySide6 队列调度器 - 修复版"""
def __init__(
self,
receive_thread_num: int = 1,
task_thread_num: int = 4,
shard_num: int = 4,
queue_name: str = "pyside6_queue",
):
# 确保有 QApplication 实例(对于 GUI 应用)
self._ensure_qapplication()
# 初始化队列操作
self.queue_operation = ShardedQueueOperation(shard_num, queue_name)
# 初始化监听操作
db_dir = f"cache/{queue_name}"
os.makedirs(db_dir, exist_ok=True)
self.listen_operation = ListenOperation(f"{db_dir}/listen.db")
# 确保监听表被创建
self._ensure_listen_operation_tables()
# 初始化各个独立的调度器组件
self.receive_scheduler = PySide6ReceiveScheduler(
self.queue_operation, receive_thread_num
)
self.task_scheduler = PySide6TaskScheduler(
self.queue_operation, task_thread_num
)
self.listen_scheduler = PySide6ListenScheduler(self.listen_operation)
self.cleanup_scheduler = PySide6CleanupScheduler(self.queue_operation)
@staticmethod
def is_available() -> bool:
try:
import PySide6
# 检查是否在主线程GUI 调度器通常要求)
return True
except ImportError:
return False # 依赖库未安装时不可用
def _ensure_qapplication(self):
"""确保 QApplication 实例存在"""
try:
if not QApplication.instance():
# 对于非GUI应用创建无窗口的QApplication
import sys
self.app = QApplication(sys.argv if hasattr(sys, "argv") else [])
logging.info("已创建 QApplication 实例")
except Exception as e:
logging.warning(f"创建 QApplication 失败: {str(e)}")
def _ensure_listen_operation_tables(self):
"""确保监听操作的表被正确创建"""
try:
# 强制重新创建表
self.listen_operation.create_table()
# 额外检查 change_log 表
conn = self.listen_operation._get_connection()
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
)
if cursor.fetchone() is None:
logging.warning("change_log 表不存在,尝试手动创建")
conn.execute(
"""
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}")
def send_message(self, message: MessageItem, callback: Callable):
"""发送消息到队列"""
self.receive_scheduler.send_message(message, callback)
def update_listen_data(self, key: str, value: str):
"""更新监听数据"""
self.listen_operation.update_listen_data(key, value)
def get_listen_datas(self) -> List:
"""获取所有监听数据"""
return self.listen_operation.get_values()
def get_listen_data(self, key: str):
"""获取单个监听数据"""
return self.listen_operation.get_value(key)
def start(self):
"""启动所有调度器组件"""
self.receive_scheduler.start()
self.task_scheduler.start()
self.cleanup_scheduler.start()
self.listen_scheduler.start()
logging.info("PySide6 队列调度器已完全启动")
def stop(self):
"""停止所有调度器组件"""
self.listen_scheduler.stop()
self.cleanup_scheduler.stop()
self.task_scheduler.stop()
self.receive_scheduler.stop()
logging.info("PySide6 队列调度器已完全停止")
def get_queue_info(self) -> Dict[str, Any]:
"""获取队列信息"""
try:
return {
"queue_length": self.queue_operation.get_queue_length(),
"shard_num": self.queue_operation.shard_num,
"db_dir": self.queue_operation.db_dir,
"active_threads": QThreadPool.globalInstance().activeThreadCount(),
"max_threads": QThreadPool.globalInstance().maxThreadCount(),
}
except Exception as e:
logging.error(f"获取队列信息失败: {str(e)}")
return {}

View File

@ -0,0 +1,96 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : test_pyside6_scheduler.py
@Time : 2025-10-26 10:40:00
@Author : chakcy
@description : 独立 PySide6 调度器测试
"""
import time
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task
@task(meta={"task_name": "pyside6_example"})
def pyside6_example_task(message_item: MessageItem):
"""PySide6 调度器测试任务"""
print(f"处理任务: {message_item.id}")
# 模拟一些工作
result = sum(i * i for i in range(1000))
message_item.result = {
"status": "completed",
"result": result,
"task_id": message_item.id,
}
return message_item
@task(meta={"task_name": "async_pyside6_task"})
async def async_pyside6_task(message_item: MessageItem):
"""异步任务测试"""
import asyncio
print(f"处理异步任务: {message_item.id}")
await asyncio.sleep(0.1) # 模拟异步操作
message_item.result = {"status": "async_completed", "task_id": message_item.id}
return message_item
def task_callback(message_item: MessageItem):
"""任务完成回调"""
print(f"任务完成回调: {message_item.id}, 结果: {message_item.result}")
def test_pyside6_scheduler():
"""测试独立的 PySide6 调度器"""
# 创建独立的 PySide6 调度器
scheduler = QueueScheduler(
receive_thread_num=2,
task_thread_num=6, # 使用 PySide6 线程池
shard_num=4,
scheduler_type="pyside6", # 指定使用 PySide6 调度器
)
print("启动 PySide6 调度器...")
scheduler.start()
try:
# 发送同步任务
for i in range(5):
message = MessageItem(
content={"task_index": i, "type": "sync"},
destination="pyside6_example_task",
)
scheduler.send_message(message, task_callback)
print(f"发送同步消息: {message.id}")
# 发送异步任务
for i in range(5):
message = MessageItem(
content={"task_index": i, "type": "async"},
destination="async_pyside6_task",
)
scheduler.send_message(message, task_callback)
print(f"发送异步消息: {message.id}")
# 等待任务处理
print("等待任务处理...")
for i in range(10):
queue_info = scheduler.scheduler.get_queue_info()
print(f"队列状态: {queue_info}")
time.sleep(1)
if queue_info.get("queue_length", 0) == 0:
break
finally:
print("停止 PySide6 调度器...")
scheduler.stop()
print("测试完成")
if __name__ == "__main__":
test_pyside6_scheduler()