init chongming fastapi 脚手架

This commit is contained in:
chakcy 2026-01-31 16:00:12 +08:00
parent aed8792664
commit 4b4772ac6e
42 changed files with 1785 additions and 0 deletions

6
cookiecutter.json Normal file
View File

@ -0,0 +1,6 @@
{
"project_name": "My Project",
"author_name": "Your Name",
"version": "0.1.0",
"python_version": "3.8"
}

View File

@ -0,0 +1,13 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
logs/
resources/
output.md

View File

@ -0,0 +1 @@
{{ cookiecutter.python_version }}

View File

@ -0,0 +1,9 @@
MIT License
Copyright (c) 2026 chakcy
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,44 @@
# {{ cookiecutter.project_name }}
[TOC]
作者: {{ cookiecutter.author_name }}
版本: {{ cookiecutter.version }}
## 运行项目
uv 环境
```shell
uv sync
uv pip install -e .
source .venv/bin/activate # windwows 命令为 .venv\Scripts\activate
cm_main
```
纯 python 环境
```shell
python -m venv .venv
source .venv/bin/activate # windwows 命令为 .venv\Scripts\activate
pip install -r requirements.txt
pip install -e .
cm_main
```
## 打包项目
```shell
cm_build
```
打包出来的文件在 build 目录下
```text
build
├── resources
│   ├── {{ cookiecutter.project_name }}.ico
│   ├── plugins.mbank
│   └── app.mbank
├── config.toml
└── {{ cookiecutter.project_name }} # windows {{ cookiecutter.project_name }}.exe
```

View File

@ -0,0 +1,86 @@
[default]
app.name = "{{ cookiecutter.project_name }}"
app.version = "{{ cookiecutter.author_name }}"
app.description = "基于 FastAPI 和 Tortoise-ORM 的 chongming 打包脚手架"
app.debug = false
# env = "production"
[server]
host = "0.0.0.0"
port = 8000
reload = false
workers = 4
[database]
url = "sqlite://db.sqlite3"
# PostgreSQL 示例: "postgres://user:pass@localhost:5432/dbname"
# MySQL 示例: "mysql://user:pass@localhost:3306/dbname"
generate_schemas = true # 生产环境使用迁移工具
[security]
secret_key = "your-secret-key-change-in-production"
algorithm = "HS256"
access_token_expire_minutes = 30
[development]
env = "development"
debug = true
[development.server]
host = "127.0.0.1"
port = 8000
reload = true
# workers = 4
[development.database]
url = "sqlite://dev.db"
generate_schemas = true
[development.logging]
level = "DEBUG"
file = "logs/dev.log"
[development.cors]
allow_origins = ["*"]
allow_credentials = true
allow_methods = ["*"]
allow_headers = ["*"]
[production]
env = "production"
debug = false
[production.server]
host = "0.0.0.0"
port = 8000
reload = false
workers = 1
[production.database]
url = "sqlite://prod.db"
generate_schemas = true
[production.logging]
level = "INFO"
file = "logs/prod.log"
[production.cors]
allow_origins = ["https://yourdomain.com"]
allow_credentials = true
allow_methods = ["GET", "POST", "PUT", "DELETE"]
allow_headers = ["Authorization", "Content-Type"]
[production.module_system]
type = "module-bank"
path = ["resources/app.mbank", "resources/plugins.mbank"]
[production.file_system]
type = "svfs"
path = "resources/application.vfs"
[redis]
enabled = false
url = "redis://localhost:6379"
[cache]
default_ttl = 300

View File

