diff --git a/.python-version b/.python-version index 24ee5b1..e4fba21 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.13 +3.12 diff --git a/src/queue_sqlite/scheduler/__init__.py b/src/queue_sqlite/scheduler/__init__.py index b288a92..2f54e3a 100644 --- a/src/queue_sqlite/scheduler/__init__.py +++ b/src/queue_sqlite/scheduler/__init__.py @@ -41,13 +41,14 @@ class QueueScheduler(BaseScheduler): scheduler_type: SchedulerType = "standard", config: SchedulerConfig = SchedulerConfig(), ): + super().__init__(config) scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None) if scheduler_class is None: raise ValueError(f"Invalid scheduler type: {scheduler_type}") self.scheduler = scheduler_class(config) - # if self.scheduler: - # self.queue_operation = self.scheduler.queue_operation + if hasattr(self.scheduler, "queue_operation"): + self.queue_operation = self.scheduler.queue_operation def send_message(self, message: MessageItem, callback: Callable): if not self.scheduler: diff --git a/src/queue_sqlite/scheduler/_async/async_listen_data_scheduler.py b/src/queue_sqlite/scheduler/_async/async_listen_data_scheduler.py index a758a46..a424c40 100644 --- a/src/queue_sqlite/scheduler/_async/async_listen_data_scheduler.py +++ b/src/queue_sqlite/scheduler/_async/async_listen_data_scheduler.py @@ -24,6 +24,8 @@ class AsyncListenDataScheduler: self.is_running = False self.thread_num = multiprocessing.cpu_count() self.listen_thread = None + self.process_lock = threading.Lock() + self.last_processed_id = 0 async def _process_listen_data(self, key, value, delete_id): listen_function = ListenMounter.get_Listener_function(key) @@ -37,7 +39,7 @@ class AsyncListenDataScheduler: max_workers=self.thread_num ) as executor: await loop.run_in_executor(executor, listen_function, value) - listen_function(value) + # listen_function(value) except Exception as e: ValueError(f"Error in {key} listener function: {e}") finally: @@ -48,17 +50,27 @@ class AsyncListenDataScheduler: while self.is_running: status, change_data_items = self.listen_operation.listen_data() tasks = [] - if status: - for data in change_data_items: + if status and isinstance(change_data_items, list): + new_change_data_items = [ + data + for data in change_data_items + if data[0] > self.last_processed_id + ] + # 按ID升序排序,确保按顺序处理 + new_change_data_items.sort(key=lambda x: x[0]) + for data in new_change_data_items: key = data[3] new_value = data[5] delete_id = data[0] + with self.process_lock: + if delete_id > self.last_processed_id: + self.last_processed_id = delete_id tasks.append( asyncio.create_task( self._process_listen_data(key, new_value, delete_id) ) ) - if tasks: + if tasks and self.is_running: await asyncio.gather(*tasks, return_exceptions=True) else: @@ -79,6 +91,7 @@ class AsyncListenDataScheduler: if self.is_running: return self.is_running = True + self.last_processed_id = 0 self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True) self.listen_thread.start() diff --git a/src/queue_sqlite/scheduler/base/base_scheduler.py b/src/queue_sqlite/scheduler/base/base_scheduler.py index 1bdf57f..df81e84 100644 --- a/src/queue_sqlite/scheduler/base/base_scheduler.py +++ b/src/queue_sqlite/scheduler/base/base_scheduler.py @@ -2,16 +2,29 @@ from abc import ABC, abstractmethod from typing import Callable from ...model import MessageItem, SchedulerConfig from queue_sqlite_core import ShardedQueueOperation +import logging class BaseScheduler(ABC): """调度器抽象类""" - @property - def queue_operation(self) -> ShardedQueueOperation: ... - def __init__(self, config: SchedulerConfig = SchedulerConfig()): - pass + self._queue_operation = None + + @property + def queue_operation(self) -> ShardedQueueOperation: + if self._queue_operation is None: + raise RuntimeError("请先设置队列操作对象") + return self._queue_operation + + @queue_operation.setter + def queue_operation(self, value: ShardedQueueOperation | None): + """设置队列操作对象 + + Args: + value (ShardedQueueOperation): 队列操作对象 + """ + self._queue_operation = value @abstractmethod def send_message(self, message: MessageItem, callback: Callable): @@ -47,3 +60,14 @@ class BaseScheduler(ABC): def get_listen_data(self, key): """获取单个监听数据""" pass + + def get_queue_info(self) -> dict: + try: + return { + "queue_length": self.queue_operation.get_queue_length(), + "shard_num": self.queue_operation.shard_num, + "db_dir": self.queue_operation.db_dir, + } + except Exception as e: + logging.error(f"获取队列消息失败: {str(e)}") + return {} diff --git a/src/queue_sqlite/scheduler/cleanup_scheduler.py b/src/queue_sqlite/scheduler/cleanup_scheduler.py index 925b47c..de48203 100644 --- a/src/queue_sqlite/scheduler/cleanup_scheduler.py +++ b/src/queue_sqlite/scheduler/cleanup_scheduler.py @@ -65,7 +65,7 @@ class CleanupScheduler: if remove_days > 1: remove_days = remove_days - 1 - self.queue_operation.clean_old_messages(remove_days) + self.queue_operation.remove_expired_messages(remove_days) except Exception as e: logging.error(f"数据库优化错误: {str(e)}") diff --git a/src/queue_sqlite/scheduler/qt/listen_scheduler.py b/src/queue_sqlite/scheduler/qt/listen_scheduler.py index 28e286e..fed85ff 100644 --- a/src/queue_sqlite/scheduler/qt/listen_scheduler.py +++ b/src/queue_sqlite/scheduler/qt/listen_scheduler.py @@ -58,11 +58,21 @@ class QtListenScheduler: old_value TEXT, new_value TEXT, is_delete integer DEFAULT 0, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + is_processed integer DEFAULT 0 ) """ ) conn.commit() + else: + # 如果表存在,检查是否已有 is_processed 列 + cursor = conn.execute("PRAGMA table_info(change_log)") + columns = [row[1] for row in cursor.fetchall()] + if "is_processed" not in columns: + conn.execute( + "ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0" + ) + conn.commit() except Exception as e: logging.error(f"确保监听表存在失败: {str(e)}") @@ -73,8 +83,13 @@ class QtListenScheduler: try: # 使用互斥锁保护数据库操作 with QMutexLocker(self.db_mutex): - status, change_data_items = self.listen_operation.listen_data() - return status, change_data_items + conn = self.listen_operation._get_connection() + cursor = conn.execute( + "SELECT id, table_name, row_id, column_name, old_value, new_value, is_delete, timestamp " + "FROM change_log WHERE is_processed = 0 ORDER BY id ASC" + ) + change_data_items = cursor.fetchall() + return True, change_data_items except sqlite3.OperationalError as e: if "database is locked" in str(e) and attempt < self.max_retries - 1: logging.warning( @@ -90,6 +105,28 @@ class QtListenScheduler: return False, [] + def _mark_processed(self, log_id): + """标记指定ID的日志为已处理""" + try: + with QMutexLocker(self.db_mutex): + conn = self.listen_operation._get_connection() + # 添加 is_processed 字段来标记是否已经处理 + # 首先检查表结构是否包含该字段,如果没有则添加 + cursor = conn.execute("PRAGMA table_info(change_log)") + columns = [row[1] for row in cursor.fetchall()] + if "is_processed" not in columns: + conn.execute( + "ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0" + ) + + # 更新指定ID的记录为已处理 + conn.execute( + "UPDATE change_log SET is_processed = 1 WHERE id = ?", (log_id,) + ) + conn.commit() + except Exception as e: + logging.error(f"标记已处理记录失败: {str(e)}") + def _process_listen_data(self, change_data_items): """处理监听数据""" processed_count = 0 @@ -115,6 +152,7 @@ class QtListenScheduler: self.retry_delay, ) self.thread_pool.start(task) # type: ignore + self._mark_processed(int(delete_id)) processed_count += 1 except Exception as e: diff --git a/src/queue_sqlite/scheduler/standard/listen_data_scheduler.py b/src/queue_sqlite/scheduler/standard/listen_data_scheduler.py index e15bc69..79b22a8 100644 --- a/src/queue_sqlite/scheduler/standard/listen_data_scheduler.py +++ b/src/queue_sqlite/scheduler/standard/listen_data_scheduler.py @@ -14,6 +14,8 @@ from concurrent.futures import ThreadPoolExecutor from ...mounter.listen_mounter import ListenMounter import threading import multiprocessing +import time +import logging class ListenDataScheduler: @@ -22,6 +24,10 @@ class ListenDataScheduler: self.is_running = False self.executor = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) self.listen_thread = None + # 使用一个线程锁来确保监听逻辑的原子性 + self.process_lock = threading.Lock() + # 记录最后处理的ID,避免重复处理 + self.last_processed_id = 0 def _process_listen_data(self, key, value, delete_id): listen_function = ListenMounter.get_Listener_function(key) @@ -31,25 +37,51 @@ class ListenDataScheduler: except Exception as e: ValueError(f"Error in {key} listener function: {e}") finally: - self.listen_operation.delete_change_log(delete_id=delete_id) + # 确保变更日志被删除 + try: + self.listen_operation.delete_change_log(delete_id=delete_id) + except Exception as e: + # 记录错误但不抛出异常 + logging.error(f"Error in deleting change log: {e}") def listen(self): while self.is_running: + # 获取比上次处理ID更大的变更记录,避免重复处理 status, change_data_items = self.listen_operation.listen_data() - if status: - for data in change_data_items: + if status and isinstance(change_data_items, list): + # 过滤出ID大于上次处理ID的记录 + new_change_data_items = [ + data + for data in change_data_items + if data[0] > self.last_processed_id + ] + # 按ID升序排序,确保按顺序处理 + new_change_data_items.sort(key=lambda x: x[0]) + + for data in new_change_data_items: key = data[3] new_value = data[5] delete_id = data[0] + if self.is_running: + # 提交任务处理 self.executor.submit( self._process_listen_data, key, new_value, delete_id ) + # 更新最后处理ID + with self.process_lock: + if delete_id > self.last_processed_id: + self.last_processed_id = delete_id + + # 添加小延迟,避免过度占用CPU + time.sleep(0.001) + def start_listen_data(self): if self.is_running: return self.is_running = True + self.last_processed_id = 0 # 重置最后处理ID self.listen_thread = threading.Thread(target=self.listen) self.listen_thread.start() diff --git a/src/queue_sqlite_core/src/queue_operation.rs b/src/queue_sqlite_core/src/queue_operation.rs index 3f8dd16..c4c2e99 100644 --- a/src/queue_sqlite_core/src/queue_operation.rs +++ b/src/queue_sqlite_core/src/queue_operation.rs @@ -677,6 +677,8 @@ impl QueueOperation { ) .map_err(|e| PyErr::new::(e.to_string()))?; + conn.execute_batch("VACUUM") + .map_err(|e| PyErr::new::(e.to_string()))?; Ok(()) } } diff --git a/tests/test_listen.py b/tests/test_listen.py index 9428318..2645fd9 100644 --- a/tests/test_listen.py +++ b/tests/test_listen.py @@ -5,7 +5,7 @@ import time class TestListen: def test_listen_data(self): - scheduler = QueueScheduler() + scheduler = QueueScheduler(scheduler_type="qt") scheduler.start() scheduler.update_listen_data("key_1", "value_1") time.sleep(0.001) @@ -16,5 +16,8 @@ class TestListen: scheduler.update_listen_data("key_1", "value_4") time.sleep(0.001) scheduler.stop() - print(scheduler.get_listen_datas()) - print(scheduler.get_listen_data("key_1")) + # print(scheduler.get_listen_datas()) + # print(scheduler.get_listen_data("key_1")) + + +# TestListen().test_listen_data() diff --git a/uv.lock b/uv.lock index 646d16f..59871ec 100644 --- a/uv.lock +++ b/uv.lock @@ -648,7 +648,7 @@ wheels = [ [[package]] name = "queue-sqlite" -version = "0.2.1" +version = "0.2.2" source = { editable = "." } dependencies = [ { name = "queue-sqlite-core" },