queue_sqlite/README.md
2025-08-08 18:15:29 +08:00

226 lines
5.0 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SQLite 任务队列系统
这个项目是一个基于 SQLite 的高性能任务队列系统,使用 Rust 和 Python 混合编程实现。系统提供了任务调度、消息队列管理和任务生命周期管理的完整解决方案。
## 主要特性
- 🚀 高性能:使用 Rust 实现核心操作,确保高性能
- 📚 多分片支持:支持数据库分片,提高并发处理能力
- ⏱️ 智能调度:提供接收调度器、任务调度器和清理调度器
- 🔒 任务生命周期管理:支持任务状态跟踪、重试机制和过期处理
- 📊 监控支持:内置资源监控功能
- 🧩 任务挂载系统:通过装饰器轻松添加新任务
## 项目结构
```
src/
├── core/ # Rust 核心实现
│ ├── lib.rs # 主模块
│ ├── queue_operation.rs # 队列操作实现
│ └── task_mounter.rs # 任务挂载实现
├── queue_sqlite/ # Python 实现
│ ├── constant/ # 常量定义(消息优先级、状态、类型)
│ ├── core/ # 核心接口
│ ├── model/ # 数据模型
│ ├── queue_operation/ # 队列操作封装
│ ├── scheduler/ # 调度器实现
│ └── task_cycle/ # 任务生命周期管理
tests/ # 测试代码
```
## 核心组件
1. 消息模型 (MessageItem)
定义了任务消息的数据结构,包含:
- 消息ID、类型、状态
- 内容、创建时间、更新时间
- 优先级、来源、目标
- 重试次数、过期时间
- 标签和元数据
2. 队列操作 (QueueOperation)
提供对 SQLite 队列的基本操作:
- 初始化数据库
- 入队和出队操作
- 获取队列长度和已完成消息
- 更新状态和结果
- 删除消息和清理过期消息
3. 调度系统
包含三个主要调度器:
***接收调度器 (ReceiveScheduler)***
- 处理消息发送
- 管理回调函数
- 接收已完成消息
***任务调度器 (TaskScheduler)***
- 从队列中获取任务
- 调用任务函数
- 更新任务状态和结果
***清理调度器 (CleanupScheduler)***
- 清理过期消息
- 删除旧消息默认清理7天前的消息
4. 任务挂载系统 (TaskMounter)
提供装饰器挂载任务函数:
```python
@TaskMounter.task(meta={"task_name": "example"})
def example_task(message_item: MessageItem):
# 任务逻辑
return result
```
## 使用示例
### 基本使用
```python
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
# 初始化调度器
scheduler = QueueScheduler(
receive_thread_num=4,
task_thread_num=8,
shard_num=12
)
# 启动调度器
scheduler.start_queue_scheduler()
# 创建消息
message = MessageItem(
content={"data": "example"},
destination="task_name"
)
# 定义回调函数
def callback(message_item):
print(f"任务完成: {message_item.id}")
# 发送消息
scheduler.send_message(message, callback)
# ... 程序运行 ...
# 停止调度器
scheduler.stop_queue_scheduler()
```
### 压力测试
```python
# tests/test_stress.py
class TestStress:
@classmethod
def test_stress(cls):
TASK_NUM = 10000
scheduler = QueueScheduler(receive_thread_num=4, task_thread_num=8, shard_num=12)
scheduler.start_queue_scheduler()
# 发送大量任务
for i in range(TASK_NUM):
message = MessageItem(content={"num": i}, destination="example")
scheduler.send_message(message, cls._callback)
# 等待所有任务完成
while scheduler.queue_operation.get_queue_length() > 0:
time.sleep(0.5)
scheduler.stop_queue_scheduler()
```
## 安装与运行
### 前提条件
Python 3.7+
Rust 工具链
SQLite 开发文件
### 安装步骤
1. 克隆仓库:
```bash
git clone https://github.com/your-repo/sqlite-task-queue.git
cd sqlite-task-queue
```
2. 安装 Python 依赖:
```bash
pip install -r requirements.txt
```
3. 构建 Rust 核心模块:
```bash
cd src/core
maturin develop --release
```
将会在 `src/core/target/release` 目录下生成 `core.dll` 或 `core.so` 文件。
将该文件复制到 `queue_sqlite/core` 目录下dll文件需改名为pyd后缀
4. 运行测试:
```bash
pytest tests/
```
5. 性能指标
在标准开发机器上8核CPU16GB内存
可处理 10,000+ 任务(斐波那契数列前500项计算)/分钟
平均任务延迟 < 50ms
CPU 使用率 < 60%
内存占用 < 500MB
### 贡献指南
欢迎贡献代码请遵循以下步骤
1. Fork 仓库
创建新分支 (git checkout -b feature/your-feature)
提交更改 (git commit -am 'Add some feature')
推送到分支 (git push origin feature/your-feature)
2. 创建 Pull Request
### 许可证
本项目采用 MIT 许可证