modify:qt 调度器

This commit is contained in:
chakcy 2025-11-02 14:21:30 +08:00
parent 58f4ad84ee
commit ac67b60d14
19 changed files with 76 additions and 865 deletions

View File

@ -1 +1 @@
3.11 3.13t

View File

@ -22,8 +22,9 @@ requires-python = ">=3.11"
version = "0.2.0" version = "0.2.0"
[project.optional-dependencies] [project.optional-dependencies]
pyside6 = ["PySide6"] pyside6 = ["qtpy", "PySide6"]
pyqt6 = ["PyQt6"] pyqt6 = ["qtpy", "PyQt6"]
pyqt5 = ["qtpy", "PyQt5"]
[[tool.uv.index]] [[tool.uv.index]]
default = true default = true

View File

@ -13,27 +13,21 @@ from ._async import AsyncQueueScheduler
from .base import BaseScheduler from .base import BaseScheduler
from ..model import MessageItem, SchedulerConfig from ..model import MessageItem, SchedulerConfig
from typing import Callable from typing import Callable
import multiprocessing import logging
SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler} SCHEDULER_TYPES = {"standard": StandardQueueScheduler, "async": AsyncQueueScheduler}
try: try:
from .pyside6 import PySide6QueueScheduler from .qt import QtQueueScheduler
print(SCHEDULER_TYPES) print(SCHEDULER_TYPES)
if PySide6QueueScheduler.is_available(): if QtQueueScheduler.is_available():
SCHEDULER_TYPES["pyside6"] = PySide6QueueScheduler SCHEDULER_TYPES["qt"] = QtQueueScheduler
except ImportError: except ImportError:
pass logging.warning(
"Qt is not available, if you want to use it, please install PyQt5/PyQt6/PySide6"
try: )
from .pyqt6 import PyQt6QueueScheduler
if PyQt6QueueScheduler.is_available():
SCHEDULER_TYPES["pyqt6"] = PyQt6QueueScheduler
except ImportError:
pass
class QueueScheduler(BaseScheduler): class QueueScheduler(BaseScheduler):

View File

@ -1,157 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : __init__.py
@Time : 2025-10-26 10:26:16
@Author : chakcy
@Email : 947105045@qq.com
@description : pyside6调度器 - 独立实现
"""
import logging
import os
from ..base import BaseScheduler
from ...model import MessageItem, SchedulerConfig
from queue_sqlite_core import ShardedQueueOperation
from ...queue_operation.listen_operation import ListenOperation
from .receive_scheduler import PySide6ReceiveScheduler
from .listen_scheduler import PySide6ListenScheduler
from .task_scheduler import PySide6TaskScheduler
from ..cleanup_scheduler import CleanupScheduler as PySide6CleanupScheduler
from PySide6.QtCore import QThreadPool
from PySide6.QtWidgets import QApplication
from typing import Callable, Dict, Any, List
class PySide6QueueScheduler(BaseScheduler):
"""完全独立的 PySide6 队列调度器 - 修复版"""
def __init__(
self,
config: SchedulerConfig = SchedulerConfig(),
):
# 确保有 QApplication 实例(对于 GUI 应用)
self._ensure_qapplication()
# 初始化队列操作
self.queue_operation = ShardedQueueOperation(
config.shard_num, config.queue_name
)
# 初始化监听操作
db_dir = f"cache/{config.queue_name}"
os.makedirs(db_dir, exist_ok=True)
self.listen_operation = ListenOperation(f"{db_dir}/listen.db")
# 确保监听表被创建
self._ensure_listen_operation_tables()
# 初始化各个独立的调度器组件
self.receive_scheduler = PySide6ReceiveScheduler(
self.queue_operation, config.receive_thread_num
)
self.task_scheduler = PySide6TaskScheduler(
self.queue_operation, config.task_thread_num
)
self.listen_scheduler = PySide6ListenScheduler(self.listen_operation)
self.cleanup_scheduler = PySide6CleanupScheduler(self.queue_operation)
@staticmethod
def is_available() -> bool:
try:
import PySide6
# 检查是否在主线程GUI 调度器通常要求)
return True
except ImportError:
return False # 依赖库未安装时不可用
def _ensure_qapplication(self):
"""确保 QApplication 实例存在"""
try:
if not QApplication.instance():
# 对于非GUI应用创建无窗口的QApplication
import sys
self.app = QApplication(sys.argv if hasattr(sys, "argv") else [])
logging.info("已创建 QApplication 实例")
except Exception as e:
logging.warning(f"创建 QApplication 失败: {str(e)}")
def _ensure_listen_operation_tables(self):
"""确保监听操作的表被正确创建"""
try:
# 强制重新创建表
self.listen_operation.create_table()
# 额外检查 change_log 表
conn = self.listen_operation._get_connection()
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
)
if cursor.fetchone() is None:
logging.warning("change_log 表不存在,尝试手动创建")
conn.execute(
"""
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}")
def send_message(self, message: MessageItem, callback: Callable):
"""发送消息到队列"""
self.receive_scheduler.send_message(message, callback)
def update_listen_data(self, key: str, value: str):
"""更新监听数据"""
self.listen_operation.update_listen_data(key, value)
def get_listen_datas(self) -> List:
"""获取所有监听数据"""
return self.listen_operation.get_values()
def get_listen_data(self, key: str):
"""获取单个监听数据"""
return self.listen_operation.get_value(key)
def start(self):
"""启动所有调度器组件"""
self.receive_scheduler.start()
self.task_scheduler.start()
self.cleanup_scheduler.start_cleanup()
self.listen_scheduler.start()
logging.info("PySide6 队列调度器已完全启动")
def stop(self):
"""停止所有调度器组件"""
self.listen_scheduler.stop()
self.cleanup_scheduler.stop_cleanup()
self.task_scheduler.stop()
self.receive_scheduler.stop()
logging.info("PySide6 队列调度器已完全停止")
def get_queue_info(self) -> Dict[str, Any]:
"""获取队列信息"""
try:
return {
"queue_length": self.queue_operation.get_queue_length(),
"shard_num": self.queue_operation.shard_num,
"db_dir": self.queue_operation.db_dir,
"active_threads": QThreadPool.globalInstance().activeThreadCount(),
"max_threads": QThreadPool.globalInstance().maxThreadCount(),
}
except Exception as e:
logging.error(f"获取队列信息失败: {str(e)}")
return {}

