modify: message_item result 字段序列化增强
This commit is contained in:
parent
38e7cb55fe
commit
32e22aee18
@ -88,6 +88,16 @@ class MessageItem:
|
|||||||
else:
|
else:
|
||||||
raise ValueError(f"请使用 'str' 或 'datetime' 类型设置 {field} 字段")
|
raise ValueError(f"请使用 'str' 或 'datetime' 类型设置 {field} 字段")
|
||||||
|
|
||||||
|
if 'result' in data:
|
||||||
|
if data['result'] == 'null':
|
||||||
|
data['result'] = {}
|
||||||
|
elif isinstance(data['result'], str):
|
||||||
|
data['result'] = json.loads(data['result'])
|
||||||
|
|
||||||
|
if 'tags' in data:
|
||||||
|
if data['tags'] == 'null':
|
||||||
|
data['tags'] = None
|
||||||
|
|
||||||
# 处理枚举字段
|
# 处理枚举字段
|
||||||
if 'type' in data:
|
if 'type' in data:
|
||||||
if isinstance(data['type'], str):
|
if isinstance(data['type'], str):
|
||||||
|
|||||||
@ -5,6 +5,7 @@ from concurrent.futures import ThreadPoolExecutor
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
class ReceiveScheduler:
|
class ReceiveScheduler:
|
||||||
def __init__(self, queue_operation: QueueOperation, receive_thread_num: int = 1):
|
def __init__(self, queue_operation: QueueOperation, receive_thread_num: int = 1):
|
||||||
self.callbacks = dict()
|
self.callbacks = dict()
|
||||||
@ -31,13 +32,14 @@ class ReceiveScheduler:
|
|||||||
if message_list:
|
if message_list:
|
||||||
for message in message_list:
|
for message in message_list:
|
||||||
message = MessageItem.from_dict(message)
|
message = MessageItem.from_dict(message)
|
||||||
callback = None
|
callback_default = lambda message: message
|
||||||
with self.lock:
|
with self.lock:
|
||||||
callback = self.callbacks.pop(message.id, None)
|
callback = self.callbacks.pop(message.id, None)
|
||||||
|
if callback is None:
|
||||||
|
callback = callback_default
|
||||||
|
|
||||||
if callback:
|
# 使用线程池并行执行回调
|
||||||
# 使用线程池并行执行回调
|
self.executor.submit(self._safe_callback, callback, message)
|
||||||
self.executor.submit(self._safe_callback, callback, message)
|
|
||||||
else:
|
else:
|
||||||
time.sleep(0.1) # 适当休眠避免CPU空转
|
time.sleep(0.1) # 适当休眠避免CPU空转
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
from ..model import MessageItem
|
from ..model import MessageItem
|
||||||
from typing import Callable, Optional
|
from typing import Callable, Optional
|
||||||
from ..constant import MessageStatus
|
from ..constant import MessageStatus
|
||||||
|
import json
|
||||||
|
|
||||||
class TaskCycle:
|
class TaskCycle:
|
||||||
def __init__(self, message_item: MessageItem, callback: Optional[Callable]):
|
def __init__(self, message_item: MessageItem, callback: Optional[Callable]):
|
||||||
@ -23,7 +24,24 @@ class TaskCycle:
|
|||||||
self.task_error = None
|
self.task_error = None
|
||||||
|
|
||||||
def get_task_result(self):
|
def get_task_result(self):
|
||||||
return self.task_result
|
if isinstance(self.task_result, (dict, list)):
|
||||||
|
try:
|
||||||
|
return json.dumps(self.task_result)
|
||||||
|
except:
|
||||||
|
return json.dumps({'result': str(self.task_result)})
|
||||||
|
|
||||||
|
elif isinstance(self.task_result, str):
|
||||||
|
try:
|
||||||
|
json.loads(self.task_result)
|
||||||
|
return self.task_result
|
||||||
|
except:
|
||||||
|
return json.dumps({'result': self.task_result})
|
||||||
|
elif isinstance(self.task_result, (int, float, bool)):
|
||||||
|
return json.dumps({'result': self.task_result})
|
||||||
|
elif self.task_result is None:
|
||||||
|
return 'null'
|
||||||
|
else:
|
||||||
|
return json.dumps({'result': str(self.task_result)})
|
||||||
|
|
||||||
def get_task_status(self):
|
def get_task_status(self):
|
||||||
return self.task_status
|
return self.task_status
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user