From 32e22aee187887a94451284566570b64b9df3b50 Mon Sep 17 00:00:00 2001 From: chakcy <947105045@qq.com> Date: Mon, 11 Aug 2025 14:56:25 +0800 Subject: [PATCH] =?UTF-8?q?modify:=20message=5Fitem=20result=20=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=BA=8F=E5=88=97=E5=8C=96=E5=A2=9E=E5=BC=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/queue_sqlite/model/message_item.py | 10 ++++++++++ .../scheduler/receive_scheduler.py | 10 ++++++---- src/queue_sqlite/task_cycle/task_cycle.py | 20 ++++++++++++++++++- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/queue_sqlite/model/message_item.py b/src/queue_sqlite/model/message_item.py index 0b24dc1..0cfae71 100644 --- a/src/queue_sqlite/model/message_item.py +++ b/src/queue_sqlite/model/message_item.py @@ -88,6 +88,16 @@ class MessageItem: else: raise ValueError(f"请使用 'str' 或 'datetime' 类型设置 {field} 字段") + if 'result' in data: + if data['result'] == 'null': + data['result'] = {} + elif isinstance(data['result'], str): + data['result'] = json.loads(data['result']) + + if 'tags' in data: + if data['tags'] == 'null': + data['tags'] = None + # 处理枚举字段 if 'type' in data: if isinstance(data['type'], str): diff --git a/src/queue_sqlite/scheduler/receive_scheduler.py b/src/queue_sqlite/scheduler/receive_scheduler.py index 1a54058..c8438a8 100644 --- a/src/queue_sqlite/scheduler/receive_scheduler.py +++ b/src/queue_sqlite/scheduler/receive_scheduler.py @@ -5,6 +5,7 @@ from concurrent.futures import ThreadPoolExecutor import threading import time + class ReceiveScheduler: def __init__(self, queue_operation: QueueOperation, receive_thread_num: int = 1): self.callbacks = dict() @@ -31,13 +32,14 @@ class ReceiveScheduler: if message_list: for message in message_list: message = MessageItem.from_dict(message) - callback = None + callback_default = lambda message: message with self.lock: callback = self.callbacks.pop(message.id, None) + if callback is None: + callback = callback_default - if callback: - # 使用线程池并行执行回调 - self.executor.submit(self._safe_callback, callback, message) + # 使用线程池并行执行回调 + self.executor.submit(self._safe_callback, callback, message) else: time.sleep(0.1) # 适当休眠避免CPU空转 diff --git a/src/queue_sqlite/task_cycle/task_cycle.py b/src/queue_sqlite/task_cycle/task_cycle.py index 7cff5d0..d137de2 100644 --- a/src/queue_sqlite/task_cycle/task_cycle.py +++ b/src/queue_sqlite/task_cycle/task_cycle.py @@ -1,6 +1,7 @@ from ..model import MessageItem from typing import Callable, Optional from ..constant import MessageStatus +import json class TaskCycle: def __init__(self, message_item: MessageItem, callback: Optional[Callable]): @@ -23,7 +24,24 @@ class TaskCycle: self.task_error = None def get_task_result(self): - return self.task_result + if isinstance(self.task_result, (dict, list)): + try: + return json.dumps(self.task_result) + except: + return json.dumps({'result': str(self.task_result)}) + + elif isinstance(self.task_result, str): + try: + json.loads(self.task_result) + return self.task_result + except: + return json.dumps({'result': self.task_result}) + elif isinstance(self.task_result, (int, float, bool)): + return json.dumps({'result': self.task_result}) + elif self.task_result is None: + return 'null' + else: + return json.dumps({'result': str(self.task_result)}) def get_task_status(self): return self.task_status