Clone
6
Home
chakcy edited this page 2025-09-15 12:51:23 +08:00
This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

SQLite 任务队列 (Queue-SQLite) Wiki

概述

queue-sqlite 是一个基于 SQLite 数据库构建的、高性能轻量级 的分布式任务队列库。其核心由 Rust 编写(通过 PyO3 提供 Python 绑定),确保了消息操作的极致性能与线程安全,而上层应用则使用 Python提供了灵活易用的 API。

它非常适合需要处理大量异步任务、需要任务持久化、或者构建松耦合应用的场景。

核心特性

  • 高性能核心:使用 Rust 处理核心的队列操作(入队、出队),并进行了 SQLite 性能优化WAL 模式、内存缓存等)。
  • 任务持久化:所有任务消息都存储在 SQLite 数据库中,服务重启后不会丢失。
  • 优先级队列:支持为任务设置优先级(低、普通、高、紧急),确保重要任务优先处理。
  • 任务重试:内置任务重试机制。
  • 过期清理:支持设置任务过期时间,并自动清理已完成/失败的历史任务。
  • 监听器模式提供了灵活的监听器Listener机制可以响应特定的数据变更事件。
  • 优雅的 API:通过装饰器(@task@listener)轻松定义任务和监听器
  • 水平扩展通过分片Sharding机制支持队列水平扩展提高并发处理能力。

核心架构

该项目采用分层架构:

  1. Rust Core Layer (./src/core/):提供最基础队列操作。
    • QueueOperation:直接与 SQLite 交互,处理消息的 CRUD。
    • TaskMounter:直接在 Rust 层面获取 Python 中挂载的任务函数(桥接作用)。
  2. Python Core Layer (./src/queue_sqlite/core/):对 Rust Core 的封装。
    • core: 自动生成的 PyO3 模块,是调用 Rust 代码的入口。
  3. Model Layer (./src/queue_sqlite/model):定义了系统的数据模型。
    • MessageItem:任务消息的数据类,包含任务的所有元信息和内容。
  4. Mounter Layer (./src/queue_sqlite/mounter):任务和监听器的挂载点。
    • TaskMounter:用于挂载和获取任务函数
    • ListenMounter:用于挂载和获取监听器函数
  5. Operation Layer (./src/queue_sqlite/queue_operation):高级队列操作。
    • QueueOperation:实现了分片逻辑,管理多个 SQLite 数据库连接。
    • ListenOperation:管理监听器的数据存储和变更追踪。
  6. Scheduler Layer (./src/queue_sqlite/scheduler):核心调度逻辑。
    • TaskScheduler:从队列中取出任务并执行。
    • ReceiveScheduler:轮询已完成的任务,并执行用户回调。
    • ListenDataScheduler:监听数据变化并触发对应的监听器。
    • CleanupScheduler:定期清理过期的旧消息。
    • QueueScheduler:总调度器,整合以上所有调度器。
  7. Constant Definitions (./src/queue_sqlite/constant):枚举产量定义

安装

目前提供了 windows python 3.13 版本的 whl 文件可在版本中下载,其他版本的需要编译对应 python 版本的 core 模块后再进行安装,下面将演示如何进行编译并完成安装

前提条件

  • Python3.8+
  • Rust Toolchain:安装最新版本的 Rust(用于编译 Rust 扩展)。

从源码安装

  1. 初始化项目

    mkdir demo
    cd demo
    python -m venv .venv
    .venv\Scripts\activate
    mkdir lib
    
  2. 克隆仓库

    cd lib
    git clone http://124.71.68.6:3000/chakcy_code_repository/queue_sqlite.git 
    或
    git clone https://gitee.com/cai-xinpenge/queue_sqlite.git
    cd queue_sqlite
    
  3. 安装依赖

    pip install requirements.txt
    
  4. 编译 Rust 模块

    cd src/core
    maturin develop --release
    # 先删除原有的 core.pyd 再将生成的 core.dll 移动并更名到 src/queue_sqlite/core 中
    rm ..\queue_sqlite\core\core.pyd
    mv .\target\release\core.dll ..\queue_sqlite\core\core.pyd
    
  5. 安装

    安装打包工具

    pip install --upgrade build
    

    在打包前需要修改 pyproject.toml 文件中 require-python 参数,将该参数修改为你当前的 python 版本

    生成 whl 文件

    cd ../../
    python -m build --wheel
    

    安装 queue_sqlite

    pip install dist/queue_sqlite-x.x.x-py3-none-any.whl
    

快速开始

1. 定义一个任务

创建 my_tasks.py

from queue_sqlite.mounter.task_mounter import TaskMounter
from queue_sqlite.model import MessageItem

@TaskMounter.task(meta={"author": "dev"}) # 使用装饰器挂载任务
def process_image(message_item: MessageItem):
    """一个处理图片的任务"""
    image_data = message_item.content
    # ... 你的处理逻辑 ...
    print(f"Processing image: {image_data['url']}")
    result = {"status": "success", "size": [100, 200]}
    return result

2. 定义一个监听器(可选)

创建 my_listeners.py

from queue_sqlite.mounter.listen_mounter import ListenMounter

