diff --git a/src/queue_sqlite/model/message_item.py b/src/queue_sqlite/model/message_item.py index 0b24dc1..0cfae71 100644 --- a/src/queue_sqlite/model/message_item.py +++ b/src/queue_sqlite/model/message_item.py @@ -88,6 +88,16 @@ class MessageItem: else: raise ValueError(f"请使用 'str' 或 'datetime' 类型设置 {field} 字段") + if 'result' in data: + if data['result'] == 'null': + data['result'] = {} + elif isinstance(data['result'], str): + data['result'] = json.loads(data['result']) + + if 'tags' in data: + if data['tags'] == 'null': + data['tags'] = None + # 处理枚举字段 if 'type' in data: if isinstance(data['type'], str): diff --git a/src/queue_sqlite/scheduler/receive_scheduler.py b/src/queue_sqlite/scheduler/receive_scheduler.py index 1a54058..c8438a8 100644 --- a/src/queue_sqlite/scheduler/receive_scheduler.py +++ b/src/queue_sqlite/scheduler/receive_scheduler.py @@ -5,6 +5,7 @@ from concurrent.futures import ThreadPoolExecutor import threading import time + class ReceiveScheduler: def __init__(self, queue_operation: QueueOperation, receive_thread_num: int = 1): self.callbacks = dict() @@ -31,13 +32,14 @@ class ReceiveScheduler: if message_list: for message in message_list: message = MessageItem.from_dict(message) - callback = None + callback_default = lambda message: message with self.lock: callback = self.callbacks.pop(message.id, None) + if callback is None: + callback = callback_default - if callback: - # 使用线程池并行执行回调 - self.executor.submit(self._safe_callback, callback, message) + # 使用线程池并行执行回调 + self.executor.submit(self._safe_callback, callback, message) else: time.sleep(0.1) # 适当休眠避免CPU空转 diff --git a/src/queue_sqlite/task_cycle/task_cycle.py b/src/queue_sqlite/task_cycle/task_cycle.py index 7cff5d0..d137de2 100644 --- a/src/queue_sqlite/task_cycle/task_cycle.py +++ b/src/queue_sqlite/task_cycle/task_cycle.py @@ -1,6 +1,7 @@ from ..model import MessageItem from typing import Callable, Optional from ..constant import MessageStatus +import json class TaskCycle: def __init__(self, message_item: MessageItem, callback: Optional[Callable]): @@ -23,7 +24,24 @@ class TaskCycle: self.task_error = None def get_task_result(self): - return self.task_result + if isinstance(self.task_result, (dict, list)): + try: + return json.dumps(self.task_result) + except: + return json.dumps({'result': str(self.task_result)}) + + elif isinstance(self.task_result, str): + try: + json.loads(self.task_result) + return self.task_result + except: + return json.dumps({'result': self.task_result}) + elif isinstance(self.task_result, (int, float, bool)): + return json.dumps({'result': self.task_result}) + elif self.task_result is None: + return 'null' + else: + return json.dumps({'result': str(self.task_result)}) def get_task_status(self): return self.task_status