commit a5043531c075d490f0b02e71c3ad02a91dee2877 Author: chakcy <947105045@qq.com> Date: Sun Sep 14 01:33:43 2025 +0800 添加 Home diff --git a/Home.md b/Home.md new file mode 100644 index 0000000..272083b --- /dev/null +++ b/Home.md @@ -0,0 +1,190 @@ +# SQLite 任务队列 (Queue-SQLite) Wiki + +## 概述 + +`queue-sqlite` 是一个基于 **SQLite** 数据库构建的、**高性能**、**轻量级** 的分布式任务队列库。其核心由 **Rust** 编写(通过 PyO3 提供 Python 绑定),确保了消息操作的极致性能与线程安全,而上层应用则使用 Python,提供了灵活易用的 API。 + +它非常适合需要处理大量异步任务、需要任务持久化、或者构建松耦合应用的场景。 + +## 核心特性 + +- **高性能核心**:使用 Rust 处理核心的队列操作(入队、出队),并进行了 SQLite 性能优化(WAL 模式、内存缓存等)。 +- **任务持久化**:所有任务消息都存储在 SQLite 数据库中,服务重启后不会丢失。 +- **优先级队列**:支持为任务设置优先级(低、普通、高、紧急),确保重要任务优先处理。 +- **任务重试**:内置任务重试机制。 +- **过期清理**:支持设置任务过期时间,并自动清理已完成/失败的历史任务。 +- **监听器模式**:提供了灵活的监听器(Listener)机制,可以响应特定的数据变更事件。 +- **优雅的 API**:通过装饰器(`@task`,`@listener`)轻松定义任务和监听器 +- **水平扩展**:通过分片(Sharding)机制支持队列水平扩展,提高并发处理能力。 + +## 目录 + +[TOC] + +## 核心架构 + +该项目采用分层架构: + +1. **Rust Core Layer (`./src/core/`)**:提供最基础队列操作。 + - `QueueOperation`:直接与 SQLite 交互,处理消息的 CRUD。 + - `TaskMounter`:直接在 Rust 层面获取 Python 中挂载的任务函数(桥接作用)。 +2. **Python Core Layer (`./src/queue_sqlite/core/`)**:对 Rust Core 的封装。 + - `core`: 自动生成的 `PyO3` 模块,是调用 Rust 代码的入口。 +3. **Model Layer (`./src/queue_sqlite/model`)**:定义了系统的数据模型。 + - `MessageItem`:任务消息的数据类,包含任务的所有元信息和内容。 +4. **Mounter Layer (`./src/queue_sqlite/mounter`)**:任务和监听器的挂载点。 + - `TaskMounter`:用于挂载和获取任务函数 + - `ListenMounter`:用于挂载和获取监听器函数 +5. **Operation Layer (`./src/queue_sqlite/queue_operation`)**:高级队列操作。 + - `QueueOperation`:实现了分片逻辑,管理多个 SQLite 数据库连接。 + - `ListenOperation`:管理监听器的数据存储和变更追踪。 +6. **Scheduler Layer (`./src/queue_sqlite/scheduler`)**:核心调度逻辑。 + - `TaskScheduler`:从队列中取出任务并执行。 + - `ReceiveScheduler`:轮询已完成的任务,并执行用户回调。 + - `ListenDataScheduler`:监听数据变化并触发对应的监听器。 + - `CleanupScheduler`:定期清理过期的旧消息。 + - `QueueScheduler`:总调度器,整合以上所有调度器。 +7. **Constant Definitions (`./src/queue_sqlite/constant`)**:枚举产量定义 + +## 安装 + +目前提供了 windows python 3.13 版本的 whl 文件可在版本中下载,其他版本的需要编译对应 python 版本的 core 模块后再进行安装,下面将演示如何进行编译并完成安装 + +### 前提条件 + +- **Python**:3.8+ +- **Rust Toolchain**:安装最新版本的 [Rust](https://www.rust-lang.org/tools/install)(用于编译 Rust 扩展)。 + +### 从源码安装 + +1. 初始化项目 + + ```bash + mkdir demo + cd demo + python -m venv .venv + .venv\Scripts\activate + mkdir lib + ``` + +2. 克隆仓库 + + ```bash + cd lib + git clone http://124.71.68.6:3000/chakcy_code_repository/queue_sqlite.git + 或 + git clone https://gitee.com/cai-xinpenge/queue_sqlite.git + cd queue_sqlite + ``` + +3. 安装依赖 + + ```bash + pip install requirements.txt + ``` + +4. 编译 Rust 模块 + + ```bash + cd src/core + maturin develop --release + # 先删除原有的 core.pyd 再将生成的 core.dll 移动并更名到 src/queue_sqlite/core 中 + rm ..\queue_sqlite\core\core.pyd + mv .\target\release\core.dll ..\queue_sqlite\core\core.pyd + ``` + +5. 安装 + + 安装打包工具 + + ```bash + pip install --upgrade build + ``` + + 在打包前需要修改 pyproject.toml 文件中 require-python 参数,将该参数修改为你当前的 python 版本 + + 生成 whl 文件 + + ```bash + cd ../../ + python -m build --wheel + ``` + + 安装 queue_sqlite + + ```bash + pip install dist/queue_sqlite-x.x.x-py3-none-any.whl + ``` + +## 快速开始 + +### 1. 定义一个任务 + +创建 `my_tasks.py` + +```python +from queue_sqlite.mounter.task_mounter import TaskMounter +from queue_sqlite.model import MessageItem + +@TaskMounter.task(meta={"author": "dev"}) # 使用装饰器挂载任务 +def process_image(message_item: MessageItem): + """一个处理图片的任务""" + image_data = message_item.content + # ... 你的处理逻辑 ... + print(f"Processing image: {image_data['url']}") + result = {"status": "success", "size": [100, 200]} + return result +``` + +### 2. 定义一个监听器(可选) + +创建 `my_listeners.py` + +```python +from queue_sqlite.mounter.listen_mounter import ListenMounter + +@ListenMounter.listener() # 使用装饰器挂载监听器 +def on_config_changed(new_value): + """监听配置变化的监听器""" + print(f"Config updated to: {new_value}") +``` + +### 3. 使用队列调度器 + +创建 `main.py` + +```python +from queue_sqlite.scheduler import QueueScheduler +from queue_sqlite.model import MessageItem +from my_tasks import process_image + +# 初始化调度器 +# receive_thread_num: 处理回调的线程数 +# task_thread_num: 处理任务的线程数 +# shard_num: 数据库分片数 +scheduler = QueueScheduler(receive_thread_num=2, task_thread_num=4, shard_num=4) + +def my_callback(completed_message: MessageItem): + """任务完成后的回调函数""" + print(f"Task {completed_message.id} finished!") + print(f"Result: {completed_message.result}") + +# 准备消息 +message = MessageItem( + content={"url": "https://example.com/image.jpg"}, + destination="process_image" # 必须与任务函数名一致 +) + +# 启动所有调度线程 +scheduler.start_queue_scheduler() + +# 发送消息到队列 +scheduler.send_message(message, my_callback) + +# 更新监听数据,会触发 `on_config_changed` 监听器 +scheduler.update_listen_data("on_config_changed", "dark_mode=true") + +# ... 主程序继续运行 ... +# 当需要关闭时: +# scheduler.stop_queue_scheduler() +``` \ No newline at end of file