@ -0,0 +1,74 @@
# main.py - 修复 reload 问题
#!/usr/bin/env python3
"""
FastAPI + Tortoise-ORM + Chongming 示例
主入口文件适配 chongming 打包系统
"""
import sys
import os
from utils.launch import launch
from multiprocessing import freeze_support
def app_run(app_config: dict, default_config: dict):
"""
应用运行函数 - 适配模块银行环境
Args:
app_config: 应用配置
default_config: 默认配置
"""
import uvicorn
from app.logger import setup_logging, get_logger # type: ignore
setup_logging(app_config["logging"])
logger = get_logger("app")
# 获取服务器配置
server_config = app_config.get("server", {})
host = server_config.get("host", "0.0.0.0")
port = server_config.get("port", 8000)
# 打包环境强制禁用重载
is_frozen = getattr(sys, 'frozen', False)
reload = False if is_frozen else server_config.get("reload", False)
workers = server_config.get("workers", 4)
# 显示启动信息
env = app_config.get("env", "development")
logger.info("=" * 50)
logger.info(f"🚀 启动 {default_config.get('app.name', 'FastAPI 应用')}")
logger.info(f"📦 版本: {default_config.get('app.version', '1.0.0')}")
logger.info(f"🌍 环境: {env}")
logger.info(f"📍 地址: http://{host}:{port}")
logger.info(f"📚 文档: http://{host}:{port}/docs")
logger.info(f"🔧 调试: {'启用' if app_config.get('debug', False) else '禁用'}")
logger.info(f"🔄 重载: {'启用' if reload else '禁用'}")
logger.info(f"👥 Workers: {workers}")
logger.info("=" * 50)
# 关键修改:根据 reload 模式选择不同的启动方式
uvicorn.run(
"app:app",
host=host,
port=port,
reload=reload,
workers=workers,
log_level="debug" if app_config.get("debug", False) else "info",
access_log=True,
)
def main():
freeze_support()
# 如果是打包后的可执行文件,调整路径
if getattr(sys, 'frozen', False):
exe_dir = os.path.dirname(sys.executable)
if exe_dir not in sys.path:
sys.path.insert(0, exe_dir)
# 启动应用
launch(app_run, "development")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,82 @@
# config_loader.py - 读取config.toml的示例代码
import toml
import os
from pathlib import Path
from typing import Any, Dict, Optional
class ConfigLoader:
"""TOML配置文件加载器"""
def __init__(self, config_path: Optional[str] = None):
"""
初始化配置加载器
Args:
config_path: 配置文件路径如果为None则自动查找
"""
if config_path is None:
# 自动查找配置文件
config_path = self._find_config_file()
self.config_path = Path(config_path)
self._config: Dict[str, Any] = {}
self.load()
def _find_config_file(self) -> str:
"""自动查找配置文件"""
possible_paths = [
"config.toml",
"config/config.toml",
"~/.config/myapp/config.toml",
"/etc/myapp/config.toml"
]
for path in possible_paths:
expanded_path = os.path.expanduser(path)
if os.path.exists(expanded_path):
return expanded_path
raise FileNotFoundError("未找到配置文件")
def load(self) -> None:
"""加载配置文件"""
if not self.config_path.exists():
raise FileNotFoundError(f"配置文件不存在: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
self._config = toml.load(f)
def get(self, key: str, default: Any = None) -> Any:
"""获取配置值,支持点号分隔的路径"""
keys = key.split('.')
value = self._config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def get_section(self, section: str) -> Dict[str, Any]:
"""获取整个配置节"""
return self._config.get(section, {})
def reload(self) -> None:
"""重新加载配置文件"""
self.load()
def __getitem__(self, key: str) -> Any:
"""支持字典式访问"""
return self.get(key)
def __contains__(self, key: str) -> bool:
"""检查配置是否存在"""
return self.get(key) is not None
@property
def all(self) -> Dict[str, Any]:
"""获取所有配置"""
return self._config

View File

@ -0,0 +1,49 @@
import sys
from .config import ConfigLoader
import argparse
import os
from module_bank import PythonToSQLite
from typing import Callable
# 检查是否在 PyInstaller 子进程中
def is_pyinstaller_child():
return len(sys.argv) > 1 and ('_child.py' in sys.argv[0] or sys.argv[1].isdigit())
def parse_args():
"""安全的参数解析函数"""
if is_pyinstaller_child():
# PyInstaller 子进程,使用默认参数
return argparse.Namespace(config="config.toml")
parser = argparse.ArgumentParser(description="chongming 应用启动脚本")
parser.add_argument("--config", default="config.toml", help="配置文件路径")
return parser.parse_args()
args = parse_args()
def init_module_bank(config):
MODULE_BANK_PATH_LIST = config["production"]["module_system"]["path"]
for module_path in MODULE_BANK_PATH_LIST:
if os.path.exists(module_path) == False:
raise ValueError(f"模块路径不存在: {module_path}")
packer_list = [PythonToSQLite(module_path) for module_path in MODULE_BANK_PATH_LIST]
for packer in packer_list:
modules = packer.list_modules()
for module in modules:
print(f"{module['module_name']} {'[包]' if module['is_package'] else ''}")
packer.install_importer()
def launch(run_fun: Callable[[dict, dict], None], run_mode: str):
config = ConfigLoader(args.config)
# 获取对应运行模式的配置
default_config = config.get("default", {})
run_mode = default_config.get("env", "development")
app_config = config.get(run_mode, {})
if run_mode == "production":
init_module_bank(config)
run_fun(app_config, default_config)

View File

@ -0,0 +1,42 @@
[project]
name = "{{ cookiecutter.project_name }}"
version = "0.1.0"
description = "基于 modules-bank 的 fastapi 打包脚手架"
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"aerich",
"fastapi[uvicorn]",
"module-bank",
"passlib",
"pydantic-settings",
"pydantic[email]",
"sqlite-vfs",
"toml",
"tortoise-orm",
"uvicorn[standard]",
]
[[tool.uv.index]]
url = "https://mirrors.ustc.edu.cn/pypi/web/simple/"
[tool.uv.workspace]
members = [
".",
]
[tool.uv.sources]
{{ cookiecutter.project_name }} = { workspace = true }
[dependency-groups]
dev = [
"{{ cookiecutter.project_name }}",
"pyinstaller>=6.18.0",
]
[tool.hatch.build.targets.wheel]
packages = ["src/{{ cookiecutter.project_name }}"]
[project.scripts]
cm_build = "{{ cookiecutter.project_name }}.scripts.build:main"
cm_main = "{{ cookiecutter.project_name }}:main"

View File

@ -0,0 +1,10 @@
aerich
fastapi[uvicorn]
module-bank==0.1.1
passlib
pydantic-settings
pydantic[email]
sqlite-vfs
toml
tortoise-orm
uvicorn[standard]

View File

