From 82dc57eae191d9fe83ca890ea6af4ce6f4c8f0df Mon Sep 17 00:00:00 2001 From: chakcy <947105045@qq.com> Date: Mon, 18 Aug 2025 20:57:31 +0800 Subject: [PATCH] =?UTF-8?q?modify:=20=E4=BC=98=E5=8C=96=E7=9B=91=E5=90=AC?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../queue_operation/listen_operation.py | 71 ++++++++++++++----- src/queue_sqlite/scheduler/__init__.py | 6 ++ .../scheduler/listen_data_scheduler.py | 17 +++-- tests/test_listen.py | 3 +- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/src/queue_sqlite/queue_operation/listen_operation.py b/src/queue_sqlite/queue_operation/listen_operation.py index 984278c..7004dab 100644 --- a/src/queue_sqlite/queue_operation/listen_operation.py +++ b/src/queue_sqlite/queue_operation/listen_operation.py @@ -1,5 +1,6 @@ from ..mounter.listen_mounter import ListenMounter import sqlite3 +from typing import List, Tuple, Union class ListenOperation: @@ -15,14 +16,32 @@ class ListenOperation: CREATE TABLE IF NOT EXISTS listen_table ( id INTEGER PRIMARY KEY AUTOINCREMENT, key Text, - value Text + value JSON ); + CREATE TABLE IF NOT EXISTS change_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT, + row_id INTEGER, + column_name TEXT, + old_value TEXT, + new_value TEXT, + is_delete integer DEFAULT 0, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE TRIGGER IF NOT EXISTS track_value_change + AFTER UPDATE OF value ON listen_table -- 监听特定列 + FOR EACH ROW + WHEN OLD.value <> NEW.value -- 仅当值实际变化时触发 + BEGIN + INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value) + VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key); + END; """ self.conn = sqlite3.connect( self.db_dir, check_same_thread = False, ) - self.conn.execute(sql) + self.conn.executescript(sql) self.conn.execute("PRAGMA journal_mode=WAL;") self.conn.execute("PRAGMA synchronous=NORMAL;") self.conn.execute("PRAGMA cache_size=-20000;") @@ -32,11 +51,11 @@ class ListenOperation: self.conn.commit() # 删除原有数据 - sql = f""" - DELETE FROM listen_table - """ - self.conn.execute(sql) - self.conn.commit() + # sql = f""" + # DELETE FROM listen_table + # """ + # self.conn.execute(sql) + # self.conn.commit() for listen_field in self.listen_fields: sql = f""" @@ -48,18 +67,21 @@ class ListenOperation: self.conn.execute(sql, (listen_field, "null")) self.conn.commit() - def listen_data(self, key, value) -> tuple[bool, str]: + def listen_data(self) -> Tuple[bool, Union[List[Tuple], str]]: sql = f""" - Select value from listen_table where key = '{key}' + SELECT * FROM change_log where is_delete = 0 ORDER BY id DESC LIMIT 100 """ - cursor = self.conn.execute(sql) - result = cursor.fetchone() - if result is None: - return False, "key not found" - if result[0] == value: - return False, "value not changed" - else: - return True, result[0] + result = self.conn.execute(sql).fetchall() + if len(result) == 0: + return False, "No data found" + return True, result + + + def delete_change_log(self, delete_id): + sql = f""" + DELETE FROM change_log WHERE id = {delete_id} + """ + self.conn.execute(sql) def update_listen_data(self, key, value): sql = f""" @@ -68,3 +90,18 @@ class ListenOperation: self.conn.execute(sql) self.conn.commit() + def get_value(self, key): + sql = f""" + SELECT value FROM listen_table WHERE key = '{key}' + """ + result = self.conn.execute(sql).fetchone() + if result is None: + return None + return result[0] + + def get_values(self): + sql = f""" + SELECT key, value FROM listen_table + """ + result = self.conn.execute(sql).fetchall() + return result diff --git a/src/queue_sqlite/scheduler/__init__.py b/src/queue_sqlite/scheduler/__init__.py index 71f369d..13bf376 100644 --- a/src/queue_sqlite/scheduler/__init__.py +++ b/src/queue_sqlite/scheduler/__init__.py @@ -25,6 +25,12 @@ class QueueScheduler: def update_listen_data(self, key, value): self.listen_operation.update_listen_data(key, value) + def get_listen_datas(self): + return self.listen_operation.get_values() + + def get_listen_data(self, key): + return self.listen_operation.get_value(key) + def start_queue_scheduler(self): self.receive_scheduler.start_receive_thread() self.task_scheduler.start_task_thread() diff --git a/src/queue_sqlite/scheduler/listen_data_scheduler.py b/src/queue_sqlite/scheduler/listen_data_scheduler.py index b10ce8f..173a965 100644 --- a/src/queue_sqlite/scheduler/listen_data_scheduler.py +++ b/src/queue_sqlite/scheduler/listen_data_scheduler.py @@ -10,23 +10,26 @@ class ListenDataScheduler: self.is_running = False self.executor = ThreadPoolExecutor(max_workers=1) self.listen_thread = None - self.listen_data = {field: "null" for field in ListenMounter.get_Listener_list()} - def _process_listen_data(self, key, value): + def _process_listen_data(self, key, value, delete_id): listen_function = ListenMounter.get_Listener_function(key) if listen_function: try: listen_function(value) except Exception as e: ValueError(f"Error in {key} listener function: {e}") + finally: + self.listen_operation.delete_change_log(delete_id=delete_id) def listen(self): while self.is_running: - for key, value in self.listen_data.items(): - status, new_value = self.listen_operation.listen_data(key, value) - if status: - self.listen_data[key] = new_value - self.executor.submit(self._process_listen_data, key, new_value) + status, change_data_items = self.listen_operation.listen_data() + if status: + for data in change_data_items: + key = data[6] + new_value = data[7] + delete_id = data[0] + self.executor.submit(self._process_listen_data, key, new_value, delete_id) def start_listen_data(self): if self.is_running: diff --git a/tests/test_listen.py b/tests/test_listen.py index f31f20a..915c30f 100644 --- a/tests/test_listen.py +++ b/tests/test_listen.py @@ -16,4 +16,5 @@ class TestListen: scheduler.update_listen_data("key_1", "value_4") time.sleep(0.001) scheduler.stop_queue_scheduler() - print(scheduler.listen_scheduler.listen_data) + print(scheduler.get_listen_datas()) + print(scheduler.get_listen_data("key_1"))