modify: listen_scheduler 更新

This commit is contained in:
chakcy 2026-01-10 18:14:20 +08:00
parent 14b3ea707f
commit 793c842bbf
10 changed files with 135 additions and 22 deletions

View File

@ -1 +1 @@
3.13 3.12

View File

@ -41,13 +41,14 @@ class QueueScheduler(BaseScheduler):
scheduler_type: SchedulerType = "standard", scheduler_type: SchedulerType = "standard",
config: SchedulerConfig = SchedulerConfig(), config: SchedulerConfig = SchedulerConfig(),
): ):
super().__init__(config)
scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None) scheduler_class = SCHEDULER_TYPES.get(scheduler_type, None)
if scheduler_class is None: if scheduler_class is None:
raise ValueError(f"Invalid scheduler type: {scheduler_type}") raise ValueError(f"Invalid scheduler type: {scheduler_type}")
self.scheduler = scheduler_class(config) self.scheduler = scheduler_class(config)
# if self.scheduler: if hasattr(self.scheduler, "queue_operation"):
# self.queue_operation = self.scheduler.queue_operation self.queue_operation = self.scheduler.queue_operation
def send_message(self, message: MessageItem, callback: Callable): def send_message(self, message: MessageItem, callback: Callable):
if not self.scheduler: if not self.scheduler:

View File

@ -24,6 +24,8 @@ class AsyncListenDataScheduler:
self.is_running = False self.is_running = False
self.thread_num = multiprocessing.cpu_count() self.thread_num = multiprocessing.cpu_count()
self.listen_thread = None self.listen_thread = None
self.process_lock = threading.Lock()
self.last_processed_id = 0
async def _process_listen_data(self, key, value, delete_id): async def _process_listen_data(self, key, value, delete_id):
listen_function = ListenMounter.get_Listener_function(key) listen_function = ListenMounter.get_Listener_function(key)
@ -37,7 +39,7 @@ class AsyncListenDataScheduler:
max_workers=self.thread_num max_workers=self.thread_num
) as executor: ) as executor:
await loop.run_in_executor(executor, listen_function, value) await loop.run_in_executor(executor, listen_function, value)
listen_function(value) # listen_function(value)
except Exception as e: except Exception as e:
ValueError(f"Error in {key} listener function: {e}") ValueError(f"Error in {key} listener function: {e}")
finally: finally:
@ -48,17 +50,27 @@ class AsyncListenDataScheduler:
while self.is_running: while self.is_running:
status, change_data_items = self.listen_operation.listen_data() status, change_data_items = self.listen_operation.listen_data()
tasks = [] tasks = []
if status: if status and isinstance(change_data_items, list):
for data in change_data_items: new_change_data_items = [
data
for data in change_data_items
if data[0] > self.last_processed_id
]
# 按ID升序排序确保按顺序处理
new_change_data_items.sort(key=lambda x: x[0])
for data in new_change_data_items:
key = data[3] key = data[3]
new_value = data[5] new_value = data[5]
delete_id = data[0] delete_id = data[0]
with self.process_lock:
if delete_id > self.last_processed_id:
self.last_processed_id = delete_id
tasks.append( tasks.append(
asyncio.create_task( asyncio.create_task(
self._process_listen_data(key, new_value, delete_id) self._process_listen_data(key, new_value, delete_id)
) )
) )
if tasks: if tasks and self.is_running:
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
else: else:
@ -79,6 +91,7 @@ class AsyncListenDataScheduler:
if self.is_running: if self.is_running:
return return
self.is_running = True self.is_running = True
self.last_processed_id = 0
self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True) self.listen_thread = threading.Thread(target=self._run_listen_loop, daemon=True)
self.listen_thread.start() self.listen_thread.start()

View File