@ -0,0 +1,74 @@
# main.py - 修复 reload 问题
#!/usr/bin/env python3
"""
FastAPI + Tortoise-ORM + Chongming 示例
主入口文件适配 chongming 打包系统
"""
import sys
import os
from .utils.launch import launch
from multiprocessing import freeze_support
def app_run(app_config: dict, default_config: dict):
"""
应用运行函数 - 适配模块银行环境
Args:
app_config: 应用配置
default_config: 默认配置
"""
import uvicorn
from .app.logger import setup_logging, get_logger
setup_logging(app_config["logging"])
logger = get_logger("app")
# 获取服务器配置
server_config = app_config.get("server", {})
host = server_config.get("host", "0.0.0.0")
port = server_config.get("port", 8000)
# 打包环境强制禁用重载
is_frozen = getattr(sys, 'frozen', False)
reload = False if is_frozen else server_config.get("reload", False)
workers = server_config.get("workers", 4)
# 显示启动信息
env = app_config.get("env", "development")
logger.info("=" * 50)
logger.info(f"🚀 启动 {default_config.get('app.name', 'FastAPI 应用')}")
logger.info(f"📦 版本: {default_config.get('app.version', '1.0.0')}")
logger.info(f"🌍 环境: {env}")
logger.info(f"📍 地址: http://{host}:{port}")
logger.info(f"📚 文档: http://{host}:{port}/docs")
logger.info(f"🔧 调试: {'启用' if app_config.get('debug', False) else '禁用'}")
logger.info(f"🔄 重载: {'启用' if reload else '禁用'}")
logger.info(f"👥 Workers: {workers}")
logger.info("=" * 50)
# 关键修改:根据 reload 模式选择不同的启动方式
uvicorn.run(
"{{ cookiecutter.project_name }}.app:app",
host=host,
port=port,
reload=reload,
workers=workers,
log_level="debug" if app_config.get("debug", False) else "info",
access_log=True,
)
def main():
freeze_support()
# 如果是打包后的可执行文件,调整路径
if getattr(sys, 'frozen', False):
exe_dir = os.path.dirname(sys.executable)
if exe_dir not in sys.path:
sys.path.insert(0, exe_dir)
# 启动应用
launch(app_run, "development")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,98 @@
"""
FastAPI 应用包
"""
__version__ = "1.0.0"
"""
FastAPI 应用实例
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from .logger import setup_logging, get_logger
from .core.config import get_settings
from .core.database import register_db, init_db, close_db
from .api import api_router
# 获取配置
settings = get_settings()
logger = get_logger("app")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理
注意在生产环境的模块银行中数据库初始化可能失败
这里使用 try-except 确保应用能够正常启动
"""
logger.info(f"🚀 启动 {settings.app_name} v{settings.app_version}")
logger.info(f"🌍 环境: {settings.env}")
# 初始化数据库(生产环境可能跳过)
try:
await init_db(settings.env)
except Exception as e:
logger.warning(f"⚠️ 数据库初始化失败: {e}")
logger.warning("💡 提示:应用将继续运行,但数据库功能可能受限")
yield
# 关闭数据库连接
try:
await close_db()
except Exception as e:
logger.warning(f"⚠️ 数据库关闭失败: {e}")
logger.info("👋 关闭应用")
# 创建 FastAPI 应用实例
app = FastAPI(
title=settings.app_name,
version=settings.app_version,
description=settings.description,
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
openapi_url="/openapi.json" if settings.debug else None,
lifespan=lifespan,
)
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_allow_origins,
allow_credentials=settings.cors_allow_credentials,
allow_methods=settings.cors_allow_methods,
allow_headers=settings.cors_allow_headers,
)
# 注册数据库
app = register_db(app, settings.env)
# 注册路由
app.include_router(api_router, prefix="/api/v1")
# 根路由
@app.get("/", summary="根路径")
async def root():
"""根路径,返回应用信息"""
return {
"message": f"欢迎使用 {settings.app_name}",
"version": settings.app_version,
"docs": "/docs" if settings.debug else None,
"health": "/api/v1/health",
}
# 全局异常处理器
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
"""全局异常处理器"""
logger.error(f"全局异常: {exc}")
return {
"error": "内部服务器错误",
"detail": str(exc) if settings.debug else "请查看服务器日志"
}
logger.info("✅ FastAPI 应用初始化完成")

View File