View File

@ -1,56 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : callback_task.py
@Time : 2025-10-28 11:40:02
@Author : chakcy
@Email : 947105045@qq.com
@description : 回调任务
"""
import asyncio
import logging
from queue_sqlite_core import ShardedQueueOperation
from ...model import MessageItem
from typing import Callable
from PySide6.QtCore import QRunnable
class PySide6CallbackTask(QRunnable):
"""PySide6 回调任务"""
def __init__(
self,
callback: Callable,
message: MessageItem,
queue_operation: ShardedQueueOperation,
):
super().__init__()
self.callback = callback
self.message = message
self.queue_operation = queue_operation
self.setAutoDelete(True)
def run(self):
"""执行回调函数"""
try:
# 检查是否是协程函数
if asyncio.iscoroutinefunction(self.callback):
# 对于异步回调,在当前线程中运行事件循环
asyncio.run(self._run_async_callback())
else:
self.callback(self.message)
except Exception as e:
logging.error(f"回调执行错误: {str(e)}")
finally:
# 删除消息
try:
self.queue_operation.delete_message(self.message.id)
except Exception as e:
logging.error(f"删除消息失败 {self.message.id}: {str(e)}")
async def _run_async_callback(self):
"""运行异步回调"""
try:
await self.callback(self.message)
except Exception as e:
logging.error(f"异步回调执行错误: {str(e)}")

View File

@ -1,114 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : listen_scheduler.py
@Time : 2025-10-28 14:22:38
@Author : chakcy
@Email : 947105045@qq.com
@description : 监听调度器
"""
import logging
from .listen_task import PySide6ListenTask
from ...queue_operation.listen_operation import ListenOperation
from PySide6.QtCore import QThreadPool
import threading
import time
class PySide6ListenScheduler:
"""PySide6 监听调度器 - 修复版"""
def __init__(self, listen_operation: ListenOperation):
self.listen_operation = listen_operation
self.is_running = False
self.listen_thread = None
self.thread_pool = QThreadPool.globalInstance()
# 确保监听表存在
self._ensure_listen_tables()
def _ensure_listen_tables(self):
"""确保监听相关的表存在"""
try:
# 检查表是否存在,如果不存在则创建
conn = self.listen_operation._get_connection()
# 检查 change_log 表是否存在
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='change_log'"
)
if cursor.fetchone() is None:
logging.info("创建缺失的 change_log 表")
conn.execute(
"""
CREATE TABLE change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT,
row_id INTEGER,
column_name TEXT,
old_value TEXT,
new_value TEXT,
is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}")
def _listen_loop(self):
"""监听数据变化循环"""
while self.is_running:
try:
status, change_data_items = self.listen_operation.listen_data()
if status:
for data in change_data_items:
try:
# 修正字段索引
if len(data) >= 8:
delete_id = data[0] # id
table_name = data[1] # table_name
row_id = data[2] # row_id
column_name = data[3] # column_name
old_value = data[4] # old_value
new_value = data[5] # new_value
# 使用线程池执行监听任务
task = PySide6ListenTask(
column_name,
new_value,
int(delete_id),
self.listen_operation,
)
self.thread_pool.start(task)
except Exception as e:
logging.error(f"处理监听数据失败: {str(e)}")
else:
time.sleep(0.05)
except Exception as e:
logging.error(f"监听循环错误: {str(e)}")
time.sleep(0.1)
def start(self):
"""启动监听调度器"""
if self.is_running:
return
self.is_running = True
self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True)
self.listen_thread.start()
logging.info("PySide6 监听调度器已启动")
def stop(self):
"""停止监听调度器"""
if not self.is_running:
return
self.is_running = False
if self.listen_thread and self.listen_thread.is_alive():
self.listen_thread.join(timeout=3.0)
logging.info("PySide6 监听调度器已停止")

View File

@ -1,57 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : listen_task.py
@Time : 2025-10-28 14:30:41
@Author : chakcy
@Email : 947105045@qq.com
@description : 监听任务
"""
from queue_sqlite.queue_operation.listen_operation import ListenOperation
from ...mounter.listen_mounter import ListenMounter
import logging
from PySide6.QtCore import QRunnable
import asyncio
from typing import Callable
class PySide6ListenTask(QRunnable):
"""PySide6 监听任务 - 修复版"""
def __init__(
self, key: str, value: str, delete_id: int, listen_operation: ListenOperation
):
super().__init__()
self.key = key
self.value = value
self.delete_id = delete_id
self.listen_operation = listen_operation
self.setAutoDelete(True)
def run(self):
"""执行监听回调"""
try:
listen_function = ListenMounter.get_Listener_function(self.key)
if listen_function:
if asyncio.iscoroutinefunction(listen_function):
# 异步监听函数
asyncio.run(self._run_async_listener(listen_function))
else:
# 同步监听函数
listen_function(self.value)
except Exception as e:
logging.error(f"监听函数执行错误 {self.key}: {str(e)}")
finally:
# 删除变更日志
try:
self.listen_operation.delete_change_log(self.delete_id)
except Exception as e:
logging.error(f"删除变更日志失败 {self.delete_id}: {str(e)}")
async def _run_async_listener(self, listen_function: Callable):
"""执行异步监听函数"""
try:
await listen_function(self.value)
except Exception as e:
logging.error(f"异步监听函数执行错误 {self.key}: {str(e)}")

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : receive_scheduler.py
@Time : 2025-10-28 14:31:06
@Author : chakcy
@Email : 947105045@qq.com
@description : 接收调度器
"""
from typing import Callable, Optional
from queue_sqlite_core import ShardedQueueOperation
from PySide6.QtCore import QThreadPool
import threading
from ...model import MessageItem
import time
import logging
from .callback_task import PySide6CallbackTask
class PySide6ReceiveScheduler:
"""PySide6 接收调度器 - 独立实现"""
def __init__(
self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1
):
self.callbacks = {}
self.is_running = False
self.lock = threading.Lock()
self.queue_operation = queue_operation
self.receive_thread = None
self.thread_pool = QThreadPool.globalInstance()
def send_message(self, message: MessageItem, callback: Optional[Callable] = None):
"""发送消息到队列"""
if callback is None:
callback = lambda message: logging.info(f"收到消息: {message.id}")
# 入队消息
self.queue_operation.enqueue(message.to_dict_by_core())
with self.lock:
self.callbacks[message.id] = callback
def _receive_loop(self):
"""接收消息循环"""
while self.is_running:
try:
# 获取已完成的消息
message_list = self.queue_operation.get_completed_messages()
if message_list:
for message_data in message_list:
try:
message = MessageItem.from_dict(message_data)
with self.lock:
callback = self.callbacks.pop(message.id, None)
if callback is None:
callback = lambda msg: None
# 使用线程池执行回调
task = PySide6CallbackTask(
callback, message, self.queue_operation
)
self.thread_pool.start(task)
except Exception as e:
logging.error(f"处理完成消息失败: {str(e)}")
# 即使处理失败也尝试删除消息
try:
self.queue_operation.delete_message(message_data["id"])
except:
pass
else:
time.sleep(0.05) # 短暂休眠避免CPU空转
except Exception as e:
logging.error(f"接收消息循环错误: {str(e)}")
time.sleep(0.1) # 出错时稍长休眠
def start(self):
"""启动接收调度器"""
if self.is_running:
return
self.is_running = True
self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
self.receive_thread.start()
logging.info("PySide6 接收调度器已启动")
def stop(self):
"""停止接收调度器"""
if not self.is_running:
return
self.is_running = False
if self.receive_thread and self.receive_thread.is_alive():
self.receive_thread.join(timeout=3.0)
logging.info("PySide6 接收调度器已停止")

View File

@ -1,148 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : task_executor.py
@Time : 2025-10-28 14:31:25
@Author : chakcy
@Email : 947105045@qq.com
@description : 任务执行器
"""
import asyncio
import logging
from typing import Any, Callable, Dict
from datetime import datetime
from ...constant import MessageStatus
from ...mounter.task_mounter import TaskMounter
from queue_sqlite_core import ShardedQueueOperation
from ...model.message_item import MessageItem
from PySide6.QtCore import QRunnable
class PySide6TaskExecutor(QRunnable):
"""PySide6 任务执行器 - 修复版"""
def __init__(
self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation
):
super().__init__()
self.message_data = message_data
self.queue_operation = queue_operation
self.setAutoDelete(True)
def run(self):
"""执行任务"""
try:
message = MessageItem.from_dict(self.message_data)
if message.destination == "client":
self._process_client_message(message)
return
task_function = TaskMounter.get_task_function(message.destination)
if task_function is None:
raise ValueError(f"任务函数未找到: {message.destination}")
# 执行任务
if asyncio.iscoroutinefunction(task_function):
# 异步任务 - 在当前线程运行事件循环
asyncio.run(self._execute_async_task(message, task_function))
else:
# 同步任务
self._execute_sync_task(message, task_function)
except Exception as e:
logging.error(f"任务执行错误: {str(e)}")
# 更新任务状态为失败
try:
self.queue_operation.update_status(
self.message_data["id"], MessageStatus.FAILED.value
)
self.queue_operation.update_result(
self.message_data["id"], f'{{"error": "{str(e)}"}}'
)
except Exception as update_error:
logging.error(f"更新任务状态失败: {str(update_error)}")
def _process_client_message(self, message: MessageItem):
"""处理客户端消息"""
message.status = MessageStatus.COMPLETED
message.result = {"result": "success"}
message.updatetime = datetime.now()
self._update_task_result(message)
def _execute_sync_task(self, message: MessageItem, task_function: Callable):
"""执行同步任务"""
from ...cycle.task_cycle import TaskCycle
try:
task_cycle = TaskCycle(message, task_function)
task_cycle.run()
status = task_cycle.get_task_status()
if not status:
raise ValueError("任务未完成")
message.status = status
if message.status == MessageStatus.FAILED:
message.result = {"error": task_cycle.get_task_error()}
else:
# 获取序列化后的结果
result_str = task_cycle.get_task_result()
try:
import json
message.result = json.loads(result_str)
except:
message.result = {"result": result_str}
self._update_task_result(message)
except Exception as e:
logging.error(f"同步任务执行失败 {message.id}: {str(e)}")
message.status = MessageStatus.FAILED
message.result = {"error": str(e)}
self._update_task_result(message)
async def _execute_async_task(self, message: MessageItem, task_function: Callable):
"""执行异步任务"""
from ...cycle.async_task_cycle import AsyncTaskCycle
try:
task_cycle = AsyncTaskCycle(message, task_function)
await task_cycle.run()
status = task_cycle.get_task_status()
if not status:
raise ValueError("任务未完成")
message.status = status
if message.status == MessageStatus.FAILED:
message.result = {"error": task_cycle.get_task_error()}
else:
# 获取序列化后的结果
result_str = task_cycle.get_task_result()
try:
import json
message.result = json.loads(result_str)
except:
message.result = {"result": result_str}
self._update_task_result(message)
except Exception as e:
logging.error(f"异步任务执行失败 {message.id}: {str(e)}")
message.status = MessageStatus.FAILED
message.result = {"error": str(e)}
self._update_task_result(message)
def _update_task_result(self, message: MessageItem):
"""更新任务结果到数据库"""
import json
try:
# 序列化结果
result_str = json.dumps(message.result) if message.result else "{}"
self.queue_operation.update_result(message.id, result_str)
self.queue_operation.update_status(message.id, message.status.value)
except Exception as e:
logging.error(f"更新任务结果失败 {message.id}: {str(e)}")

View File

@ -1,69 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : task_scheduler.py
@Time : 2025-10-28 14:19:39
@Author : chakcy
@Email : 947105045@qq.com
@description : PySide6 任务调度器
"""
import logging
from PySide6.QtCore import QThreadPool
from .task_executor import PySide6TaskExecutor
from queue_sqlite_core import ShardedQueueOperation
import time
import threading
class PySide6TaskScheduler:
"""PySide6 任务调度器 - 独立实现"""
def __init__(
self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4
):
self.is_running = False
self.queue_operation = queue_operation
self.task_thread = None
self.thread_pool = QThreadPool.globalInstance()
# 设置最大线程数
self.thread_pool.setMaxThreadCount(max(1, task_thread_num))
def _task_loop(self):
"""任务处理循环"""
while self.is_running:
try:
# 出队消息进行处理
message_list = self.queue_operation.dequeue(
size=self.thread_pool.maxThreadCount() * 2
)
if message_list:
for message_data in message_list:
# 使用线程池执行任务
task = PySide6TaskExecutor(message_data, self.queue_operation)
self.thread_pool.start(task)
else:
time.sleep(0.05) # 短暂休眠
except Exception as e:
logging.error(f"任务调度循环错误: {str(e)}")
time.sleep(0.1)
def start(self):
"""启动任务调度器"""
if self.is_running:
return
self.is_running = True
self.task_thread = threading.Thread(target=self._task_loop, daemon=True)
self.task_thread.start()
logging.info("PySide6 任务调度器已启动")
def stop(self):
"""停止任务调度器"""
if not self.is_running:
return
self.is_running = False
if self.task_thread and self.task_thread.is_alive():
self.task_thread.join(timeout=3.0)
logging.info("PySide6 任务调度器已停止")

View File

@ -5,7 +5,7 @@
@Time : 2025-10-26 10:26:16 @Time : 2025-10-26 10:26:16
@Author : chakcy @Author : chakcy
@Email : 947105045@qq.com @Email : 947105045@qq.com
@description : pyqt6调度器 - 独立实现 @description : Qt调度器 - 独立实现
""" """
import logging import logging
@ -14,17 +14,19 @@ from ..base import BaseScheduler
from ...model import MessageItem, SchedulerConfig from ...model import MessageItem, SchedulerConfig
from queue_sqlite_core import ShardedQueueOperation from queue_sqlite_core import ShardedQueueOperation
from ...queue_operation.listen_operation import ListenOperation from ...queue_operation.listen_operation import ListenOperation
from .receive_scheduler import PyQt6ReceiveScheduler from .receive_scheduler import QtReceiveScheduler
from .listen_scheduler import PyQt6ListenScheduler from .listen_scheduler import QtListenScheduler
from .task_scheduler import PyQt6TaskScheduler from .task_scheduler import QtTaskScheduler
from ..cleanup_scheduler import CleanupScheduler as PyQt6CleanupScheduler from ..cleanup_scheduler import CleanupScheduler as QtCleanupScheduler
from PyQt6.QtCore import QThreadPool from qtpy.QtCore import QThreadPool
from PyQt6.QtWidgets import QApplication from qtpy.QtWidgets import QApplication
from typing import Callable, Dict, Any, List from typing import Callable, Dict, Any, List
class PyQt6QueueScheduler(BaseScheduler): class QtQueueScheduler(BaseScheduler):
"""完全独立的 PyQt6 队列调度器 - 修复版""" """完全独立的 Qt 队列调度器 - 修复版"""
qt_type = None
def __init__( def __init__(
self, self,
@ -47,24 +49,31 @@ class PyQt6QueueScheduler(BaseScheduler):
self._ensure_listen_operation_tables() self._ensure_listen_operation_tables()
# 初始化各个独立的调度器组件 # 初始化各个独立的调度器组件
self.receive_scheduler = PyQt6ReceiveScheduler( self.receive_scheduler = QtReceiveScheduler(
self.queue_operation, config.receive_thread_num self.queue_operation, config.receive_thread_num
) )
self.task_scheduler = PyQt6TaskScheduler( self.task_scheduler = QtTaskScheduler(
self.queue_operation, config.task_thread_num self.queue_operation, config.task_thread_num
) )
self.listen_scheduler = PyQt6ListenScheduler(self.listen_operation) self.listen_scheduler = QtListenScheduler(self.listen_operation)
self.cleanup_scheduler = PyQt6CleanupScheduler(self.queue_operation) self.cleanup_scheduler = QtCleanupScheduler(self.queue_operation)
@staticmethod @classmethod
def is_available() -> bool: def is_available(cls) -> bool:
# 检查 PySide6/PyQt6/PyQt5/PySide2 模块中的一个
import importlib
qt_modules = ["PySide6", "PyQt6", "PyQt5", "PySide2"]
for module in qt_modules:
try: try:
import PyQt6 importlib.import_module(module)
cls.qt_type = module
# 检查是否在主线程GUI 调度器通常要求) logging.info(f"已找到 Qt 模块: {module}")
print(f"已找到 Qt 模块: {module}")
return True return True
except ImportError: except ImportError: # 模块未找到
return False # 依赖库未安装时不可用 continue
return False
def _ensure_qapplication(self): def _ensure_qapplication(self):
"""确保 QApplication 实例存在""" """确保 QApplication 实例存在"""
@ -132,7 +141,7 @@ class PyQt6QueueScheduler(BaseScheduler):
self.task_scheduler.start() self.task_scheduler.start()
self.cleanup_scheduler.start_cleanup() self.cleanup_scheduler.start_cleanup()
self.listen_scheduler.start() self.listen_scheduler.start()
logging.info("PyQt6 队列调度器已完全启动") logging.info("Qt 队列调度器已完全启动")
def stop(self): def stop(self):
"""停止所有调度器组件""" """停止所有调度器组件"""
@ -140,7 +149,7 @@ class PyQt6QueueScheduler(BaseScheduler):
self.cleanup_scheduler.stop_cleanup() self.cleanup_scheduler.stop_cleanup()
self.task_scheduler.stop() self.task_scheduler.stop()
self.receive_scheduler.stop() self.receive_scheduler.stop()
logging.info("PyQt6 队列调度器已完全停止") logging.info("Qt 队列调度器已完全停止")
def get_queue_info(self) -> Dict[str, Any]: def get_queue_info(self) -> Dict[str, Any]:
"""获取队列信息""" """获取队列信息"""

View File

@ -12,11 +12,11 @@ import logging
from queue_sqlite_core import ShardedQueueOperation from queue_sqlite_core import ShardedQueueOperation
from ...model import MessageItem from ...model import MessageItem
from typing import Callable from typing import Callable
from PyQt6.QtCore import QRunnable from qtpy.QtCore import QRunnable
class PyQt6CallbackTask(QRunnable): class QtCallbackTask(QRunnable):
"""PyQt6 回调任务""" """Qt 回调任务"""
def __init__( def __init__(
self, self,

View File

@ -8,15 +8,15 @@
@description : 监听调度器 @description : 监听调度器
""" """
import logging import logging
from .listen_task import PyQt6ListenTask from .listen_task import QtListenTask
from ...queue_operation.listen_operation import ListenOperation from ...queue_operation.listen_operation import ListenOperation
from PyQt6.QtCore import QThreadPool from qtpy.QtCore import QThreadPool
import threading import threading
import time import time
class PyQt6ListenScheduler: class QtListenScheduler:
"""PyQt6 监听调度器 - 修复版""" """Qt 监听调度器 - 修复版"""
def __init__(self, listen_operation: ListenOperation): def __init__(self, listen_operation: ListenOperation):
self.listen_operation = listen_operation self.listen_operation = listen_operation
@ -76,7 +76,7 @@ class PyQt6ListenScheduler:
new_value = data[5] # new_value new_value = data[5] # new_value
# 使用线程池执行监听任务 # 使用线程池执行监听任务
task = PyQt6ListenTask( task = QtListenTask(
column_name, column_name,
new_value, new_value,
int(delete_id), int(delete_id),
@ -101,7 +101,7 @@ class PyQt6ListenScheduler:
self.is_running = True self.is_running = True
self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listen_thread = threading.Thread(target=self._listen_loop, daemon=True)
self.listen_thread.start() self.listen_thread.start()
logging.info("PyQt6 监听调度器已启动") logging.info("Qt 监听调度器已启动")
def stop(self): def stop(self):
"""停止监听调度器""" """停止监听调度器"""
@ -111,4 +111,4 @@ class PyQt6ListenScheduler:
self.is_running = False self.is_running = False
if self.listen_thread and self.listen_thread.is_alive(): if self.listen_thread and self.listen_thread.is_alive():
self.listen_thread.join(timeout=3.0) self.listen_thread.join(timeout=3.0)
logging.info("PyQt6 监听调度器已停止") logging.info("Qt 监听调度器已停止")

View File

@ -11,13 +11,13 @@
from queue_sqlite.queue_operation.listen_operation import ListenOperation from queue_sqlite.queue_operation.listen_operation import ListenOperation
from ...mounter.listen_mounter import ListenMounter from ...mounter.listen_mounter import ListenMounter
import logging import logging
from PyQt6.QtCore import QRunnable from qtpy.QtCore import QRunnable
import asyncio import asyncio
from typing import Callable from typing import Callable
class PyQt6ListenTask(QRunnable): class QtListenTask(QRunnable):
"""PyQt6 监听任务 - 修复版""" """Qt 监听任务 - 修复版"""
def __init__( def __init__(
self, key: str, value: str, delete_id: int, listen_operation: ListenOperation self, key: str, value: str, delete_id: int, listen_operation: ListenOperation

View File

@ -11,16 +11,16 @@
from typing import Callable, Optional from typing import Callable, Optional
from queue_sqlite_core import ShardedQueueOperation from queue_sqlite_core import ShardedQueueOperation
from PyQt6.QtCore import QThreadPool from qtpy.QtCore import QThreadPool
import threading import threading
from ...model import MessageItem from ...model import MessageItem
import time import time
import logging import logging
from .callback_task import PyQt6CallbackTask from .callback_task import QtCallbackTask
class PyQt6ReceiveScheduler: class QtReceiveScheduler:
"""PyQt6 接收调度器 - 独立实现""" """Qt 接收调度器 - 独立实现"""
def __init__( def __init__(
self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1 self, queue_operation: ShardedQueueOperation, receive_thread_num: int = 1
@ -59,7 +59,7 @@ class PyQt6ReceiveScheduler:
callback = lambda msg: None callback = lambda msg: None
# 使用线程池执行回调 # 使用线程池执行回调
task = PyQt6CallbackTask( task = QtCallbackTask(
callback, message, self.queue_operation callback, message, self.queue_operation
) )
self.thread_pool.start(task) # type: ignore self.thread_pool.start(task) # type: ignore
@ -86,7 +86,7 @@ class PyQt6ReceiveScheduler:
self.is_running = True self.is_running = True
self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True) self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
self.receive_thread.start() self.receive_thread.start()
logging.info("PySide6 接收调度器已启动") logging.info("Qt 接收调度器已启动")
def stop(self): def stop(self):
"""停止接收调度器""" """停止接收调度器"""
@ -96,4 +96,4 @@ class PyQt6ReceiveScheduler:
self.is_running = False self.is_running = False
if self.receive_thread and self.receive_thread.is_alive(): if self.receive_thread and self.receive_thread.is_alive():
self.receive_thread.join(timeout=3.0) self.receive_thread.join(timeout=3.0)
logging.info("PySide6 接收调度器已停止") logging.info("Qt 接收调度器已停止")

View File

@ -17,11 +17,11 @@ from ...constant import MessageStatus
from ...mounter.task_mounter import TaskMounter from ...mounter.task_mounter import TaskMounter
from queue_sqlite_core import ShardedQueueOperation from queue_sqlite_core import ShardedQueueOperation
from ...model.message_item import MessageItem from ...model.message_item import MessageItem
from PyQt6.QtCore import QRunnable from qtpy.QtCore import QRunnable
class PyQt6TaskExecutor(QRunnable): class QtTaskExecutor(QRunnable):
"""PyQt6 任务执行器 - 修复版""" """Qt 任务执行器 - 修复版"""
def __init__( def __init__(
self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation self, message_data: Dict[str, Any], queue_operation: ShardedQueueOperation

View File

@ -5,18 +5,18 @@
@Time : 2025-10-28 14:19:39 @Time : 2025-10-28 14:19:39
@Author : chakcy @Author : chakcy
@Email : 947105045@qq.com @Email : 947105045@qq.com
@description : PyQt6 任务调度器 @description : Qt 任务调度器
""" """
import logging import logging
from PyQt6.QtCore import QThreadPool from qtpy.QtCore import QThreadPool
from .task_executor import PyQt6TaskExecutor from .task_executor import QtTaskExecutor
from queue_sqlite_core import ShardedQueueOperation from queue_sqlite_core import ShardedQueueOperation
import time import time
import threading import threading
class PyQt6TaskScheduler: class QtTaskScheduler:
"""PyQt6 任务调度器 - 独立实现""" """Qt 任务调度器 - 独立实现"""
def __init__( def __init__(
self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4 self, queue_operation: ShardedQueueOperation, task_thread_num: int = 4
@ -39,7 +39,7 @@ class PyQt6TaskScheduler:
if message_list: if message_list:
for message_data in message_list: for message_data in message_list:
# 使用线程池执行任务 # 使用线程池执行任务
task = PyQt6TaskExecutor(message_data, self.queue_operation) task = QtTaskExecutor(message_data, self.queue_operation)
self.thread_pool.start(task) # type: ignore self.thread_pool.start(task) # type: ignore
else: else:
time.sleep(0.05) # 短暂休眠 time.sleep(0.05) # 短暂休眠
@ -56,7 +56,7 @@ class PyQt6TaskScheduler:
self.is_running = True self.is_running = True
self.task_thread = threading.Thread(target=self._task_loop, daemon=True) self.task_thread = threading.Thread(target=self._task_loop, daemon=True)
self.task_thread.start() self.task_thread.start()
logging.info("PySide6 任务调度器已启动") logging.info("Qt 任务调度器已启动")
def stop(self): def stop(self):
"""停止任务调度器""" """停止任务调度器"""
@ -66,4 +66,4 @@ class PyQt6TaskScheduler:
self.is_running = False self.is_running = False
if self.task_thread and self.task_thread.is_alive(): if self.task_thread and self.task_thread.is_alive():
self.task_thread.join(timeout=3.0) self.task_thread.join(timeout=3.0)
logging.info("PySide6 任务调度器已停止") logging.info("Qt 任务调度器已停止")

View File

@ -1,93 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : test_pyqt6_scheduler.py
@Time : 2025-10-26 10:40:00
@Author : chakcy
@description : 独立 pyqt6 调度器测试
"""
import time
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task
@task(meta={"task_name": "pyqt6_example"})
def pyqt6_example_task(message_item: MessageItem):
"""pyqt6 调度器测试任务"""
print(f"处理任务: {message_item.id}")
# 模拟一些工作
result = sum(i * i for i in range(1000))
message_item.result = {
"status": "completed",
"result": result,
"task_id": message_item.id,
}
return message_item
@task(meta={"task_name": "async_pyqt6_task"})
async def async_pyqt6_task(message_item: MessageItem):
"""异步任务测试"""
import asyncio
print(f"处理异步任务: {message_item.id}")
await asyncio.sleep(0.1) # 模拟异步操作
message_item.result = {"status": "async_completed", "task_id": message_item.id}
return message_item
def task_callback(message_item: MessageItem):
"""任务完成回调"""
print(f"任务完成回调: {message_item.id}, 结果: {message_item.result}")
def test_pyqt6_scheduler():
"""测试独立的 pyqt6 调度器"""
# 创建独立的 pyqt6 调度器
scheduler = QueueScheduler(
scheduler_type="pyqt6", # 指定使用 pyqt6 调度器
)
print("启动 pyqt6 调度器...")
scheduler.start()
try:
# 发送同步任务
for i in range(5):
message = MessageItem(
content={"task_index": i, "type": "sync"},
destination="pyqt6_example_task",
)
scheduler.send_message(message, task_callback)
print(f"发送同步消息: {message.id}")
# 发送异步任务
for i in range(5):
message = MessageItem(
content={"task_index": i, "type": "async"},
destination="async_pyqt6_task",
)
scheduler.send_message(message, task_callback)
print(f"发送异步消息: {message.id}")
# 等待任务处理
print("等待任务处理...")
for i in range(10):
queue_info = scheduler.scheduler.get_queue_info()
print(f"队列状态: {queue_info}")
time.sleep(1)
if queue_info.get("queue_length", 0) == 0:
break
finally:
print("停止 pyqt6 调度器...")
scheduler.stop()
print("测试完成")
if __name__ == "__main__":
test_pyqt6_scheduler()

View File

@ -48,10 +48,10 @@ def test_pyside6_scheduler():
# 创建独立的 PySide6 调度器 # 创建独立的 PySide6 调度器
scheduler = QueueScheduler( scheduler = QueueScheduler(
scheduler_type="pyside6", # 指定使用 PySide6 调度器 scheduler_type="qt", # 指定使用 PySide6 调度器
) )
print("启动 PySide6 调度器...") print("启动 qt 调度器...")
scheduler.start() scheduler.start()
try: try: