From 80cde8aff37e7e14669f85c23d30503bbe60a473 Mon Sep 17 00:00:00 2001 From: chakcy <947105045@qq.com> Date: Sat, 13 Sep 2025 19:02:40 +0800 Subject: [PATCH] =?UTF-8?q?modify:=20=E8=A7=A3=E5=86=B3=20message=5Fitem?= =?UTF-8?q?=20result=20=E5=AD=97=E6=AE=B5=E9=87=8D=E5=A4=8D=E5=BA=8F?= =?UTF-8?q?=E5=88=97=E5=8C=96=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../queue_operation/queue_operation.py | 19 ++++---- src/queue_sqlite/scheduler/task_scheduler.py | 30 ++++++------ tests/test_message_item.py | 47 ++++++++++++++++++- 3 files changed, 70 insertions(+), 26 deletions(-) diff --git a/src/queue_sqlite/queue_operation/queue_operation.py b/src/queue_sqlite/queue_operation/queue_operation.py index 1604384..19a070d 100644 --- a/src/queue_sqlite/queue_operation/queue_operation.py +++ b/src/queue_sqlite/queue_operation/queue_operation.py @@ -8,11 +8,11 @@ from ..core import core from ..constant import MessageStatus -class QueueOperation(): +class QueueOperation: def __init__(self, shard_num: int = 4, queue_name: str = "default"): self.shard_num = shard_num self.db_dir = os.path.join("cache", queue_name) - self.shard_connections = threading.local() + self.shard_connections = threading.local() if not os.path.exists(self.db_dir): os.makedirs(self.db_dir) self.init_shards() @@ -62,7 +62,7 @@ class QueueOperation(): for shard_index in shard_order: if collected >= size: break - + conn = self._get_shard_conn(shard_index) shard_messages = conn.dequeue(size - collected) messages.extend(shard_messages) @@ -73,7 +73,7 @@ class QueueOperation(): # 获取队列长度 def get_queue_length(self) -> int: """获取队列中待处理消息的数量 - + Returns: int: 待处理消息数量 """ @@ -82,7 +82,7 @@ class QueueOperation(): conn = self._get_shard_conn(i) total += conn.get_queue_length() return total - + # 获取完成/失败的消息 def get_completed_messages(self) -> List[dict]: messages = [] @@ -96,16 +96,16 @@ class QueueOperation(): def get_result(self, message_id: str) -> Tuple[bool, Union[str, dict]]: conn = self._get_shard_conn(self._get_shard_index(message_id)) return conn.get_result(message_id) - + # 更新消息状态 def update_status(self, message_id: str, status: MessageStatus): conn = self._get_shard_conn(self._get_shard_index(message_id)) conn.update_status(message_id, status) - + # 更新消息结果 - def update_result(self, message_id: str, result: dict): + def update_result(self, message_id: str, result: str): conn = self._get_shard_conn(self._get_shard_index(message_id)) - conn.update_result(message_id, json.dumps(result)) + conn.update_result(message_id, result) # 删除消息 def delete_message(self, message_id: str): @@ -121,4 +121,3 @@ class QueueOperation(): def clean_expired_messages(self, shard_index: int): conn = self._get_shard_conn(shard_index) conn.clean_expired_messages() - \ No newline at end of file diff --git a/src/queue_sqlite/scheduler/task_scheduler.py b/src/queue_sqlite/scheduler/task_scheduler.py index 03d7fc4..12e1da9 100644 --- a/src/queue_sqlite/scheduler/task_scheduler.py +++ b/src/queue_sqlite/scheduler/task_scheduler.py @@ -2,7 +2,7 @@ from ..queue_operation import QueueOperation from ..model import MessageItem from ..constant import MessageStatus from ..cycle.task_cycle import TaskCycle -from ..mounter.task_mounter import TaskMounter +from ..mounter.task_mounter import TaskMounter from concurrent.futures import ThreadPoolExecutor from datetime import datetime import time @@ -25,22 +25,22 @@ class TaskScheduler: message.result = {"result": "success"} message.updatetime = datetime.now() return message - + task_function = TaskMounter.get_task_function(message.destination) task_cycle = TaskCycle(message, task_function) task_cycle.run() - - message.status = task_cycle.get_task_status() # type: ignore + + message.status = task_cycle.get_task_status() # type: ignore if message.status == MessageStatus.FAILED: message.result = {"error": task_cycle.get_task_error()} else: - message.result = task_cycle.get_task_result() # type: ignore + message.result = task_cycle.get_task_result() # type: ignore message.updatetime = datetime.now() except Exception as e: message.status = MessageStatus.FAILED message.result = {"error": str(e)} message.updatetime = datetime.now() - + return message def _update_result(self, message): @@ -55,24 +55,26 @@ class TaskScheduler: """单一轮询线程,并行执行任务""" while self.is_running: try: - message_list = self.queue_operation.dequeue(size=self.task_thread_num * 2) + message_list = self.queue_operation.dequeue( + size=self.task_thread_num * 2 + ) if message_list: # 并行处理所有获取到的消息 for message in message_list: self.executor.submit( - lambda m: self._update_result(self._process_message(m)), - MessageItem.from_dict(message) + lambda m: self._update_result(self._process_message(m)), + MessageItem.from_dict(message), ) else: time.sleep(0.1) # 适当休眠 except Exception as e: print(f"任务调度错误: {str(e)}") time.sleep(1) - + def start_task_thread(self): if self.is_running: return - + self.is_running = True # 创建单一轮询线程 self.task_thread = threading.Thread(target=self.task_callback, daemon=True) @@ -81,11 +83,11 @@ class TaskScheduler: def stop_task_thread(self): if not self.is_running: return - + self.is_running = False # 等待轮询线程结束 if self.task_thread and self.task_thread.is_alive(): self.task_thread.join(timeout=2.0) - + # 关闭线程池 - self.executor.shutdown(wait=True) \ No newline at end of file + self.executor.shutdown(wait=True) diff --git a/tests/test_message_item.py b/tests/test_message_item.py index 7663c6e..c3f8385 100644 --- a/tests/test_message_item.py +++ b/tests/test_message_item.py @@ -1,9 +1,10 @@ from queue_sqlite.model import MessageItem +import json class TestMessageItem: messageItem = MessageItem(content={"num": 1}, destination="test") - + @classmethod def test_to_dict(cls): print(cls.messageItem.to_dict()) @@ -15,4 +16,46 @@ class TestMessageItem: @classmethod def test_from_dict(cls): messageItem = MessageItem.from_dict(cls.messageItem.to_dict()) - print(messageItem.to_dict()) \ No newline at end of file + data = { + "id": "ddb66277-503c-4921-8e7f-5091eace49e3", + "type": "task", + "status": 2, + "content": '{"num": 1}', + "createtime": "2025-09-12T18:41:08.221531", + "updatetime": "2025-09-12T18:41:08.221531", + "result": '"{\\"message\\": \\"\\\\u6d4b\\\\u8bd5\\\\u6210\\\\u529f\\"}"', + "priority": 1, + "source": "client", + "destination": "test", + "retry_count": 0, + "expire_time": "null", + "tags": "null", + "metadata": "{}", + } + print(messageItem.to_dict()) + print(MessageItem.from_dict(data).to_dict()) + + @classmethod + def test_from_json(cls): + json_str = """ + { + "content": { + "num": 1 + }, + "createtime": "2025-09-12T16:42:50.663248", + "destination": "test", + "expire_time": null, + "id": "b83c3d72-0b06-4c34-ab4f-32d696aa3875", + "metadata": {}, + "priority": 1, + "result": "{\"message\": \"\\u6d4b\\u8bd5\\u6210\\u529f\"}", + "retry_count": 0, + "source": "client", + "status": 2, + "tags": null, + "type": "task", + "updatetime": "2025-09-12T16:42:50.663248" + } + """ + messageItem = MessageItem.from_dict(json.loads(json_str)) + print(messageItem.to_json())