@ -0,0 +1,16 @@
"""
API 路由
"""
from fastapi import APIRouter
from .routers import health, items, users
# 创建主路由
api_router = APIRouter()
# 注册子路由
api_router.include_router(health.router, prefix="/health", tags=["health"])
api_router.include_router(items.router, prefix="/items", tags=["items"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
# 导出路由
__all__ = ["api_router"]

View File

@ -0,0 +1,31 @@
"""
API 依赖项
"""
from typing import AsyncGenerator
from fastapi import Depends, HTTPException, status
from tortoise.exceptions import DBConnectionError, DoesNotExist
from tortoise import Tortoise
async def get_db() -> AsyncGenerator[None, None]:
"""获取数据库连接依赖"""
try:
# 检查数据库连接
conn = Tortoise.get_connection("default")
# 执行简单查询验证连接
await conn.execute_query("SELECT 1")
yield
except DBConnectionError as e:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="数据库连接失败"
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"数据库错误: {str(e)}"
)
def get_current_user():
"""获取当前用户(示例)"""
# 这里可以实现 JWT 验证等逻辑
pass

View File

@ -0,0 +1,44 @@
"""
健康检查路由
"""
from fastapi import APIRouter, Depends
from ...core.config import get_settings
from ..dependencies import get_db
router = APIRouter()
@router.get("/", summary="健康检查")
async def health_check(db = Depends(get_db)):
"""健康检查端点"""
settings = get_settings()
return {
"status": "healthy",
"service": settings.app_name,
"version": settings.app_version,
"environment": settings.env,
"timestamp": "now" # 实际使用中可以用 datetime.now()
}
@router.get("/ready", summary="就绪检查")
async def readiness_check():
"""就绪检查端点"""
return {
"status": "ready",
"database": "connected",
"api": "available"
}
@router.get("/info", summary="服务信息")
async def service_info():
"""服务信息端点"""
settings = get_settings()
return {
"name": settings.app_name,
"version": settings.app_version,
"description": settings.description,
"environment": settings.env,
"debug": settings.debug,
"database": settings.database_url.split("://")[0] if "://" in settings.database_url else "unknown"
}

View File

@ -0,0 +1,93 @@
"""
Item 路由
"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from tortoise.exceptions import DoesNotExist
from ...models.item import Item
from ...schemas.item import ItemCreate, ItemUpdate, ItemResponse
from ..dependencies import get_db
router = APIRouter()
@router.get("/", response_model=List[ItemResponse], summary="获取物品列表")
async def read_items(
skip: int = 0,
limit: int = 100,
db=Depends(get_db)
):
"""获取物品列表"""
items = await Item.all().offset(skip).limit(limit)
return items
@router.get("/{item_id}", response_model=ItemResponse, summary="获取物品详情")
async def read_item(item_id: int, db=Depends(get_db)):
"""根据ID获取物品"""
try:
item = await Item.get(id=item_id)
return item
except DoesNotExist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"物品 ID {item_id} 不存在"
)
@router.post("/", response_model=ItemResponse, status_code=status.HTTP_201_CREATED, summary="创建物品")
async def create_item(item: ItemCreate, db=Depends(get_db)):
"""创建新物品"""
# 检查名称是否已存在
existing_item = await Item.filter(name=item.name).first()
if existing_item:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"物品名称 '{item.name}' 已存在"
)
# 创建物品
item_dict = item.model_dump()
new_item = await Item.create(**item_dict)
return new_item
@router.put("/{item_id}", response_model=ItemResponse, summary="更新物品")
async def update_item(item_id: int, item_update: ItemUpdate, db=Depends(get_db)):
"""更新物品信息"""
try:
item = await Item.get(id=item_id)
# 更新字段
update_data = item_update.model_dump(exclude_unset=True)
# 如果更新名称,检查是否与其他物品重名
if "name" in update_data:
existing_item = await Item.filter(
name=update_data["name"]
).exclude(id=item_id).first()
if existing_item:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"物品名称 '{update_data['name']}' 已存在"
)
# 应用更新
await item.update_from_dict(update_data)
await item.save()
return item
except DoesNotExist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"物品 ID {item_id} 不存在"
)
@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除物品")
async def delete_item(item_id: int, db=Depends(get_db)):
"""删除物品"""
try:
item = await Item.get(id=item_id)
await item.delete()
return None
except DoesNotExist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"物品 ID {item_id} 不存在"
)

View File

@ -0,0 +1,85 @@
"""
User 路由
"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from tortoise.exceptions import DoesNotExist, IntegrityError
from ...models.user import User
from ...schemas.user import UserCreate, UserUpdate, UserResponse
from ...services.user_service import UserService
from ..dependencies import get_db
router = APIRouter()
@router.get("/", response_model=List[UserResponse], summary="获取用户列表")
async def read_users(
skip: int = 0,
limit: int = 100,
db=Depends(get_db)
):
"""获取用户列表"""
users = await UserService.get_all_users(skip, limit)
return users
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED, summary="创建用户")
async def create_user(user_data: UserCreate, db=Depends(get_db)):
"""创建新用户"""
# 检查用户名是否已存在
existing_user = await UserService.get_user_by_username(user_data.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"用户名 '{user_data.username}' 已存在"
)
# 检查邮箱是否已存在
existing_email = await UserService.get_user_by_email(user_data.email)
if existing_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"邮箱 '{user_data.email}' 已存在"
)
# 创建用户
try:
user = await UserService.create_user(user_data)
return user
except IntegrityError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="创建用户失败"
)
@router.get("/{user_id}", response_model=UserResponse, summary="获取用户详情")
async def read_user(user_id: int, db=Depends(get_db)):
"""根据ID获取用户"""
user = await UserService.get_user_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"用户 ID {user_id} 不存在"
)
return user
@router.put("/{user_id}", response_model=UserResponse, summary="更新用户")
async def update_user(user_id: int, user_update: UserUpdate, db=Depends(get_db)):
"""更新用户信息"""
user = await UserService.update_user(user_id, user_update)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"用户 ID {user_id} 不存在"
)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除用户")
async def delete_user(user_id: int, db=Depends(get_db)):
"""删除用户"""
deleted = await UserService.delete_user(user_id)
if not deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"用户 ID {user_id} 不存在"
)
return None

View File

@ -0,0 +1,3 @@
"""
核心模块
"""

View File

