queue_sqlite/tests/tasks/example.py

19 lines
527 B
Python
Raw Normal View History

2025-08-08 16:03:02 +08:00
from queue_sqlite.task_cycle.task_mounter import TaskMounter
from queue_sqlite.model import MessageItem
@TaskMounter.task(meta={"task_name": "test"})
def example(message_item: MessageItem):
    """Example task: attach the first 500 Fibonacci numbers to the message.

    The sequence (0, 1, 1, 2, 3, 5, 8, 13, 21, 34, ...) is stored as a list
    under the ``"fibonacci"`` key of ``message_item.result``; the value of
    ``message_item.to_json()`` is then returned.
    """
    term_count = 500

    # Build the sequence iteratively: `current` is the term being recorded,
    # `upcoming` is the one that follows it.
    sequence = []
    current, upcoming = 0, 1
    for _ in range(term_count):
        sequence.append(current)
        current, upcoming = upcoming, current + upcoming

    message_item.result = {"fibonacci": sequence}
    return message_item.to_json()