queue_sqlite/README.md
2026-01-05 20:33:18 +08:00

12 KiB
Raw Permalink Blame History

Queue SQLite - 高性能 SQLite 任务队列系统

python-3.11+ rust-1.65+ license-MIT version-0.2.1

一个基于 SQLite 的高性能任务队列系统,采用 Rust 核心操作,支持任务挂载、消息监听、优先级处理、重试机制和自动清理过期消息。适合构建可靠、可扩展的后台任务处理系统。

🌟 特性

核心优势

  • 🚀 高性能Rust 核心提供毫秒级任务处理
  • 💾 持久化存储:基于 SQLite 的可靠消息存储
  • 🔄 多调度器支持标准、异步、Qt 三种调度模式
  • 🎯 智能分片:自动哈希分片,支持横向扩展
  • 📊 全面监控:内置资源使用监控和队列状态查看

功能亮点

  • 任务装饰器:使用 @task 装饰器轻松注册任务
  • 监听装饰器:使用 @listener 装饰器实现数据变更监听
  • 优先级队列:支持 LOW/NORMAL/HIGH/URGENT 四级优先级
  • 重试机制:可配置的最大重试次数和延迟重试
  • 过期清理:自动清理过期和完成的消息
  • 批量操作:支持消息批量入队和处理
  • 异步支持:原生支持 async/await 异步任务
  • Qt 集成:可选 Qt 调度器用于 GUI 应用

📦 安装

前置要求

  • Python 3.11+
  • Rust 1.65+ (用于编译核心扩展)
  • SQLite 3.35+

安装方式

方式一:从源码安装(推荐)

# 克隆仓库
git clone https://github.com/chakcy/queue_sqlite.git
cd queue_sqlite

# 安装 Rust如果未安装
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# 安装 Python 依赖
pip install -r requirements.txt

# 安装开发模式
pip install -e .

方式二:从 PyPI 安装

pip install queue-sqlite

🚀 快速开始

基本使用

from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.constant import MessagePriority
from queue_sqlite.mounter import task


# 1. 注册任务
@task(meta={"max_retries": 3, "delay": 1})
def process_image(message_item):
    """处理图片任务"""
    data = message_item.content
    # 处理逻辑
    return {"status": "success", "processed": data["image_id"]}


# 2. 创建调度器
scheduler = QueueScheduler(scheduler_type="standard")

# 3. 启动调度器
scheduler.start()

# 4. 发送任务
for i in range(10):
    message = MessageItem(
        content={"image_id": i, "path": f"/images/{i}.jpg"},
        destination="process_image",  # 任务函数名
        priority=MessagePriority.HIGH,  # HIGH 优先级
        tags="image_processing",
    )

    def callback(result_message):
        print(f"任务完成: {result_message.id}, 结果: {result_message.result}")

    scheduler.send_message(message, callback)

# 5. 等待任务完成
import time

while scheduler.queue_operation.get_queue_length() > 0:
    print(f"剩余任务: {scheduler.queue_operation.get_queue_length()}")
    time.sleep(1)

# 6. 停止调度器
scheduler.stop()

异步任务示例

import asyncio
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from queue_sqlite.mounter import task


@task(meta={"name": "async_processor", "max_retries": 2})
async def async_data_fetcher(message_item):
    """异步数据获取任务"""
    url = message_item.content["url"]
    # 模拟异步 HTTP 请求
    await asyncio.sleep(0.5)
    return {"url": url, "data": "fetched", "status": 200}


async def main():
    scheduler = QueueScheduler(scheduler_type="async")
    scheduler.start()

    # 发送异步任务
    message = MessageItem(
        content={"url": "https://api.example.com/data"},
        destination="async_data_fetcher",
    )

    scheduler.send_message(message, lambda m: print(f"完成: {m.id}"))

    await asyncio.sleep(5)
    scheduler.stop()


asyncio.run(main())

数据监听示例

from queue_sqlite import QueueScheduler
from queue_sqlite.mounter import listener

# 注册监听器
@listener()
def user_activity_log(data):
    """监听用户活动数据"""
    print(f"用户活动: {data}")

@listener()
def system_alert(data):
    """监听系统告警"""
    print(f"系统告警: {data}")

# 创建调度器
scheduler = QueueScheduler()
scheduler.start()

# 更新监听数据(会自动触发监听函数)
scheduler.update_listen_data("user_activity_log", "用户登录")
scheduler.update_listen_data("user_activity_log", "用户购买")
scheduler.update_listen_data("system_alert", "CPU使用率过高")

⚙️ 配置选项

调度器配置

from queue_sqlite import SchedulerConfig, QueueScheduler