@ -0,0 +1,135 @@
"""
配置管理模块
基于 Pydantic Settings TOML 文件
"""
from typing import Optional, Dict, Any
from pydantic_settings import BaseSettings
from pydantic import Field
import toml
from pathlib import Path
import sys
import os
try:
from ...utils.launch import args
except:
from utils.launch import args # type: ignore
class Settings(BaseSettings):
"""应用配置"""
# 应用信息
app_name: str = "FastAPI Demo"
app_version: str = "1.0.0"
description: str = "FastAPI 应用"
# 服务器配置
host: str = "0.0.0.0"
port: int = 8000
reload: bool = False
workers: int = 4
# 数据库配置
database_url: str = "sqlite://db.sqlite3"
generate_schemas: bool = False
# 环境配置
env: str = "development"
debug: bool = False
# 日志配置
logging_level: str = "INFO"
logging_file: Optional[str] = None
# CORS 配置
cors_allow_origins: list = ["*"]
cors_allow_credentials: bool = True
cors_allow_methods: list = ["*"]
cors_allow_headers: list = ["*"]
# 安全配置
secret_key: str = "your-secret-key-change-in-production"
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# 模块系统
module_system_type: str = "local"
module_system_path: list = ["plugins", "app"]
class Config:
env_file = ".env"
extra = "ignore"
@classmethod
def from_toml(cls, config_path: Optional[str] = None) -> "Settings":
"""从 TOML 文件加载配置"""
if config_path is None:
# 尝试多个可能的配置路径
possible_paths = [
"config.toml",
"./config.toml",
"../config.toml",
Path(__file__).parent.parent.parent / "config.toml",
]
for path in possible_paths:
if Path(path).exists():
config_path = str(path)
print(f"📄 加载配置文件: {config_path}")
break
if config_path is None:
print("⚠️ 未找到配置文件,使用默认配置")
return cls()
try:
config_file = Path(config_path)
if not config_file.exists():
print(f"⚠️ 配置文件不存在: {config_path}")
return cls()
config_data = toml.load(config_file)
# 获取默认配置
default_config = config_data.get("default", {})
# 确定环境
env = default_config.get("env", "development")
# 获取环境特定配置
env_config = config_data.get(env, {})
# 合并配置
merged_config = {**default_config, **env_config}
# 处理嵌套配置
for section in ["server", "database", "security", "logging", "cors", "module_system"]:
if section in config_data:
if isinstance(config_data[section], dict):
merged_config.update(config_data[section])
# 从环境特定配置中覆盖
for section in ["server", "database", "security", "logging", "cors", "module_system"]:
env_section_key = f"{env}.{section}"
if env_section_key in config_data:
if isinstance(config_data[env_section_key], dict):
merged_config.update(config_data[env_section_key])
print(f"🌍 环境: {env}")
return cls(**merged_config)
except Exception as e:
print(f"❌ 加载配置失败: {e}")
import traceback
traceback.print_exc()
return cls()
# 全局配置实例
_settings = None
def get_settings() -> Settings:
"""获取配置实例(单例模式)"""
global _settings
if _settings is None:
_settings = Settings.from_toml(args.config)
return _settings

View File

@ -0,0 +1,103 @@
"""
Tortoise-ORM 数据库配置
"""
from typing import Optional
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise
from .config import get_settings
async def init_db(run_mode: str):
"""初始化数据库连接"""
settings = get_settings()
if run_mode == "production":
db_config = {
"connections": {
"default": settings.database_url
},
"apps": {
"models": {
"models": ["app.models", "aerich.models"],
"default_connection": "default",
}
},
"use_tz": False,
"timezone": "UTC",
}
elif run_mode == "development":
db_config = {
"connections": {
"default": settings.database_url
},
"apps": {
"models": {
"models": ["{{ cookiecutter.project_name }}.app.models", "aerich.models"],
"default_connection": "default",
}
},
"use_tz": False,
"timezone": "UTC",
}
else:
raise ValueError("Invalid run mode")
await Tortoise.init(db_config)
# 根据配置决定是否生成表结构
if settings.generate_schemas:
print("🗃️ 生成数据库表结构...")
await Tortoise.generate_schemas()
print("✅ 数据库表结构生成完成")
else:
print("⏭️ 跳过数据库表结构生成(建议使用迁移工具)")
print(f"✅ 数据库连接成功: {settings.database_url}")
async def close_db():
"""关闭数据库连接"""
await Tortoise.close_connections()
print("✅ 数据库连接已关闭")
def register_db(app, run_mode: str):
"""将数据库注册到 FastAPI 应用"""
settings = get_settings()
modules_path = "app.models" if run_mode == "production" else "{{ cookiecutter.project_name }}.app.models"
register_tortoise(
app,
db_url=settings.database_url,
modules={"models": [modules_path]},
generate_schemas=settings.generate_schemas,
add_exception_handlers=True,
)
return app
# 在 app/core/database.py 中添加
from aerich import Command
async def run_migrations():
"""运行数据库迁移"""
try:
command = Command(
tortoise_config={
"connections": {
"default": get_settings().database_url
},
"apps": {
"models": {
"models": ["app.models", "aerich.models"],
"default_connection": "default",
}
},
},
app="models"
)
# 检查并应用迁移
await command.init()
await command.upgrade()
print("✅ 数据库迁移完成")
except Exception as e:
print(f"⚠️ 数据库迁移失败: {e}")

View File

@ -0,0 +1,48 @@
import logging
from pathlib import Path
class ExcludeWatchFilesFilter(logging.Filter):
def filter(self, record):
# 如果日志记录来自 watchfiles 模块,则不记录(返回 False
return not record.name.startswith('watchfiles')
def setup_logging(log_config: dict):
log_level = getattr(logging, log_config["level"], logging.INFO)
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# 创建文件夹
log_file_path = Path(log_config["file"])
log_file_dir = log_file_path.parent
if log_file_dir != Path("."):
log_file_dir.mkdir(parents=True, exist_ok=True)
# 设置特定日志级别的过滤
logging.getLogger("watchfiles").setLevel(logging.WARNING)
# 配置根日志记录器
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
# 创建格式化器
formatter = logging.Formatter(log_format)
# 创建文件处理器并添加过滤器
file_handler = logging.FileHandler(log_config["file"], encoding="utf-8")
file_handler.setFormatter(formatter)
file_handler.addFilter(ExcludeWatchFilesFilter())
# 创建控制台处理器并添加过滤器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.addFilter(ExcludeWatchFilesFilter())
# 添加处理器到根日志记录器
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
print(f"日志系统已配置 - 级别: {logging.getLevelName(log_level)}, 文件: {log_config['file']}")
def get_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
# 为每个logger添加过滤器
logger.addFilter(ExcludeWatchFilesFilter())
return logger

View File

@ -0,0 +1,8 @@
"""
数据库模型
"""
from .item import Item
from .user import User
# 导出所有模型,供 Tortoise-ORM 自动发现
__all__ = ["Item", "User"]