@ -2,16 +2,29 @@ from abc import ABC, abstractmethod
from typing import Callable from typing import Callable
from ...model import MessageItem, SchedulerConfig from ...model import MessageItem, SchedulerConfig
from queue_sqlite_core import ShardedQueueOperation from queue_sqlite_core import ShardedQueueOperation
import logging
class BaseScheduler(ABC): class BaseScheduler(ABC):
"""调度器抽象类""" """调度器抽象类"""
@property
def queue_operation(self) -> ShardedQueueOperation: ...
def __init__(self, config: SchedulerConfig = SchedulerConfig()): def __init__(self, config: SchedulerConfig = SchedulerConfig()):
pass self._queue_operation = None
@property
def queue_operation(self) -> ShardedQueueOperation:
if self._queue_operation is None:
raise RuntimeError("请先设置队列操作对象")
return self._queue_operation
@queue_operation.setter
def queue_operation(self, value: ShardedQueueOperation | None):
"""设置队列操作对象
Args:
value (ShardedQueueOperation): 队列操作对象
"""
self._queue_operation = value
@abstractmethod @abstractmethod
def send_message(self, message: MessageItem, callback: Callable): def send_message(self, message: MessageItem, callback: Callable):
@ -47,3 +60,14 @@ class BaseScheduler(ABC):
def get_listen_data(self, key): def get_listen_data(self, key):
"""获取单个监听数据""" """获取单个监听数据"""
pass pass
def get_queue_info(self) -> dict:
try:
return {
"queue_length": self.queue_operation.get_queue_length(),
"shard_num": self.queue_operation.shard_num,
"db_dir": self.queue_operation.db_dir,
}
except Exception as e:
logging.error(f"获取队列消息失败: {str(e)}")
return {}

View File

@ -65,7 +65,7 @@ class CleanupScheduler:
if remove_days > 1: if remove_days > 1:
remove_days = remove_days - 1 remove_days = remove_days - 1
self.queue_operation.clean_old_messages(remove_days) self.queue_operation.remove_expired_messages(remove_days)
except Exception as e: except Exception as e:
logging.error(f"数据库优化错误: {str(e)}") logging.error(f"数据库优化错误: {str(e)}")

View File

@ -58,11 +58,21 @@ class QtListenScheduler:
old_value TEXT, old_value TEXT,
new_value TEXT, new_value TEXT,
is_delete integer DEFAULT 0, is_delete integer DEFAULT 0,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
is_processed integer DEFAULT 0
) )
""" """
) )
conn.commit() conn.commit()
else:
# 如果表存在,检查是否已有 is_processed 列
cursor = conn.execute("PRAGMA table_info(change_log)")
columns = [row[1] for row in cursor.fetchall()]
if "is_processed" not in columns:
conn.execute(
"ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0"
)
conn.commit()
except Exception as e: except Exception as e:
logging.error(f"确保监听表存在失败: {str(e)}") logging.error(f"确保监听表存在失败: {str(e)}")
@ -73,8 +83,13 @@ class QtListenScheduler:
try: try:
# 使用互斥锁保护数据库操作 # 使用互斥锁保护数据库操作
with QMutexLocker(self.db_mutex): with QMutexLocker(self.db_mutex):
status, change_data_items = self.listen_operation.listen_data() conn = self.listen_operation._get_connection()
return status, change_data_items cursor = conn.execute(
"SELECT id, table_name, row_id, column_name, old_value, new_value, is_delete, timestamp "
"FROM change_log WHERE is_processed = 0 ORDER BY id ASC"
)
change_data_items = cursor.fetchall()
return True, change_data_items
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < self.max_retries - 1: if "database is locked" in str(e) and attempt < self.max_retries - 1:
logging.warning( logging.warning(
@ -90,6 +105,28 @@ class QtListenScheduler:
return False, [] return False, []
def _mark_processed(self, log_id):
"""标记指定ID的日志为已处理"""
try:
with QMutexLocker(self.db_mutex):
conn = self.listen_operation._get_connection()
# 添加 is_processed 字段来标记是否已经处理
# 首先检查表结构是否包含该字段,如果没有则添加
cursor = conn.execute("PRAGMA table_info(change_log)")
columns = [row[1] for row in cursor.fetchall()]
if "is_processed" not in columns:
conn.execute(
"ALTER TABLE change_log ADD COLUMN is_processed INTEGER DEFAULT 0"
)
# 更新指定ID的记录为已处理
conn.execute(
"UPDATE change_log SET is_processed = 1 WHERE id = ?", (log_id,)
)
conn.commit()
except Exception as e:
logging.error(f"标记已处理记录失败: {str(e)}")
def _process_listen_data(self, change_data_items): def _process_listen_data(self, change_data_items):
"""处理监听数据""" """处理监听数据"""
processed_count = 0 processed_count = 0
@ -115,6 +152,7 @@ class QtListenScheduler:
self.retry_delay, self.retry_delay,
) )
self.thread_pool.start(task) # type: ignore self.thread_pool.start(task) # type: ignore
self._mark_processed(int(delete_id))
processed_count += 1 processed_count += 1
except Exception as e: except Exception as e:

