modify: 优化监听数据功能
This commit is contained in:
parent
3f03290e4f
commit
82dc57eae1
@ -1,5 +1,6 @@
|
||||
from ..mounter.listen_mounter import ListenMounter
|
||||
import sqlite3
|
||||
from typing import List, Tuple, Union
|
||||
|
||||
|
||||
class ListenOperation:
|
||||
@ -15,14 +16,32 @@ class ListenOperation:
|
||||
CREATE TABLE IF NOT EXISTS listen_table (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
key Text,
|
||||
value Text
|
||||
value JSON
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS change_log (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
table_name TEXT,
|
||||
row_id INTEGER,
|
||||
column_name TEXT,
|
||||
old_value TEXT,
|
||||
new_value TEXT,
|
||||
is_delete integer DEFAULT 0,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
CREATE TRIGGER IF NOT EXISTS track_value_change
|
||||
AFTER UPDATE OF value ON listen_table -- 监听特定列
|
||||
FOR EACH ROW
|
||||
WHEN OLD.value <> NEW.value -- 仅当值实际变化时触发
|
||||
BEGIN
|
||||
INSERT INTO change_log (table_name, row_id, column_name, old_value, new_value)
|
||||
VALUES ('listen_table', NEW.id, 'key', OLD.key, NEW.key);
|
||||
END;
|
||||
"""
|
||||
self.conn = sqlite3.connect(
|
||||
self.db_dir,
|
||||
check_same_thread = False,
|
||||
)
|
||||
self.conn.execute(sql)
|
||||
self.conn.executescript(sql)
|
||||
self.conn.execute("PRAGMA journal_mode=WAL;")
|
||||
self.conn.execute("PRAGMA synchronous=NORMAL;")
|
||||
self.conn.execute("PRAGMA cache_size=-20000;")
|
||||
@ -32,11 +51,11 @@ class ListenOperation:
|
||||
self.conn.commit()
|
||||
|
||||
# 删除原有数据
|
||||
sql = f"""
|
||||
DELETE FROM listen_table
|
||||
"""
|
||||
self.conn.execute(sql)
|
||||
self.conn.commit()
|
||||
# sql = f"""
|
||||
# DELETE FROM listen_table
|
||||
# """
|
||||
# self.conn.execute(sql)
|
||||
# self.conn.commit()
|
||||
|
||||
for listen_field in self.listen_fields:
|
||||
sql = f"""
|
||||
@ -48,18 +67,21 @@ class ListenOperation:
|
||||
self.conn.execute(sql, (listen_field, "null"))
|
||||
self.conn.commit()
|
||||
|
||||
def listen_data(self, key, value) -> tuple[bool, str]:
|
||||
def listen_data(self) -> Tuple[bool, Union[List[Tuple], str]]:
|
||||
sql = f"""
|
||||
Select value from listen_table where key = '{key}'
|
||||
SELECT * FROM change_log where is_delete = 0 ORDER BY id DESC LIMIT 100
|
||||
"""
|
||||
cursor = self.conn.execute(sql)
|
||||
result = cursor.fetchone()
|
||||
if result is None:
|
||||
return False, "key not found"
|
||||
if result[0] == value:
|
||||
return False, "value not changed"
|
||||
else:
|
||||
return True, result[0]
|
||||
result = self.conn.execute(sql).fetchall()
|
||||
if len(result) == 0:
|
||||
return False, "No data found"
|
||||
return True, result
|
||||
|
||||
|
||||
def delete_change_log(self, delete_id):
|
||||
sql = f"""
|
||||
DELETE FROM change_log WHERE id = {delete_id}
|
||||
"""
|
||||
self.conn.execute(sql)
|
||||
|
||||
def update_listen_data(self, key, value):
|
||||
sql = f"""
|
||||
@ -68,3 +90,18 @@ class ListenOperation:
|
||||
self.conn.execute(sql)
|
||||
self.conn.commit()
|
||||
|
||||
def get_value(self, key):
|
||||
sql = f"""
|
||||
SELECT value FROM listen_table WHERE key = '{key}'
|
||||
"""
|
||||
result = self.conn.execute(sql).fetchone()
|
||||
if result is None:
|
||||
return None
|
||||
return result[0]
|
||||
|
||||
def get_values(self):
|
||||
sql = f"""
|
||||
SELECT key, value FROM listen_table
|
||||
"""
|
||||
result = self.conn.execute(sql).fetchall()
|
||||
return result
|
||||
|
||||
@ -25,6 +25,12 @@ class QueueScheduler:
|
||||
def update_listen_data(self, key, value):
|
||||
self.listen_operation.update_listen_data(key, value)
|
||||
|
||||
def get_listen_datas(self):
|
||||
return self.listen_operation.get_values()
|
||||
|
||||
def get_listen_data(self, key):
|
||||
return self.listen_operation.get_value(key)
|
||||
|
||||
def start_queue_scheduler(self):
|
||||
self.receive_scheduler.start_receive_thread()
|
||||
self.task_scheduler.start_task_thread()
|
||||
|
||||
@ -10,23 +10,26 @@ class ListenDataScheduler:
|
||||
self.is_running = False
|
||||
self.executor = ThreadPoolExecutor(max_workers=1)
|
||||
self.listen_thread = None
|
||||
self.listen_data = {field: "null" for field in ListenMounter.get_Listener_list()}
|
||||
|
||||
def _process_listen_data(self, key, value):
|
||||
def _process_listen_data(self, key, value, delete_id):
|
||||
listen_function = ListenMounter.get_Listener_function(key)
|
||||
if listen_function:
|
||||
try:
|
||||
listen_function(value)
|
||||
except Exception as e:
|
||||
ValueError(f"Error in {key} listener function: {e}")
|
||||
finally:
|
||||
self.listen_operation.delete_change_log(delete_id=delete_id)
|
||||
|
||||
def listen(self):
|
||||
while self.is_running:
|
||||
for key, value in self.listen_data.items():
|
||||
status, new_value = self.listen_operation.listen_data(key, value)
|
||||
if status:
|
||||
self.listen_data[key] = new_value
|
||||
self.executor.submit(self._process_listen_data, key, new_value)
|
||||
status, change_data_items = self.listen_operation.listen_data()
|
||||
if status:
|
||||
for data in change_data_items:
|
||||
key = data[6]
|
||||
new_value = data[7]
|
||||
delete_id = data[0]
|
||||
self.executor.submit(self._process_listen_data, key, new_value, delete_id)
|
||||
|
||||
def start_listen_data(self):
|
||||
if self.is_running:
|
||||
|
||||
@ -16,4 +16,5 @@ class TestListen:
|
||||
scheduler.update_listen_data("key_1", "value_4")
|
||||
time.sleep(0.001)
|
||||
scheduler.stop_queue_scheduler()
|
||||
print(scheduler.listen_scheduler.listen_data)
|
||||
print(scheduler.get_listen_datas())
|
||||
print(scheduler.get_listen_data("key_1"))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user