View File

@ -0,0 +1,40 @@
"""
Item 数据模型
"""
from tortoise.models import Model
from tortoise import fields
from datetime import datetime
class Item(Model):
"""物品模型"""
id = fields.IntField(pk=True, description="主键ID")
name = fields.CharField(max_length=100, description="名称")
description = fields.TextField(null=True, description="描述")
price = fields.DecimalField(max_digits=10, decimal_places=2, default=0.00, description="价格")
is_active = fields.BooleanField(default=True, description="是否激活")
# 关联用户(可选)
owner = fields.ForeignKeyField(
"models.User",
related_name="items",
null=True,
on_delete=fields.SET_NULL,
description="所有者"
)
# 时间戳
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, description="更新时间")
def __str__(self):
return f"Item(id={self.id}, name={self.name}, price={self.price})"
class Meta:
table = "items"
table_description = "物品表"
ordering = ["-created_at"]
class PydanticMeta:
# Pydantic 模型配置
exclude = ["is_active"]

View File

@ -0,0 +1,36 @@
"""
User 数据模型
"""
from tortoise.models import Model
from tortoise import fields
from datetime import datetime
from .item import Item
class User(Model):
"""用户模型"""
id = fields.IntField(pk=True, description="主键ID")
username = fields.CharField(max_length=50, unique=True, description="用户名")
email = fields.CharField(max_length=100, unique=True, description="邮箱")
hashed_password = fields.CharField(max_length=128, description="哈希密码")
is_active = fields.BooleanField(default=True, description="是否激活")
is_superuser = fields.BooleanField(default=False, description="是否超级用户")
# 时间戳
created_at = fields.DatetimeField(auto_now_add=True, description="创建时间")
updated_at = fields.DatetimeField(auto_now=True, description="更新时间")
# 反向关系
items: fields.ReverseRelation["Item"]
def __str__(self):
return f"User(id={self.id}, username={self.username})"
class Meta:
table = "users"
table_description = "用户表"
ordering = ["-created_at"]
class PydanticMeta:
# Pydantic 模型配置
exclude = ["hashed_password"]

View File

@ -0,0 +1,11 @@
"""
Pydantic 模型
"""
from .item import ItemCreate, ItemUpdate, ItemResponse, ItemInDB
__all__ = [
"ItemCreate",
"ItemUpdate",
"ItemResponse",
"ItemInDB"
]

View File

@ -0,0 +1,37 @@
"""
Item Pydantic 模型
"""
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
from decimal import Decimal
class ItemBase(BaseModel):
"""Item 基础模型"""
name: str = Field(..., min_length=1, max_length=100, description="物品名称")
description: Optional[str] = Field(None, description="物品描述")
price: Decimal = Field(Decimal(0.00), ge=0, description="物品价格")
class ItemCreate(ItemBase):
"""创建 Item 模型"""
pass
class ItemUpdate(BaseModel):
"""更新 Item 模型"""
name: Optional[str] = Field(None, min_length=1, max_length=100, description="物品名称")
description: Optional[str] = Field(None, description="物品描述")
price: Optional[Decimal] = Field(None, ge=0, description="物品价格")
class ItemInDB(ItemBase):
"""数据库中的 Item 模型"""
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class ItemResponse(ItemInDB):
"""Item 响应模型"""
pass

View File