config = SchedulerConfig(
    receive_thread_num=2,    # 接收线程数
    task_thread_num=8,       # 任务执行线程数
    shard_num=4,             # 数据库分片数
    queue_name="production", # 队列名称
    meta={"app": "myapp"}    # 自定义元数据
)

scheduler = QueueScheduler(
    scheduler_type="standard",  # standard | async | qt
    config=config
)

消息配置

from queue_sqlite import MessageItem
from queue_sqlite.constant import MessagePriority, MessageType
from datetime import datetime, timedelta

message = MessageItem(
    # 必需字段
    content={"data": "任务数据"},
    destination="task_function_name",
    
    # 可选字段
    id="custom-uuid",  # 默认自动生成
    type=MessageType.TASK,
    priority=MessagePriority.HIGH,
    source="web_api",
    tags="urgent,processing",
    
    # 时间控制
    expire_time=datetime.now() + timedelta(hours=1),  # 1小时后过期
    retry_count=0,
    
    # 自定义元数据
    metadata={"user_id": 123, "request_id": "abc123"}
)

📊 系统架构

架构图

┌─────────────────────────────────────────────────────────┐
│                    Python application                   |
│  ┌─────────────┐  ┌─────────────┐  ┌────────────────┐   │
│  │   @task     │  │ @listener   │  │ QueueScheduler │   │
│  │             │  │             │  │                │   │
│  └─────────────┘  └─────────────┘  └────────────────┘   │
└─────────────────────────────────────────────────────────┘
                            │
┌─────────────────────────────────────────────────────────┐
│                  Python Service                         │
│  ┌──────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │ TaskMounter  │  │ TaskCycle   │  │ Schedulers  │     │
│  │ ListenMounter│  │ AsyncCycle  │  │             │     │
│  └──────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────┘
                            │
┌─────────────────────────────────────────────────────────┐
│                   Rust core                             │
│  ┌─────────────────────────────────────────┐            │
│  │      queue_sqlite_core                  │            │
│  │  • shared sqlite database               │            │
│  │  • SQLite Optimization                  │            │
│  │  • Connection pool                      │            │
│  └─────────────────────────────────────────┘            │
└─────────────────────────────────────────────────────────┘
                            │
┌─────────────────────────────────────────────────────────┐
│                  SQLite database                        │
│  ┌───────────────────────────────────────────────┐      │
│  │      shared database (cache/queue_name/)      │      │
│  │  • queue_shard_0.db                           │      │
│  │  • queue_shard_1.db                           │      │
│  │  • listen.db                                  │      │
│  └───────────────────────────────────────────────┘      │
└─────────────────────────────────────────────────────────┘

组件说明

  1. MessageIte: 核心数据模型,包含消息的所有属性和方法
  2. TaskMounter: 任务过载器,通过装饰器注册任务函数
  3. ListenMounter:监听挂载器,通过装饰器注册监听函数
  4. TaskCycle:任务生命周期管理器,处理重试和状态更新
  5. QueueScheduler:统一调度器接口,支持三种实现:
    • StandardQueueScheduler:统一调度器接口,支持三种实现:
    • AsyncQueueScheduler:异步/等待实现
    • QtQueueSchedulerQt 线程池实现GUI应用
  6. CleanupScheduler:自动清理过期消息
  7. ShardedQueueOperationRust 实现的高性能分片队列操作

🧪 测试

运行测试套件

# 运行所有测试
python -m -v -s pytest tests/

# 运行特定测试
python -m pytest tests/test_stress.py -v
python -m pytest tests/test_async_scheduler.py -v

性能测试示例

from tests.test_stress import TestStress

# 压力测试:处理 10000 个任务
TestStress.test_stress()

# 异步调度器测试
from tests.test_async_scheduler import TestAsyncScheduler
TestAsyncScheduler.test_async_scheduler()

📈 性能指标

基准测试结果

指标 标准调度器 异步调度器 Qt 调度器
单核 QPS 5,000+ 8,000+ 6,000+
内存占用 50-100MB 60-120MB 70-150MB
延迟p95 <50ms <30ms <40ms
最大并发 1,000+ 2,000+ 1500+

扩展性测试

  • 10 分片:支持 50,000+ 并发任务
  • 自动负载均衡:分片间任务均匀分布
  • 线性扩展:增加分片数可线性提升吞吐量

📄 许可证

本项目采用 MIT 许可证 - 查看 LICENSE 文件了解详情。

📞 联系与支持

🙏 致谢

感谢以下开源项目:

  • SQLite - 轻量级嵌入式数据库
  • PyO3 - Rust-Python 绑定
  • r2d2 - Rust 数据库连接池

Queue SQLite - 为您的应用提供可靠、高效的任务队列解决方案。