添加 Home

chakcy 2025-09-14 01:33:43 +08:00
commit a5043531c0

190
Home.md Normal file

@ -0,0 +1,190 @@
# SQLite 任务队列 (Queue-SQLite) Wiki
## 概述
`queue-sqlite` 是一个基于 **SQLite** 数据库构建的、**高性能**、**轻量级** 的分布式任务队列库。其核心由 **Rust** 编写(通过 PyO3 提供 Python 绑定),确保了消息操作的极致性能与线程安全,而上层应用则使用 Python提供了灵活易用的 API。
它非常适合需要处理大量异步任务、需要任务持久化、或者构建松耦合应用的场景。
## 核心特性
- **高性能核心**:使用 Rust 处理核心的队列操作(入队、出队),并进行了 SQLite 性能优化WAL 模式、内存缓存等)。
- **任务持久化**:所有任务消息都存储在 SQLite 数据库中,服务重启后不会丢失。
- **优先级队列**:支持为任务设置优先级(低、普通、高、紧急),确保重要任务优先处理。
- **任务重试**:内置任务重试机制。
- **过期清理**:支持设置任务过期时间,并自动清理已完成/失败的历史任务。
- **监听器模式**提供了灵活的监听器Listener机制可以响应特定的数据变更事件。
- **优雅的 API**:通过装饰器(`@task``@listener`)轻松定义任务和监听器
- **水平扩展**通过分片Sharding机制支持队列水平扩展提高并发处理能力。
## 目录
[TOC]
## 核心架构
该项目采用分层架构:
1. **Rust Core Layer (`./src/core/`)**:提供最基础队列操作。
- `QueueOperation`:直接与 SQLite 交互,处理消息的 CRUD。
- `TaskMounter`:直接在 Rust 层面获取 Python 中挂载的任务函数(桥接作用)。
2. **Python Core Layer (`./src/queue_sqlite/core/`)**:对 Rust Core 的封装。
- `core`: 自动生成的 `PyO3` 模块,是调用 Rust 代码的入口。
3. **Model Layer (`./src/queue_sqlite/model`)**:定义了系统的数据模型。
- `MessageItem`:任务消息的数据类,包含任务的所有元信息和内容。
4. **Mounter Layer (`./src/queue_sqlite/mounter`)**:任务和监听器的挂载点。
- `TaskMounter`:用于挂载和获取任务函数
- `ListenMounter`:用于挂载和获取监听器函数
5. **Operation Layer (`./src/queue_sqlite/queue_operation`)**:高级队列操作。
- `QueueOperation`:实现了分片逻辑,管理多个 SQLite 数据库连接。
- `ListenOperation`:管理监听器的数据存储和变更追踪。
6. **Scheduler Layer (`./src/queue_sqlite/scheduler`)**:核心调度逻辑。
- `TaskScheduler`:从队列中取出任务并执行。
- `ReceiveScheduler`:轮询已完成的任务,并执行用户回调。
- `ListenDataScheduler`:监听数据变化并触发对应的监听器。
- `CleanupScheduler`:定期清理过期的旧消息。
- `QueueScheduler`:总调度器,整合以上所有调度器。
7. **Constant Definitions (`./src/queue_sqlite/constant`)**:枚举产量定义
## 安装
目前提供了 windows python 3.13 版本的 whl 文件可在版本中下载,其他版本的需要编译对应 python 版本的 core 模块后再进行安装,下面将演示如何进行编译并完成安装
### 前提条件
- **Python**3.8+
- **Rust Toolchain**:安装最新版本的 [Rust](https://www.rust-lang.org/tools/install)(用于编译 Rust 扩展)。
### 从源码安装
1. 初始化项目
```bash
mkdir demo
cd demo
python -m venv .venv
.venv\Scripts\activate
mkdir lib
```
2. 克隆仓库
```bash
cd lib
git clone http://124.71.68.6:3000/chakcy_code_repository/queue_sqlite.git
git clone https://gitee.com/cai-xinpenge/queue_sqlite.git
cd queue_sqlite
```
3. 安装依赖
```bash
pip install requirements.txt
```
4. 编译 Rust 模块
```bash
cd src/core
maturin develop --release
# 先删除原有的 core.pyd 再将生成的 core.dll 移动并更名到 src/queue_sqlite/core 中
rm ..\queue_sqlite\core\core.pyd
mv .\target\release\core.dll ..\queue_sqlite\core\core.pyd
```
5. 安装
安装打包工具
```bash
pip install --upgrade build
```
在打包前需要修改 pyproject.toml 文件中 require-python 参数,将该参数修改为你当前的 python 版本
生成 whl 文件
```bash
cd ../../
python -m build --wheel
```
安装 queue_sqlite
```bash
pip install dist/queue_sqlite-x.x.x-py3-none-any.whl
```
## 快速开始
### 1. 定义一个任务
创建 `my_tasks.py`
```python
from queue_sqlite.mounter.task_mounter import TaskMounter
from queue_sqlite.model import MessageItem
@TaskMounter.task(meta={"author": "dev"}) # 使用装饰器挂载任务
def process_image(message_item: MessageItem):
"""一个处理图片的任务"""
image_data = message_item.content
# ... 你的处理逻辑 ...
print(f"Processing image: {image_data['url']}")
result = {"status": "success", "size": [100, 200]}
return result
```
### 2. 定义一个监听器(可选)
创建 `my_listeners.py`
```python
from queue_sqlite.mounter.listen_mounter import ListenMounter
@ListenMounter.listener() # 使用装饰器挂载监听器
def on_config_changed(new_value):
"""监听配置变化的监听器"""
print(f"Config updated to: {new_value}")
```
### 3. 使用队列调度器
创建 `main.py`
```python
from queue_sqlite.scheduler import QueueScheduler
from queue_sqlite.model import MessageItem
from my_tasks import process_image
# 初始化调度器
# receive_thread_num: 处理回调的线程数
# task_thread_num: 处理任务的线程数
# shard_num: 数据库分片数
scheduler = QueueScheduler(receive_thread_num=2, task_thread_num=4, shard_num=4)
def my_callback(completed_message: MessageItem):
"""任务完成后的回调函数"""
print(f"Task {completed_message.id} finished!")
print(f"Result: {completed_message.result}")
# 准备消息
message = MessageItem(
content={"url": "https://example.com/image.jpg"},
destination="process_image" # 必须与任务函数名一致
)
# 启动所有调度线程
scheduler.start_queue_scheduler()
# 发送消息到队列
scheduler.send_message(message, my_callback)
# 更新监听数据,会触发 `on_config_changed` 监听器
scheduler.update_listen_data("on_config_changed", "dark_mode=true")
# ... 主程序继续运行 ...
# 当需要关闭时:
# scheduler.stop_queue_scheduler()
```