This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
SQLite 任务队列 (Queue-SQLite) Wiki
概述
queue-sqlite 是一个基于 SQLite 数据库构建的、高性能、轻量级 的分布式任务队列库。其核心由 Rust 编写(通过 PyO3 提供 Python 绑定),确保了消息操作的极致性能与线程安全,而上层应用则使用 Python,提供了灵活易用的 API。
它非常适合需要处理大量异步任务、需要任务持久化、或者构建松耦合应用的场景。
核心特性
- 高性能核心:使用 Rust 处理核心的队列操作(入队、出队),并进行了 SQLite 性能优化(WAL 模式、内存缓存等)。
- 任务持久化:所有任务消息都存储在 SQLite 数据库中,服务重启后不会丢失。
- 优先级队列:支持为任务设置优先级(低、普通、高、紧急),确保重要任务优先处理。
- 任务重试:内置任务重试机制。
- 过期清理:支持设置任务过期时间,并自动清理已完成/失败的历史任务。
- 监听器模式:提供了灵活的监听器(Listener)机制,可以响应特定的数据变更事件。
- 优雅的 API:通过装饰器(
@task,@listener)轻松定义任务和监听器 - 水平扩展:通过分片(Sharding)机制支持队列水平扩展,提高并发处理能力。
核心架构
该项目采用分层架构:
- Rust Core Layer (
./src/core/):提供最基础队列操作。QueueOperation:直接与 SQLite 交互,处理消息的 CRUD。TaskMounter:直接在 Rust 层面获取 Python 中挂载的任务函数(桥接作用)。
- Python Core Layer (
./src/queue_sqlite/core/):对 Rust Core 的封装。core: 自动生成的PyO3模块,是调用 Rust 代码的入口。
- Model Layer (
./src/queue_sqlite/model):定义了系统的数据模型。MessageItem:任务消息的数据类,包含任务的所有元信息和内容。
- Mounter Layer (
./src/queue_sqlite/mounter):任务和监听器的挂载点。TaskMounter:用于挂载和获取任务函数ListenMounter:用于挂载和获取监听器函数
- Operation Layer (
./src/queue_sqlite/queue_operation):高级队列操作。QueueOperation:实现了分片逻辑,管理多个 SQLite 数据库连接。ListenOperation:管理监听器的数据存储和变更追踪。
- Scheduler Layer (
./src/queue_sqlite/scheduler):核心调度逻辑。TaskScheduler:从队列中取出任务并执行。ReceiveScheduler:轮询已完成的任务,并执行用户回调。ListenDataScheduler:监听数据变化并触发对应的监听器。CleanupScheduler:定期清理过期的旧消息。QueueScheduler:总调度器,整合以上所有调度器。
- Constant Definitions (
./src/queue_sqlite/constant):枚举产量定义
安装
目前提供了 windows python 3.13 版本的 whl 文件可在版本中下载,其他版本的需要编译对应 python 版本的 core 模块后再进行安装,下面将演示如何进行编译并完成安装
前提条件
- Python:3.8+
- Rust Toolchain:安装最新版本的 Rust(用于编译 Rust 扩展)。
从源码安装
-
初始化项目
mkdir demo cd demo python -m venv .venv .venv\Scripts\activate mkdir lib -
克隆仓库
cd lib git clone http://124.71.68.6:3000/chakcy_code_repository/queue_sqlite.git 或 git clone https://gitee.com/cai-xinpenge/queue_sqlite.git cd queue_sqlite -
安装依赖
pip install requirements.txt -
编译 Rust 模块
cd src/core maturin develop --release # 先删除原有的 core.pyd 再将生成的 core.dll 移动并更名到 src/queue_sqlite/core 中 rm ..\queue_sqlite\core\core.pyd mv .\target\release\core.dll ..\queue_sqlite\core\core.pyd -
安装
安装打包工具
pip install --upgrade build在打包前需要修改 pyproject.toml 文件中 require-python 参数,将该参数修改为你当前的 python 版本
生成 whl 文件
cd ../../ python -m build --wheel安装 queue_sqlite
pip install dist/queue_sqlite-x.x.x-py3-none-any.whl
快速开始
1. 定义一个任务
创建 my_tasks.py
from queue_sqlite.mounter.task_mounter import TaskMounter
from queue_sqlite.model import MessageItem
@TaskMounter.task(meta={"author": "dev"}) # 使用装饰器挂载任务
def process_image(message_item: MessageItem):
"""一个处理图片的任务"""
image_data = message_item.content
# ... 你的处理逻辑 ...
print(f"Processing image: {image_data['url']}")
result = {"status": "success", "size": [100, 200]}
return result
2. 定义一个监听器(可选)
创建 my_listeners.py
from queue_sqlite.mounter.listen_mounter import ListenMounter
@ListenMounter.listener() # 使用装饰器挂载监听器
def on_config_changed(new_value):
"""监听配置变化的监听器"""
print(f"Config updated to: {new_value}")
3. 使用队列调度器
创建 main.py
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from my_tasks import process_image
# 初始化调度器
# receive_thread_num: 处理回调的线程数
# task_thread_num: 处理任务的线程数
# shard_num: 数据库分片数
scheduler = QueueScheduler(receive_thread_num=2, task_thread_num=4, shard_num=4)
def my_callback(completed_message: MessageItem):
"""任务完成后的回调函数"""
print(f"Task {completed_message.id} finished!")
print(f"Result: {completed_message.result}")
# 准备消息
message = MessageItem(
content={"url": "https://example.com/image.jpg"},
destination="process_image" # 必须与任务函数名一致
)
# 启动所有调度线程
scheduler.start_queue_scheduler()
# 发送消息到队列
scheduler.send_message(message, my_callback)
# 更新监听数据,会触发 `on_config_changed` 监听器
scheduler.update_listen_data("on_config_changed", "dark_mode=true")
# ... 主程序继续运行 ...
# 当需要关闭时:
# scheduler.stop_queue_scheduler()
核心概念
MessageItem
任务消息的载体。重要字段:
id: 唯一 ID(自动生成)content: 任务的主要内容(dict)。destination: 必须与任务函数名一致。priority: 优先级(见 MessagePriority)。status: 状态(见 MessageStatus)。expire_time: 任务过期时间。
任务(Task)
一个被 @TaskMounter.task 装饰的普通 Python 函数。它接收一个 MessageItem 参数,执行业务逻辑,并返回一个可 JSON 序列化的结果。
监听器 (Listener)
一个被 @ListenMounter.listener 装饰的普通 Python 函数。它监听通过 scheduler.update_listen_data(key, value) 更新的数据,并在值发生变化时被触发。
调度器 (Scheduler)
系统的核心,包含四个部分:
- TaskScheduler: 消费者,从队列取任务并执行。
- ReceiveScheduler: 消费者,检查已完成的任务,执行用户回调。
- ListenDataScheduler: 消费者,检查数据变化,触发监听器。
- CleanupScheduler: 管理员,定期清理垃圾数据。
详细使用指南
消息优先级的使用
设置任务过期时间
错误处理与重试机制
如何扩张分片数
监听器的高级用法
API 文档
MessageItem
to_dict() -> Dict:转化为字典to_dict_by_core() -> Dict:转换为核心库所需的格式(字符串化的 JSON)。classmethod from_dict(data: Dict) -> MessageItem:从字典创建对象。is_expired() -> bool:检查消息是否过期。
QueueScheduler
__init__(receive_thread_num=1, task_thread_num=1, shard_num=4)start_queue_scheduler():启动所有后台线程。stop_queue_scheduler():停止所有后台线程。send_message(message: MessageItem, callback: Callable):发送任务。update_listen_data(key: str, value: Any):更新监听数据。get_listen_data(key: str) -> Any:获取监听数据当前值。
装饰器
@TaskMounter.task(meta: dict = {})@ListenMounter.listener()
性能调优
- 分片 (
shard_num):这是最重要的参数。根据你的工作负载和 CPU 核心数增加分片(例如 8,16),可以显著提高入队/出队的吞吐量。 - 线程数:
task_thread_num应根据任务类型(I/O 密集型或 CPU 密集型)进行调整。 - SQLite 优化:Rust 核心层已经设置了积极的 PRAGMA 选项(WAL,NORMAL sync,大缓存)。除非非常了解,否则不建议修改。
- 消息大小:保持
MessageItem.content尽可能小,大的数据应该存储在其他地方(如 S3、数据库),只在消息中存储引用 ID。
常见问题
Q:任务函数执行失败了怎么办?
A:任务状态会被标记为 FAILED。根据 retry_count 和你的逻辑,你可以选择重新发送该消息
Q:如何查询队列中当前的任务数量
A:使用 scheduler.queue_operation.get_queue_length()。
Q:如何手动触发清理?
A:可以调用 scheduler.cleanup_scheduler.cleanup_expired_message(),但通常清理调度器会自动处理。
Q:编译 Rust 扩展时出错?
A:确保你的 Rust 工具链是最新的 (rustup update)。在 Windows 上,你可能需要安装 Microsoft C++ Build Tools。
Q:destination 找不到对应的任务函数?
A:确保任务函数被正确装饰(@TaskMounter.task)并且其 __name__ 与 destination 字符串完全一致。确保包含任务函数的模块在运行主程序前已被导入。
阶段规划
- 完善队列功能:优化重试和过期机制
- 增加其他调度器类型:当前调度器类型是基于 python 线程池实现的,后续可能增加携程版本和 PySide6 QThead 版本等
- 持续 rust 化:目前 rust 只处理 sqlite 相关操作,后续调度器可能由 Rust 实现
- 增加对其他语言的支持:当前仅仅提供 python 相关接口,后续可能为了 Rust、JavaScript 提供接口。