@ListenMounter.listener() # 使用装饰器挂载监听器
def on_config_changed(new_value):
    """监听配置变化的监听器"""
    print(f"Config updated to: {new_value}")

3. 使用队列调度器

创建 main.py

from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from my_tasks import process_image

# 初始化调度器
# receive_thread_num: 处理回调的线程数
# task_thread_num: 处理任务的线程数
# shard_num: 数据库分片数
scheduler = QueueScheduler(receive_thread_num=2, task_thread_num=4, shard_num=4)

def my_callback(completed_message: MessageItem):
    """任务完成后的回调函数"""
    print(f"Task {completed_message.id} finished!")
    print(f"Result: {completed_message.result}")

# 准备消息
message = MessageItem(
    content={"url": "https://example.com/image.jpg"},
    destination="process_image" # 必须与任务函数名一致
)

# 启动所有调度线程
scheduler.start_queue_scheduler()

# 发送消息到队列
scheduler.send_message(message, my_callback)

# 更新监听数据,会触发 `on_config_changed` 监听器
scheduler.update_listen_data("on_config_changed", "dark_mode=true")

# ... 主程序继续运行 ...
# 当需要关闭时:
# scheduler.stop_queue_scheduler()

核心概念

MessageItem

任务消息的载体。重要字段:

  • id: 唯一 ID自动生成
  • content: 任务的主要内容dict
  • destination: 必须与任务函数名一致。
  • priority: 优先级(见 MessagePriority
  • status: 状态(见 MessageStatus
  • expire_time: 任务过期时间。

任务Task

一个被 @TaskMounter.task 装饰的普通 Python 函数。它接收一个 MessageItem 参数,执行业务逻辑,并返回一个可 JSON 序列化的结果。

监听器 (Listener)

一个被 @ListenMounter.listener 装饰的普通 Python 函数。它监听通过 scheduler.update_listen_data(key, value) 更新的数据,并在值发生变化时被触发。

调度器 (Scheduler)

系统的核心,包含四个部分:

  1. TaskScheduler: 消费者,从队列取任务并执行。
  2. ReceiveScheduler: 消费者,检查已完成的任务,执行用户回调。
  3. ListenDataScheduler: 消费者,检查数据变化,触发监听器。
  4. CleanupScheduler: 管理员,定期清理垃圾数据。

详细使用指南

消息优先级的使用

设置任务过期时间

错误处理与重试机制

如何扩张分片数

监听器的高级用法

API 文档

MessageItem

  • to_dict() -> Dict:转化为字典
  • to_dict_by_core() -> Dict:转换为核心库所需的格式(字符串化的 JSON
  • classmethod from_dict(data: Dict) -> MessageItem:从字典创建对象。
  • is_expired() -> bool:检查消息是否过期。

QueueScheduler

  • __init__(receive_thread_num=1, task_thread_num=1, shard_num=4)
  • start_queue_scheduler():启动所有后台线程。
  • stop_queue_scheduler():停止所有后台线程。
  • send_message(message: MessageItem, callback: Callable):发送任务。
  • update_listen_data(key: str, value: Any):更新监听数据。
  • get_listen_data(key: str) -> Any:获取监听数据当前值。

装饰器

  • @TaskMounter.task(meta: dict = {})
  • @ListenMounter.listener()

性能调优

  1. 分片 (shard_num):这是最重要的参数。根据你的工作负载和 CPU 核心数增加分片(例如 816可以显著提高入队/出队的吞吐量。
  2. 线程数task_thread_num 应根据任务类型I/O 密集型或 CPU 密集型)进行调整。
  3. SQLite 优化Rust 核心层已经设置了积极的 PRAGMA 选项WALNORMAL sync大缓存。除非非常了解否则不建议修改。
  4. 消息大小:保持 MessageItem.content 尽可能小,大的数据应该存储在其他地方(如 S3、数据库只在消息中存储引用 ID。

常见问题

Q任务函数执行失败了怎么办

A任务状态会被标记为 FAILED。根据 retry_count 和你的逻辑,你可以选择重新发送该消息

Q如何查询队列中当前的任务数量

A使用 scheduler.queue_operation.get_queue_length()

Q如何手动触发清理

A可以调用 scheduler.cleanup_scheduler.cleanup_expired_message(),但通常清理调度器会自动处理。

Q编译 Rust 扩展时出错?

A确保你的 Rust 工具链是最新的 (rustup update)。在 Windows 上,你可能需要安装 Microsoft C++ Build Tools。

Qdestination 找不到对应的任务函数

A确保任务函数被正确装饰@TaskMounter.task)并且其 __name__destination 字符串完全一致。确保包含任务函数的模块在运行主程序前已被导入。

阶段规划

  1. 完善队列功能:优化重试和过期机制
  2. 增加其他调度器类型:当前调度器类型是基于 python 线程池实现的,后续可能增加携程版本和 PySide6 QThead 版本等
  3. 持续 rust 化:目前 rust 只处理 sqlite 相关操作,后续调度器可能由 Rust 实现
  4. 增加对其他语言的支持:当前仅仅提供 python 相关接口,后续可能为了 Rust、JavaScript 提供接口。