@ -0,0 +1,28 @@
# app/schemas/user.py
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
username: str
email: EmailStr
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
class UserInDB(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class UserResponse(UserInDB):
pass

View File

@ -0,0 +1,47 @@
# app/services/item_service.py
from typing import Optional, List
from ..models.item import Item
from ..schemas.item import ItemCreate, ItemUpdate
class ItemService:
@staticmethod
async def get_all_items(
skip: int = 0,
limit: int = 100,
owner_id: Optional[int] = None
) -> List[Item]:
query = Item.all()
if owner_id:
query = query.filter(owner_id=owner_id)
return await query.offset(skip).limit(limit)
@staticmethod
async def get_item_by_id(item_id: int) -> Optional[Item]:
return await Item.filter(id=item_id).first().prefetch_related("owner")
@staticmethod
async def create_item(item_data: ItemCreate, owner_id: Optional[int] = None) -> Item:
item = await Item.create(
**item_data.model_dump(),
owner_id=owner_id
)
return item
@staticmethod
async def update_item(item_id: int, item_data: ItemUpdate) -> Optional[Item]:
item = await ItemService.get_item_by_id(item_id)
if not item:
return None
update_data = item_data.model_dump(exclude_unset=True)
await item.update_from_dict(update_data)
await item.save()
return item
@staticmethod
async def delete_item(item_id: int) -> bool:
item = await ItemService.get_item_by_id(item_id)
if item:
await item.delete()
return True
return False

View File

@ -0,0 +1,67 @@
# app/services/user_service.py
from typing import Optional, List
from ..models.user import User
from ..schemas.user import UserCreate, UserUpdate
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class UserService:
@staticmethod
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
async def get_all_users(skip: int = 0, limit: int = 100) -> List[User]:
return await User.all().offset(skip).limit(limit)
@staticmethod
async def get_user_by_id(user_id: int) -> Optional[User]:
return await User.filter(id=user_id).first()
@staticmethod
async def get_user_by_username(username: str) -> Optional[User]:
return await User.filter(username=username).first()
@staticmethod
async def get_user_by_email(email: str) -> Optional[User]:
return await User.filter(email=email).first()
@staticmethod
async def create_user(user_data: UserCreate) -> User:
hashed_password = UserService.get_password_hash(user_data.password)
user = await User.create(
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password
)
return user
@staticmethod
async def update_user(user_id: int, user_data: UserUpdate) -> Optional[User]:
user = await UserService.get_user_by_id(user_id)
if not user:
return None
update_data = user_data.model_dump(exclude_unset=True)
if "password" in update_data:
update_data["hashed_password"] = UserService.get_password_hash(
update_data.pop("password")
)
await user.update_from_dict(update_data)
await user.save()
return user
@staticmethod
async def delete_user(user_id: int) -> bool:
user = await UserService.get_user_by_id(user_id)
if user:
await user.delete()
return True
return False

View File

@ -0,0 +1,124 @@
from module_bank import PythonToSQLite
import os
from pathlib import Path
import shutil
import subprocess
import sys
def run_pyinstaller():
"""运行 PyInstaller 打包成可执行文件"""
try:
# 构建 PyInstaller 命令
cmd = [
sys.executable, "-m", "PyInstaller",
"--onefile",
"--name", "{{ cookiecutter.project_name }}",
"--distpath", ".",
"--workpath", "./build_temp",
"--specpath", "./specs",
"--clean",
# 隐藏导入
"--hidden-import", "passlib",
"--hidden-import", "passlib.context",
"--hidden-import", "passlib.handlers.bcrypt",
"--hidden-import", "fastapi",
"--hidden-import", "fastapi.middleware.cors",
"--hidden-import", "pydantic_settings",
"--hidden-import", "tortoise",
"--hidden-import", "tortoise.contrib.fastapi",
"--hidden-import", "aerich",
"--hidden-import", "uvicorn",
# 复制包元数据 - 解决 importlib.metadata 问题
"--copy-metadata", "tortoise-orm",
"--copy-metadata", "aerich",
"--copy-metadata", "fastapi",
"--copy-metadata", "uvicorn",
"--copy-metadata", "pydantic",
"--copy-metadata", "pydantic-settings",
"--copy-metadata", "module-bank",
"--copy-metadata", "passlib",
"--copy-metadata", "sqlite-vfs",
"--copy-metadata", "toml",
# 收集整个包
"--collect-all", "tortoise-orm",
"--collect-all", "aerich",
# 禁用多进程优化
# "--multiprocessing-fork", # 明确指定使用 fork 模式
# 添加数据文件
# "--add-data", "resources:resources",
# 其他选项
"--icon", "../resources/{{ cookiecutter.project_name }}.ico",
"main.py"
]
print(f"正在运行 PyInstaller: {' '.join(cmd)}")
result = subprocess.run(cmd, cwd="./build", capture_output=True, text=True)
if result.returncode == 0:
print("PyInstaller 打包成功!")
print(result.stdout)
else:
print("PyInstaller 打包失败!")
print("错误信息:", result.stderr)
except FileNotFoundError:
print("错误:未找到 PyInstaller请先安装 PyInstallerpip install pyinstaller")
except Exception as e:
print(f"PyInstaller 执行出错: {e}")
def main():
# 打包模块
app_packer = PythonToSQLite("./resources/app.mbank")
app_packer.pack_directory("src/{{ cookiecutter.project_name }}/app", "app")
app_packer.verify_package_structure()
# app_packer.delete_source_code(None)
plugin_packer = PythonToSQLite("./resources/plugins.mbank")
plugin_packer.pack_directory("src/{{ cookiecutter.project_name }}/plugins", "plugins")
plugin_packer.verify_package_structure()
# plugin_packer.delete_source_code(None)
# 创建 build 目录
build_path = Path("build")
if build_path.exists() and build_path.is_dir():
shutil.rmtree("build")
os.makedirs("build", exist_ok=True)
# 复制 utils 文件夹到 build 文件夹
utils_path = Path("utils")
if utils_path.exists():
target_utils_path = Path("build") / "utils"
shutil.copytree(utils_path, target_utils_path, dirs_exist_ok=True)
print(f"已复制 utils 文件夹到 {target_utils_path}")
# 复制 resources 文件夹到 build 文件夹
resources_path = Path("resources")
if resources_path.exists():
target_resources_path = Path("build") / "resources"
shutil.copytree(resources_path, target_resources_path, dirs_exist_ok=True)
print(f"已复制 resources 文件夹到 {target_resources_path}")
print("模块打包完成")
# 复制 main.py 到 build 文件夹
shutil.copy("public/main.py", "build")
# 复制 config.toml 到 build 文件夹
shutil.copy("config.toml", "build")
# 复制 utils 文件夹到 build 文件夹
shutil.copytree("public/utils", "build/utils")
# 运行 PyInstaller
run_pyinstaller()
# 清理临时文件
shutil.rmtree(r"build/build_temp", ignore_errors=True)
shutil.rmtree(r"build/specs", ignore_errors=True)
shutil.rmtree(r"build/utils", ignore_errors=True)
os.remove(r"build/main.py")
# os.remove(r"build/config.toml")
print("构建完成")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,12 @@
import os
import shutil
# 需要遍历的目录
root_dir = "./"
# 遍历目录
for dirpath, dirnames, filenames in os.walk(root_dir):
if "__pycache__" in dirnames:
# 获取 __pycache__ 目录的全路径
pycache_dir = os.path.join(dirpath, "__pycache__")
# 删除目录
shutil.rmtree(pycache_dir)
print(f"Removed: {pycache_dir}")

View File

@ -0,0 +1,34 @@
import os
import sys
def generate_markdown_from_py_files(directory, output_file):
with open(output_file, "w", encoding="utf-8") as md_file:
for root, dirs, files in os.walk(directory):
# 排除 venv 目录
dirs[:] = [d for d in dirs if d != ".venv"]
dirs[:] = [d for d in dirs if d != ".vscode"]
dirs[:] = [d for d in dirs if d != "Scripts"]
dirs[:] = [d for d in dirs if d != "build"]
dirs[:] = [d for d in dirs if d != "node_modules"]
for file in files:
if (
file.endswith(".py")
or file.endswith(".rs")
or file.endswith(".toml")
):
file_path = os.path.join(root, file)
md_file.write(f"`{file_path}`\n")
md_file.write("```python\n")
with open(file_path, "r", encoding="utf-8") as py_file:
md_file.write(py_file.read())
md_file.write("\n```\n\n")
if __name__ == "__main__":
# 指定目录和输出文件名
target_directory = sys.argv[1] # 替换为你的目标目录
output_markdown_file = "output.md" # 输出的 Markdown 文件名
generate_markdown_from_py_files(target_directory, output_markdown_file)
print(f"Markdown 文件已生成:{output_markdown_file}")

View File

@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
数据库迁移脚本
"""
import asyncio
import sys
import os
# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
async def main():
from app.core.config import get_settings
from aerich import Command
settings = get_settings()
# Aerich 配置
TORTOISE_ORM = {
"connections": {
"default": settings.database_url
},
"apps": {
"models": {
"models": ["app.models", "aerich.models"],
"default_connection": "default",
}
},
"use_tz": False,
"timezone": "UTC",
}
command = Command(tortoise_config=TORTOISE_ORM, app="models")
# 初始化 Aerich首次运行
if not os.path.exists("./migrations"):
print("🔧 初始化数据库迁移...")
await command.init()
print("✅ 迁移初始化完成")
# 生成迁移文件
print("📝 生成迁移文件...")
try:
migration_name = sys.argv[1] if len(sys.argv) > 1 else "update"
await command.migrate(name=migration_name)
print(f"✅ 迁移文件生成成功: {migration_name}")
except Exception as e:
print(f"⚠️ 生成迁移文件失败: {e}")
# 尝试升级
pass
# 应用迁移
print("🔄 应用数据库迁移...")
try:
await command.upgrade()
print("✅ 迁移应用成功")
except Exception as e:
print(f"❌ 迁移应用失败: {e}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,82 @@
# config_loader.py - 读取config.toml的示例代码
import toml
import os
from pathlib import Path
from typing import Any, Dict, Optional
class ConfigLoader:
"""TOML配置文件加载器"""
def __init__(self, config_path: Optional[str] = None):
"""
初始化配置加载器
Args:
config_path: 配置文件路径如果为None则自动查找
"""
if config_path is None:
# 自动查找配置文件
config_path = self._find_config_file()
self.config_path = Path(config_path)
self._config: Dict[str, Any] = {}
self.load()
def _find_config_file(self) -> str:
"""自动查找配置文件"""
possible_paths = [
"config.toml",
"config/config.toml",
"~/.config/myapp/config.toml",
"/etc/myapp/config.toml"
]
for path in possible_paths:
expanded_path = os.path.expanduser(path)
if os.path.exists(expanded_path):
return expanded_path
raise FileNotFoundError("未找到配置文件")
def load(self) -> None:
"""加载配置文件"""
if not self.config_path.exists():
raise FileNotFoundError(f"配置文件不存在: {self.config_path}")
with open(self.config_path, 'r', encoding='utf-8') as f:
self._config = toml.load(f)
def get(self, key: str, default: Any = None) -> Any:
"""获取配置值,支持点号分隔的路径"""
keys = key.split('.')
value = self._config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def get_section(self, section: str) -> Dict[str, Any]:
"""获取整个配置节"""
return self._config.get(section, {})
def reload(self) -> None:
"""重新加载配置文件"""
self.load()
def __getitem__(self, key: str) -> Any:
"""支持字典式访问"""
return self.get(key)
def __contains__(self, key: str) -> bool:
"""检查配置是否存在"""
return self.get(key) is not None
@property
def all(self) -> Dict[str, Any]:
"""获取所有配置"""
return self._config

View File

@ -0,0 +1,49 @@
import sys
from .config import ConfigLoader
import argparse
import os
from module_bank import PythonToSQLite
from typing import Callable
# 检查是否在 PyInstaller 子进程中
def is_pyinstaller_child():
return len(sys.argv) > 1 and ('_child.py' in sys.argv[0] or sys.argv[1].isdigit())
def parse_args():
"""安全的参数解析函数"""
if is_pyinstaller_child():
# PyInstaller 子进程,使用默认参数
return argparse.Namespace(config="config.toml")
parser = argparse.ArgumentParser(description="chongming 应用启动脚本")
parser.add_argument("--config", default="config.toml", help="配置文件路径")
return parser.parse_args()
args = parse_args()
def init_module_bank(config):
MODULE_BANK_PATH_LIST = config["production"]["module_system"]["path"]
for module_path in MODULE_BANK_PATH_LIST:
if os.path.exists(module_path) == False:
raise ValueError(f"模块路径不存在: {module_path}")
packer_list = [PythonToSQLite(module_path) for module_path in MODULE_BANK_PATH_LIST]
for packer in packer_list:
modules = packer.list_modules()
for module in modules:
print(f"{module['module_name']} {'[包]' if module['is_package'] else ''}")
packer.install_importer()
def launch(run_fun: Callable[[dict, dict], None], run_mode: str):
config = ConfigLoader(args.config)
# 获取对应运行模式的配置
default_config = config.get("default", {})
run_mode = default_config.get("env", "development")
app_config = config.get(run_mode, {})
if run_mode == "production":
init_module_bank(config)
run_fun(app_config, default_config)