View File

@ -14,6 +14,8 @@ from concurrent.futures import ThreadPoolExecutor
from ...mounter.listen_mounter import ListenMounter from ...mounter.listen_mounter import ListenMounter
import threading import threading
import multiprocessing import multiprocessing
import time
import logging
class ListenDataScheduler: class ListenDataScheduler:
@ -22,6 +24,10 @@ class ListenDataScheduler:
self.is_running = False self.is_running = False
self.executor = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) self.executor = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
self.listen_thread = None self.listen_thread = None
# 使用一个线程锁来确保监听逻辑的原子性
self.process_lock = threading.Lock()
# 记录最后处理的ID避免重复处理
self.last_processed_id = 0
def _process_listen_data(self, key, value, delete_id): def _process_listen_data(self, key, value, delete_id):
listen_function = ListenMounter.get_Listener_function(key) listen_function = ListenMounter.get_Listener_function(key)
@ -31,25 +37,51 @@ class ListenDataScheduler:
except Exception as e: except Exception as e:
ValueError(f"Error in {key} listener function: {e}") ValueError(f"Error in {key} listener function: {e}")
finally: finally:
self.listen_operation.delete_change_log(delete_id=delete_id) # 确保变更日志被删除
try:
self.listen_operation.delete_change_log(delete_id=delete_id)
except Exception as e:
# 记录错误但不抛出异常
logging.error(f"Error in deleting change log: {e}")
def listen(self): def listen(self):
while self.is_running: while self.is_running:
# 获取比上次处理ID更大的变更记录避免重复处理
status, change_data_items = self.listen_operation.listen_data() status, change_data_items = self.listen_operation.listen_data()
if status: if status and isinstance(change_data_items, list):
for data in change_data_items: # 过滤出ID大于上次处理ID的记录
new_change_data_items = [
data
for data in change_data_items
if data[0] > self.last_processed_id
]
# 按ID升序排序确保按顺序处理
new_change_data_items.sort(key=lambda x: x[0])
for data in new_change_data_items:
key = data[3] key = data[3]
new_value = data[5] new_value = data[5]
delete_id = data[0] delete_id = data[0]
if self.is_running: if self.is_running:
# 提交任务处理
self.executor.submit( self.executor.submit(
self._process_listen_data, key, new_value, delete_id self._process_listen_data, key, new_value, delete_id
) )
# 更新最后处理ID
with self.process_lock:
if delete_id > self.last_processed_id:
self.last_processed_id = delete_id
# 添加小延迟避免过度占用CPU
time.sleep(0.001)
def start_listen_data(self): def start_listen_data(self):
if self.is_running: if self.is_running:
return return
self.is_running = True self.is_running = True
self.last_processed_id = 0 # 重置最后处理ID
self.listen_thread = threading.Thread(target=self.listen) self.listen_thread = threading.Thread(target=self.listen)
self.listen_thread.start() self.listen_thread.start()

View File

@ -677,6 +677,8 @@ impl QueueOperation {
) )
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?; .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
conn.execute_batch("VACUUM")
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(()) Ok(())
} }
} }

View File

@ -5,7 +5,7 @@ import time
class TestListen: class TestListen:
def test_listen_data(self): def test_listen_data(self):
scheduler = QueueScheduler() scheduler = QueueScheduler(scheduler_type="qt")
scheduler.start() scheduler.start()
scheduler.update_listen_data("key_1", "value_1") scheduler.update_listen_data("key_1", "value_1")
time.sleep(0.001) time.sleep(0.001)
@ -16,5 +16,8 @@ class TestListen:
scheduler.update_listen_data("key_1", "value_4") scheduler.update_listen_data("key_1", "value_4")
time.sleep(0.001) time.sleep(0.001)
scheduler.stop() scheduler.stop()
print(scheduler.get_listen_datas()) # print(scheduler.get_listen_datas())
print(scheduler.get_listen_data("key_1")) # print(scheduler.get_listen_data("key_1"))
# TestListen().test_listen_data()

2
uv.lock generated
View File

@ -648,7 +648,7 @@ wheels = [
[[package]] [[package]]
name = "queue-sqlite" name = "queue-sqlite"
version = "0.2.1" version = "0.2.2"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "queue-sqlite-core" }, { name = "queue-sqlite-core" },