SQLite 任务队列系统
这个项目是一个基于 SQLite 的高性能任务队列系统,使用 Rust 和 Python 混合编程实现。系统提供了任务调度、消息队列管理和任务生命周期管理的完整解决方案。
主要特性
-
🚀 高性能:使用 Rust 实现核心操作,确保高性能
-
📚 多分片支持:支持数据库分片,提高并发处理能力
-
⏱️ 智能调度:提供接收调度器、任务调度器和清理调度器
-
🔒 任务生命周期管理:支持任务状态跟踪、重试机制和过期处理
-
📊 监控支持:内置资源监控功能
-
🧩 任务挂载系统:通过装饰器轻松添加新任务
项目结构
src/
├── core/ # Rust 核心实现
│ ├── lib.rs # 主模块
│ ├── queue_operation.rs # 队列操作实现
│ └── task_mounter.rs # 任务挂载实现
│
├── queue_sqlite/ # Python 实现
│ ├── constant/ # 常量定义(消息优先级、状态、类型)
│ ├── core/ # 核心接口
│ ├── model/ # 数据模型
│ ├── queue_operation/ # 队列操作封装
│ ├── scheduler/ # 调度器实现
│ └── task_cycle/ # 任务生命周期管理
│
tests/ # 测试代码
核心组件
-
消息模型 (MessageItem)
定义了任务消息的数据结构,包含:
-
消息ID、类型、状态
-
内容、创建时间、更新时间
-
优先级、来源、目标
-
重试次数、过期时间
-
标签和元数据
-
-
队列操作 (QueueOperation)
提供对 SQLite 队列的基本操作:
-
初始化数据库
-
入队和出队操作
-
获取队列长度和已完成消息
-
更新状态和结果
-
删除消息和清理过期消息
-
-
调度系统
包含三个主要调度器:
接收调度器 (ReceiveScheduler)
-
处理消息发送
-
管理回调函数
-
接收已完成消息
任务调度器 (TaskScheduler)
-
从队列中获取任务
-
调用任务函数
-
更新任务状态和结果
清理调度器 (CleanupScheduler)
-
清理过期消息
-
删除旧消息(默认清理7天前的消息)
-
-
任务挂载系统 (TaskMounter) 提供装饰器挂载任务函数:
@TaskMounter.task(meta={"task_name": "example"}) def example_task(message_item: MessageItem): # 任务逻辑 return result
使用示例
基本使用
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
# 初始化调度器
scheduler = QueueScheduler(
receive_thread_num=4,
task_thread_num=8,
shard_num=12
)
# 启动调度器
scheduler.start_queue_scheduler()
# 创建消息
message = MessageItem(
content={"data": "example"},
destination="task_name"
)
# 定义回调函数
def callback(message_item):
print(f"任务完成: {message_item.id}")
# 发送消息
scheduler.send_message(message, callback)
# ... 程序运行 ...
# 停止调度器
scheduler.stop_queue_scheduler()
压力测试
# tests/test_stress.py
class TestStress:
@classmethod
def test_stress(cls):
TASK_NUM = 10000
scheduler = QueueScheduler(receive_thread_num=4, task_thread_num=8, shard_num=12)
scheduler.start_queue_scheduler()
# 发送大量任务
for i in range(TASK_NUM):
message = MessageItem(content={"num": i}, destination="example")
scheduler.send_message(message, cls._callback)
# 等待所有任务完成
while scheduler.queue_operation.get_queue_length() > 0:
time.sleep(0.5)
scheduler.stop_queue_scheduler()
安装与运行
前提条件
Python 3.7+
Rust 工具链
SQLite 开发文件
安装步骤
-
克隆仓库:
git clone https://github.com/your-repo/sqlite-task-queue.git cd sqlite-task-queue -
安装 Python 依赖:
pip install -r requirements.txt -
构建 Rust 核心模块:
cd src/core maturin develop --release将会在
src/core/target/release目录下生成core.dll或core.so文件。 将该文件复制到queue_sqlite/core目录下(dll文件需改名为pyd后缀)。 -
运行测试:
pytest tests/ -
性能指标
在标准开发机器上(8核CPU,16GB内存):
可处理 10,000+ 任务(斐波那契数列前500项计算)/分钟
平均任务延迟 < 50ms
CPU 使用率 < 60%
内存占用 < 500MB
贡献指南
欢迎贡献代码!请遵循以下步骤:
-
Fork 仓库
创建新分支 (git checkout -b feature/your-feature)
提交更改 (git commit -am 'Add some feature')
推送到分支 (git push origin feature/your-feature)
-
创建 Pull Request
许可证
本项目采用 MIT 许可证。