`./{{ cookiecutter.project_name }}\config.toml` ```python [default] app.name = "{{ cookiecutter.project_name }}" app.version = "{{ cookiecutter.author_name }}" app.description = "基于 FastAPI 和 Tortoise-ORM 的 chongming 打包脚手架" app.debug = false # env = "production" [server] host = "0.0.0.0" port = 8000 reload = false workers = 4 [database] url = "sqlite://db.sqlite3" # PostgreSQL 示例: "postgres://user:pass@localhost:5432/dbname" # MySQL 示例: "mysql://user:pass@localhost:3306/dbname" generate_schemas = true # 生产环境使用迁移工具 [security] secret_key = "your-secret-key-change-in-production" algorithm = "HS256" access_token_expire_minutes = 30 [development] env = "development" debug = true [development.server] host = "127.0.0.1" port = 8000 reload = true # workers = 4 [development.database] url = "sqlite://dev.db" generate_schemas = true [development.logging] level = "DEBUG" file = "logs/dev.log" [development.cors] allow_origins = ["*"] allow_credentials = true allow_methods = ["*"] allow_headers = ["*"] [production] env = "production" debug = false [production.server] host = "0.0.0.0" port = 8000 reload = false workers = 1 [production.database] url = "sqlite://prod.db" generate_schemas = true [production.logging] level = "INFO" file = "logs/prod.log" [production.cors] allow_origins = ["https://yourdomain.com"] allow_credentials = true allow_methods = ["GET", "POST", "PUT", "DELETE"] allow_headers = ["Authorization", "Content-Type"] [production.module_system] type = "module-bank" path = ["resources/app.mbank", "resources/plugins.mbank"] [production.file_system] type = "svfs" path = "resources/application.vfs" [redis] enabled = false url = "redis://localhost:6379" [cache] default_ttl = 300 ``` `./{{ cookiecutter.project_name }}\pyproject.toml` ```python [project] name = "{{ cookiecutter.project_name }}" version = "0.1.0" description = "基于 modules-bank 的 fastapi 打包脚手架" readme = "README.md" requires-python = ">=3.8" dependencies = [ "aerich", "fastapi[uvicorn]", "module-bank", "passlib", "pydantic-settings", "pydantic[email]", "sqlite-vfs", "toml", "tortoise-orm", "uvicorn[standard]", ] [[tool.uv.index]] url = "https://mirrors.ustc.edu.cn/pypi/web/simple/" [tool.uv.workspace] members = [ ".", ] [tool.uv.sources] {{ cookiecutter.project_name }} = { workspace = true } [dependency-groups] dev = [ "{{ cookiecutter.project_name }}", "pyinstaller>=6.18.0", ] [tool.hatch.build.targets.wheel] packages = ["src/{{ cookiecutter.project_name }}"] [project.scripts] cm_build = "{{ cookiecutter.project_name }}.scripts.build:main" cm_main = "{{ cookiecutter.project_name }}:main" ``` `./{{ cookiecutter.project_name }}\public\config.toml` ```python [default] app.name = "{{ cookiecutter.project_name }}" app.version = "{{ cookiecutter.author_name }}" app.description = "基于 FastAPI 和 Tortoise-ORM 的 chongming 打包脚手架" app.debug = false env = "production" [server] host = "0.0.0.0" port = 8000 reload = false workers = 4 [database] url = "sqlite://db.sqlite3" # PostgreSQL 示例: "postgres://user:pass@localhost:5432/dbname" # MySQL 示例: "mysql://user:pass@localhost:3306/dbname" generate_schemas = true # 生产环境使用迁移工具 [security] secret_key = "your-secret-key-change-in-production" algorithm = "HS256" access_token_expire_minutes = 30 [development] env = "development" debug = true [development.server] host = "127.0.0.1" port = 8000 reload = true # workers = 4 [development.database] url = "sqlite://dev.db" generate_schemas = true [development.logging] level = "DEBUG" file = "logs/dev.log" [development.cors] allow_origins = ["*"] allow_credentials = true allow_methods = ["*"] allow_headers = ["*"] [production] env = "production" debug = false [production.server] host = "0.0.0.0" port = 8000 reload = false workers = 1 [production.database] url = "sqlite://prod.db" generate_schemas = true [production.logging] level = "INFO" file = "logs/prod.log" [production.cors] allow_origins = ["https://yourdomain.com"] allow_credentials = true allow_methods = ["GET", "POST", "PUT", "DELETE"] allow_headers = ["Authorization", "Content-Type"] [production.module_system] type = "module-bank" path = ["resources/app.mbank", "resources/plugins.mbank"] [production.file_system] type = "svfs" path = "resources/application.vfs" [redis] enabled = false url = "redis://localhost:6379" [cache] default_ttl = 300 ``` `./{{ cookiecutter.project_name }}\public\main.py` ```python # main.py - 修复 reload 问题 #!/usr/bin/env python3 """ FastAPI + Tortoise-ORM + Chongming 示例 主入口文件,适配 chongming 打包系统 """ import sys import os from utils.launch import launch from multiprocessing import freeze_support def app_run(app_config: dict, default_config: dict): """ 应用运行函数 - 适配模块银行环境 Args: app_config: 应用配置 default_config: 默认配置 """ import uvicorn from app.logger import setup_logging, get_logger # type: ignore setup_logging(app_config["logging"]) logger = get_logger("app") # 获取服务器配置 server_config = app_config.get("server", {}) host = server_config.get("host", "0.0.0.0") port = server_config.get("port", 8000) # 打包环境强制禁用重载 is_frozen = getattr(sys, 'frozen', False) reload = False if is_frozen else server_config.get("reload", False) workers = server_config.get("workers", 4) # 显示启动信息 env = app_config.get("env", "development") logger.info("=" * 50) logger.info(f"🚀 启动 {default_config.get('app.name', 'FastAPI 应用')}") logger.info(f"📦 版本: {default_config.get('app.version', '1.0.0')}") logger.info(f"🌍 环境: {env}") logger.info(f"📍 地址: http://{host}:{port}") logger.info(f"📚 文档: http://{host}:{port}/docs") logger.info(f"🔧 调试: {'启用' if app_config.get('debug', False) else '禁用'}") logger.info(f"🔄 重载: {'启用' if reload else '禁用'}") logger.info(f"👥 Workers: {workers}") logger.info("=" * 50) # 关键修改:根据 reload 模式选择不同的启动方式 uvicorn.run( "app:app", host=host, port=port, reload=reload, workers=workers, log_level="debug" if app_config.get("debug", False) else "info", access_log=True, ) def main(): freeze_support() # 如果是打包后的可执行文件,调整路径 if getattr(sys, 'frozen', False): exe_dir = os.path.dirname(sys.executable) if exe_dir not in sys.path: sys.path.insert(0, exe_dir) # 启动应用 launch(app_run, "development") if __name__ == "__main__": main() ``` `./{{ cookiecutter.project_name }}\public\utils\config.py` ```python # config_loader.py - 读取config.toml的示例代码 import toml import os from pathlib import Path from typing import Any, Dict, Optional class ConfigLoader: """TOML配置文件加载器""" def __init__(self, config_path: Optional[str] = None): """ 初始化配置加载器 Args: config_path: 配置文件路径,如果为None则自动查找 """ if config_path is None: # 自动查找配置文件 config_path = self._find_config_file() self.config_path = Path(config_path) self._config: Dict[str, Any] = {} self.load() def _find_config_file(self) -> str: """自动查找配置文件""" possible_paths = [ "config.toml", "config/config.toml", "~/.config/myapp/config.toml", "/etc/myapp/config.toml" ] for path in possible_paths: expanded_path = os.path.expanduser(path) if os.path.exists(expanded_path): return expanded_path raise FileNotFoundError("未找到配置文件") def load(self) -> None: """加载配置文件""" if not self.config_path.exists(): raise FileNotFoundError(f"配置文件不存在: {self.config_path}") with open(self.config_path, 'r', encoding='utf-8') as f: self._config = toml.load(f) def get(self, key: str, default: Any = None) -> Any: """获取配置值,支持点号分隔的路径""" keys = key.split('.') value = self._config try: for k in keys: value = value[k] return value except (KeyError, TypeError): return default def get_section(self, section: str) -> Dict[str, Any]: """获取整个配置节""" return self._config.get(section, {}) def reload(self) -> None: """重新加载配置文件""" self.load() def __getitem__(self, key: str) -> Any: """支持字典式访问""" return self.get(key) def __contains__(self, key: str) -> bool: """检查配置是否存在""" return self.get(key) is not None @property def all(self) -> Dict[str, Any]: """获取所有配置""" return self._config ``` `./{{ cookiecutter.project_name }}\public\utils\launch.py` ```python import sys from .config import ConfigLoader import argparse import os from module_bank import PythonToSQLite from typing import Callable # 检查是否在 PyInstaller 子进程中 def is_pyinstaller_child(): return len(sys.argv) > 1 and ('_child.py' in sys.argv[0] or sys.argv[1].isdigit()) def parse_args(): """安全的参数解析函数""" if is_pyinstaller_child(): # PyInstaller 子进程,使用默认参数 return argparse.Namespace(config="config.toml") parser = argparse.ArgumentParser(description="chongming 应用启动脚本") parser.add_argument("--config", default="config.toml", help="配置文件路径") return parser.parse_args() args = parse_args() def init_module_bank(config): MODULE_BANK_PATH_LIST = config["production"]["module_system"]["path"] for module_path in MODULE_BANK_PATH_LIST: if os.path.exists(module_path) == False: raise ValueError(f"模块路径不存在: {module_path}") packer_list = [PythonToSQLite(module_path) for module_path in MODULE_BANK_PATH_LIST] for packer in packer_list: modules = packer.list_modules() for module in modules: print(f"{module['module_name']} {'[包]' if module['is_package'] else ''}") packer.install_importer() def launch(run_fun: Callable[[dict, dict], None], run_mode: str): config = ConfigLoader(args.config) # 获取对应运行模式的配置 default_config = config.get("default", {}) run_mode = default_config.get("env", "development") app_config = config.get(run_mode, {}) if run_mode == "production": init_module_bank(config) run_fun(app_config, default_config) ``` `./{{ cookiecutter.project_name }}\public\utils\__init__.py` ```python ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\__init__.py` ```python # main.py - 修复 reload 问题 #!/usr/bin/env python3 """ FastAPI + Tortoise-ORM + Chongming 示例 主入口文件,适配 chongming 打包系统 """ import sys import os from .utils.launch import launch from multiprocessing import freeze_support def app_run(app_config: dict, default_config: dict): """ 应用运行函数 - 适配模块银行环境 Args: app_config: 应用配置 default_config: 默认配置 """ import uvicorn from .app.logger import setup_logging, get_logger setup_logging(app_config["logging"]) logger = get_logger("app") # 获取服务器配置 server_config = app_config.get("server", {}) host = server_config.get("host", "0.0.0.0") port = server_config.get("port", 8000) # 打包环境强制禁用重载 is_frozen = getattr(sys, 'frozen', False) reload = False if is_frozen else server_config.get("reload", False) workers = server_config.get("workers", 4) # 显示启动信息 env = app_config.get("env", "development") logger.info("=" * 50) logger.info(f"🚀 启动 {default_config.get('app.name', 'FastAPI 应用')}") logger.info(f"📦 版本: {default_config.get('app.version', '1.0.0')}") logger.info(f"🌍 环境: {env}") logger.info(f"📍 地址: http://{host}:{port}") logger.info(f"📚 文档: http://{host}:{port}/docs") logger.info(f"🔧 调试: {'启用' if app_config.get('debug', False) else '禁用'}") logger.info(f"🔄 重载: {'启用' if reload else '禁用'}") logger.info(f"👥 Workers: {workers}") logger.info("=" * 50) # 关键修改:根据 reload 模式选择不同的启动方式 uvicorn.run( "{{ cookiecutter.project_name }}.app:app", host=host, port=port, reload=reload, workers=workers, log_level="debug" if app_config.get("debug", False) else "info", access_log=True, ) def main(): freeze_support() # 如果是打包后的可执行文件,调整路径 if getattr(sys, 'frozen', False): exe_dir = os.path.dirname(sys.executable) if exe_dir not in sys.path: sys.path.insert(0, exe_dir) # 启动应用 launch(app_run, "development") if __name__ == "__main__": main() ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\__init__.py` ```python """ FastAPI 应用包 """ __version__ = "1.0.0" """ FastAPI 应用实例 """ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from .logger import setup_logging, get_logger from .core.config import get_settings from .core.database import register_db, init_db, close_db from .api import api_router # 获取配置 settings = get_settings() logger = get_logger("app") @asynccontextmanager async def lifespan(app: FastAPI): """ 应用生命周期管理 注意:在生产环境的模块银行中,数据库初始化可能失败 这里使用 try-except 确保应用能够正常启动 """ logger.info(f"🚀 启动 {settings.app_name} v{settings.app_version}") logger.info(f"🌍 环境: {settings.env}") # 初始化数据库(生产环境可能跳过) try: await init_db(settings.env) except Exception as e: logger.warning(f"⚠️ 数据库初始化失败: {e}") logger.warning("💡 提示:应用将继续运行,但数据库功能可能受限") yield # 关闭数据库连接 try: await close_db() except Exception as e: logger.warning(f"⚠️ 数据库关闭失败: {e}") logger.info("👋 关闭应用") # 创建 FastAPI 应用实例 app = FastAPI( title=settings.app_name, version=settings.app_version, description=settings.description, docs_url="/docs" if settings.debug else None, redoc_url="/redoc" if settings.debug else None, openapi_url="/openapi.json" if settings.debug else None, lifespan=lifespan, ) # 添加 CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=settings.cors_allow_origins, allow_credentials=settings.cors_allow_credentials, allow_methods=settings.cors_allow_methods, allow_headers=settings.cors_allow_headers, ) # 注册数据库 app = register_db(app, settings.env) # 注册路由 app.include_router(api_router, prefix="/api/v1") # 根路由 @app.get("/", summary="根路径") async def root(): """根路径,返回应用信息""" return { "message": f"欢迎使用 {settings.app_name}", "version": settings.app_version, "docs": "/docs" if settings.debug else None, "health": "/api/v1/health", } # 全局异常处理器 @app.exception_handler(Exception) async def global_exception_handler(request, exc): """全局异常处理器""" logger.error(f"全局异常: {exc}") return { "error": "内部服务器错误", "detail": str(exc) if settings.debug else "请查看服务器日志" } logger.info("✅ FastAPI 应用初始化完成") ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\api\dependencies.py` ```python """ API 依赖项 """ from typing import AsyncGenerator from fastapi import Depends, HTTPException, status from tortoise.exceptions import DBConnectionError, DoesNotExist from tortoise import Tortoise async def get_db() -> AsyncGenerator[None, None]: """获取数据库连接依赖""" try: # 检查数据库连接 conn = Tortoise.get_connection("default") # 执行简单查询验证连接 await conn.execute_query("SELECT 1") yield except DBConnectionError as e: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="数据库连接失败" ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"数据库错误: {str(e)}" ) def get_current_user(): """获取当前用户(示例)""" # 这里可以实现 JWT 验证等逻辑 pass ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\api\__init__.py` ```python """ API 路由 """ from fastapi import APIRouter from .routers import health, items, users # 创建主路由 api_router = APIRouter() # 注册子路由 api_router.include_router(health.router, prefix="/health", tags=["health"]) api_router.include_router(items.router, prefix="/items", tags=["items"]) api_router.include_router(users.router, prefix="/users", tags=["users"]) # 导出路由 __all__ = ["api_router"] ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\api\routers\health.py` ```python """ 健康检查路由 """ from fastapi import APIRouter, Depends from ...core.config import get_settings from ..dependencies import get_db router = APIRouter() @router.get("/", summary="健康检查") async def health_check(db = Depends(get_db)): """健康检查端点""" settings = get_settings() return { "status": "healthy", "service": settings.app_name, "version": settings.app_version, "environment": settings.env, "timestamp": "now" # 实际使用中可以用 datetime.now() } @router.get("/ready", summary="就绪检查") async def readiness_check(): """就绪检查端点""" return { "status": "ready", "database": "connected", "api": "available" } @router.get("/info", summary="服务信息") async def service_info(): """服务信息端点""" settings = get_settings() return { "name": settings.app_name, "version": settings.app_version, "description": settings.description, "environment": settings.env, "debug": settings.debug, "database": settings.database_url.split("://")[0] if "://" in settings.database_url else "unknown" } ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\api\routers\items.py` ```python """ Item 路由 """ from typing import List from fastapi import APIRouter, Depends, HTTPException, status from tortoise.exceptions import DoesNotExist from ...models.item import Item from ...schemas.item import ItemCreate, ItemUpdate, ItemResponse from ..dependencies import get_db router = APIRouter() @router.get("/", response_model=List[ItemResponse], summary="获取物品列表") async def read_items( skip: int = 0, limit: int = 100, db=Depends(get_db) ): """获取物品列表""" items = await Item.all().offset(skip).limit(limit) return items @router.get("/{item_id}", response_model=ItemResponse, summary="获取物品详情") async def read_item(item_id: int, db=Depends(get_db)): """根据ID获取物品""" try: item = await Item.get(id=item_id) return item except DoesNotExist: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"物品 ID {item_id} 不存在" ) @router.post("/", response_model=ItemResponse, status_code=status.HTTP_201_CREATED, summary="创建物品") async def create_item(item: ItemCreate, db=Depends(get_db)): """创建新物品""" # 检查名称是否已存在 existing_item = await Item.filter(name=item.name).first() if existing_item: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"物品名称 '{item.name}' 已存在" ) # 创建物品 item_dict = item.model_dump() new_item = await Item.create(**item_dict) return new_item @router.put("/{item_id}", response_model=ItemResponse, summary="更新物品") async def update_item(item_id: int, item_update: ItemUpdate, db=Depends(get_db)): """更新物品信息""" try: item = await Item.get(id=item_id) # 更新字段 update_data = item_update.model_dump(exclude_unset=True) # 如果更新名称,检查是否与其他物品重名 if "name" in update_data: existing_item = await Item.filter( name=update_data["name"] ).exclude(id=item_id).first() if existing_item: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"物品名称 '{update_data['name']}' 已存在" ) # 应用更新 await item.update_from_dict(update_data) await item.save() return item except DoesNotExist: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"物品 ID {item_id} 不存在" ) @router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除物品") async def delete_item(item_id: int, db=Depends(get_db)): """删除物品""" try: item = await Item.get(id=item_id) await item.delete() return None except DoesNotExist: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"物品 ID {item_id} 不存在" ) ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\api\routers\users.py` ```python """ User 路由 """ from typing import List from fastapi import APIRouter, Depends, HTTPException, status from tortoise.exceptions import DoesNotExist, IntegrityError from ...models.user import User from ...schemas.user import UserCreate, UserUpdate, UserResponse from ...services.user_service import UserService from ..dependencies import get_db router = APIRouter() @router.get("/", response_model=List[UserResponse], summary="获取用户列表") async def read_users( skip: int = 0, limit: int = 100, db=Depends(get_db) ): """获取用户列表""" users = await UserService.get_all_users(skip, limit) return users @router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED, summary="创建用户") async def create_user(user_data: UserCreate, db=Depends(get_db)): """创建新用户""" # 检查用户名是否已存在 existing_user = await UserService.get_user_by_username(user_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"用户名 '{user_data.username}' 已存在" ) # 检查邮箱是否已存在 existing_email = await UserService.get_user_by_email(user_data.email) if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"邮箱 '{user_data.email}' 已存在" ) # 创建用户 try: user = await UserService.create_user(user_data) return user except IntegrityError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="创建用户失败" ) @router.get("/{user_id}", response_model=UserResponse, summary="获取用户详情") async def read_user(user_id: int, db=Depends(get_db)): """根据ID获取用户""" user = await UserService.get_user_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"用户 ID {user_id} 不存在" ) return user @router.put("/{user_id}", response_model=UserResponse, summary="更新用户") async def update_user(user_id: int, user_update: UserUpdate, db=Depends(get_db)): """更新用户信息""" user = await UserService.update_user(user_id, user_update) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"用户 ID {user_id} 不存在" ) return user @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除用户") async def delete_user(user_id: int, db=Depends(get_db)): """删除用户""" deleted = await UserService.delete_user(user_id) if not deleted: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"用户 ID {user_id} 不存在" ) return None ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\api\routers\__init__.py` ```python """ 路由模块 """ ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\core\config.py` ```python """ 配置管理模块 基于 Pydantic Settings 和 TOML 文件 """ from typing import Optional, Dict, Any from pydantic_settings import BaseSettings from pydantic import Field import toml from pathlib import Path import sys import os try: from ...utils.launch import args except: from utils.launch import args # type: ignore class Settings(BaseSettings): """应用配置""" # 应用信息 app_name: str = "FastAPI Demo" app_version: str = "1.0.0" description: str = "FastAPI 应用" # 服务器配置 host: str = "0.0.0.0" port: int = 8000 reload: bool = False workers: int = 4 # 数据库配置 database_url: str = "sqlite://db.sqlite3" generate_schemas: bool = False # 环境配置 env: str = "development" debug: bool = False # 日志配置 logging_level: str = "INFO" logging_file: Optional[str] = None # CORS 配置 cors_allow_origins: list = ["*"] cors_allow_credentials: bool = True cors_allow_methods: list = ["*"] cors_allow_headers: list = ["*"] # 安全配置 secret_key: str = "your-secret-key-change-in-production" algorithm: str = "HS256" access_token_expire_minutes: int = 30 # 模块系统 module_system_type: str = "local" module_system_path: list = ["plugins", "app"] class Config: env_file = ".env" extra = "ignore" @classmethod def from_toml(cls, config_path: Optional[str] = None) -> "Settings": """从 TOML 文件加载配置""" if config_path is None: # 尝试多个可能的配置路径 possible_paths = [ "config.toml", "./config.toml", "../config.toml", Path(__file__).parent.parent.parent / "config.toml", ] for path in possible_paths: if Path(path).exists(): config_path = str(path) print(f"📄 加载配置文件: {config_path}") break if config_path is None: print("⚠️ 未找到配置文件,使用默认配置") return cls() try: config_file = Path(config_path) if not config_file.exists(): print(f"⚠️ 配置文件不存在: {config_path}") return cls() config_data = toml.load(config_file) # 获取默认配置 default_config = config_data.get("default", {}) # 确定环境 env = default_config.get("env", "development") # 获取环境特定配置 env_config = config_data.get(env, {}) # 合并配置 merged_config = {**default_config, **env_config} # 处理嵌套配置 for section in ["server", "database", "security", "logging", "cors", "module_system"]: if section in config_data: if isinstance(config_data[section], dict): merged_config.update(config_data[section]) # 从环境特定配置中覆盖 for section in ["server", "database", "security", "logging", "cors", "module_system"]: env_section_key = f"{env}.{section}" if env_section_key in config_data: if isinstance(config_data[env_section_key], dict): merged_config.update(config_data[env_section_key]) print(f"🌍 环境: {env}") return cls(**merged_config) except Exception as e: print(f"❌ 加载配置失败: {e}") import traceback traceback.print_exc() return cls() # 全局配置实例 _settings = None def get_settings() -> Settings: """获取配置实例(单例模式)""" global _settings if _settings is None: _settings = Settings.from_toml(args.config) return _settings ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\core\database.py` ```python """ Tortoise-ORM 数据库配置 """ from typing import Optional from tortoise import Tortoise from tortoise.contrib.fastapi import register_tortoise from .config import get_settings async def init_db(run_mode: str): """初始化数据库连接""" settings = get_settings() if run_mode == "production": db_config = { "connections": { "default": settings.database_url }, "apps": { "models": { "models": ["app.models", "aerich.models"], "default_connection": "default", } }, "use_tz": False, "timezone": "UTC", } elif run_mode == "development": db_config = { "connections": { "default": settings.database_url }, "apps": { "models": { "models": ["{{ cookiecutter.project_name }}.app.models", "aerich.models"], "default_connection": "default", } }, "use_tz": False, "timezone": "UTC", } else: raise ValueError("Invalid run mode") await Tortoise.init(db_config) # 根据配置决定是否生成表结构 if settings.generate_schemas: print("🗃️ 生成数据库表结构...") await Tortoise.generate_schemas() print("✅ 数据库表结构生成完成") else: print("⏭️ 跳过数据库表结构生成(建议使用迁移工具)") print(f"✅ 数据库连接成功: {settings.database_url}") async def close_db(): """关闭数据库连接""" await Tortoise.close_connections() print("✅ 数据库连接已关闭") def register_db(app, run_mode: str): """将数据库注册到 FastAPI 应用""" settings = get_settings() modules_path = "app.models" if run_mode == "production" else "{{ cookiecutter.project_name }}.app.models" register_tortoise( app, db_url=settings.database_url, modules={"models": [modules_path]}, generate_schemas=settings.generate_schemas, add_exception_handlers=True, ) return app # 在 app/core/database.py 中添加 from aerich import Command async def run_migrations(): """运行数据库迁移""" try: command = Command( tortoise_config={ "connections": { "default": get_settings().database_url }, "apps": { "models": { "models": ["app.models", "aerich.models"], "default_connection": "default", } }, }, app="models" ) # 检查并应用迁移 await command.init() await command.upgrade() print("✅ 数据库迁移完成") except Exception as e: print(f"⚠️ 数据库迁移失败: {e}") ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\core\security.py` ```python ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\core\__init__.py` ```python """ 核心模块 """ ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\logger\__init__.py` ```python import logging from pathlib import Path class ExcludeWatchFilesFilter(logging.Filter): def filter(self, record): # 如果日志记录来自 watchfiles 模块,则不记录(返回 False) return not record.name.startswith('watchfiles') def setup_logging(log_config: dict): log_level = getattr(logging, log_config["level"], logging.INFO) log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 创建文件夹 log_file_path = Path(log_config["file"]) log_file_dir = log_file_path.parent if log_file_dir != Path("."): log_file_dir.mkdir(parents=True, exist_ok=True) # 设置特定日志级别的过滤 logging.getLogger("watchfiles").setLevel(logging.WARNING) # 配置根日志记录器 root_logger = logging.getLogger() root_logger.setLevel(log_level) # 创建格式化器 formatter = logging.Formatter(log_format) # 创建文件处理器并添加过滤器 file_handler = logging.FileHandler(log_config["file"], encoding="utf-8") file_handler.setFormatter(formatter) file_handler.addFilter(ExcludeWatchFilesFilter()) # 创建控制台处理器并添加过滤器 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.addFilter(ExcludeWatchFilesFilter()) # 添加处理器到根日志记录器 root_logger.addHandler(file_handler) root_logger.addHandler(console_handler) print(f"日志系统已配置 - 级别: {logging.getLevelName(log_level)}, 文件: {log_config['file']}") def get_logger(name: str) -> logging.Logger: logger = logging.getLogger(name) # 为每个logger添加过滤器 logger.addFilter(ExcludeWatchFilesFilter()) return logger ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\models\item.py` ```python """ Item 数据模型 """ from tortoise.models import Model from tortoise import fields from datetime import datetime class Item(Model): """物品模型""" id = fields.IntField(pk=True, description="主键ID") name = fields.CharField(max_length=100, description="名称") description = fields.TextField(null=True, description="描述") price = fields.DecimalField(max_digits=10, decimal_places=2, default=0.00, description="价格") is_active = fields.BooleanField(default=True, description="是否激活") # 关联用户(可选) owner = fields.ForeignKeyField( "models.User", related_name="items", null=True, on_delete=fields.SET_NULL, description="所有者" ) # 时间戳 created_at = fields.DatetimeField(auto_now_add=True, description="创建时间") updated_at = fields.DatetimeField(auto_now=True, description="更新时间") def __str__(self): return f"Item(id={self.id}, name={self.name}, price={self.price})" class Meta: table = "items" table_description = "物品表" ordering = ["-created_at"] class PydanticMeta: # Pydantic 模型配置 exclude = ["is_active"] ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\models\user.py` ```python """ User 数据模型 """ from tortoise.models import Model from tortoise import fields from datetime import datetime from .item import Item class User(Model): """用户模型""" id = fields.IntField(pk=True, description="主键ID") username = fields.CharField(max_length=50, unique=True, description="用户名") email = fields.CharField(max_length=100, unique=True, description="邮箱") hashed_password = fields.CharField(max_length=128, description="哈希密码") is_active = fields.BooleanField(default=True, description="是否激活") is_superuser = fields.BooleanField(default=False, description="是否超级用户") # 时间戳 created_at = fields.DatetimeField(auto_now_add=True, description="创建时间") updated_at = fields.DatetimeField(auto_now=True, description="更新时间") # 反向关系 items: fields.ReverseRelation["Item"] def __str__(self): return f"User(id={self.id}, username={self.username})" class Meta: table = "users" table_description = "用户表" ordering = ["-created_at"] class PydanticMeta: # Pydantic 模型配置 exclude = ["hashed_password"] ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\models\__init__.py` ```python """ 数据库模型 """ from .item import Item from .user import User # 导出所有模型,供 Tortoise-ORM 自动发现 __all__ = ["Item", "User"] ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\schemas\item.py` ```python """ Item Pydantic 模型 """ from pydantic import BaseModel, Field from datetime import datetime from typing import Optional from decimal import Decimal class ItemBase(BaseModel): """Item 基础模型""" name: str = Field(..., min_length=1, max_length=100, description="物品名称") description: Optional[str] = Field(None, description="物品描述") price: Decimal = Field(Decimal(0.00), ge=0, description="物品价格") class ItemCreate(ItemBase): """创建 Item 模型""" pass class ItemUpdate(BaseModel): """更新 Item 模型""" name: Optional[str] = Field(None, min_length=1, max_length=100, description="物品名称") description: Optional[str] = Field(None, description="物品描述") price: Optional[Decimal] = Field(None, ge=0, description="物品价格") class ItemInDB(ItemBase): """数据库中的 Item 模型""" id: int is_active: bool created_at: datetime updated_at: datetime class Config: from_attributes = True class ItemResponse(ItemInDB): """Item 响应模型""" pass ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\schemas\user.py` ```python # app/schemas/user.py from pydantic import BaseModel, EmailStr from datetime import datetime from typing import Optional class UserBase(BaseModel): username: str email: EmailStr class UserCreate(UserBase): password: str class UserUpdate(BaseModel): username: Optional[str] = None email: Optional[EmailStr] = None password: Optional[str] = None class UserInDB(UserBase): id: int is_active: bool created_at: datetime updated_at: datetime class Config: from_attributes = True class UserResponse(UserInDB): pass ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\schemas\__init__.py` ```python """ Pydantic 模型 """ from .item import ItemCreate, ItemUpdate, ItemResponse, ItemInDB __all__ = [ "ItemCreate", "ItemUpdate", "ItemResponse", "ItemInDB" ] ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\services\item_service.py` ```python # app/services/item_service.py from typing import Optional, List from ..models.item import Item from ..schemas.item import ItemCreate, ItemUpdate class ItemService: @staticmethod async def get_all_items( skip: int = 0, limit: int = 100, owner_id: Optional[int] = None ) -> List[Item]: query = Item.all() if owner_id: query = query.filter(owner_id=owner_id) return await query.offset(skip).limit(limit) @staticmethod async def get_item_by_id(item_id: int) -> Optional[Item]: return await Item.filter(id=item_id).first().prefetch_related("owner") @staticmethod async def create_item(item_data: ItemCreate, owner_id: Optional[int] = None) -> Item: item = await Item.create( **item_data.model_dump(), owner_id=owner_id ) return item @staticmethod async def update_item(item_id: int, item_data: ItemUpdate) -> Optional[Item]: item = await ItemService.get_item_by_id(item_id) if not item: return None update_data = item_data.model_dump(exclude_unset=True) await item.update_from_dict(update_data) await item.save() return item @staticmethod async def delete_item(item_id: int) -> bool: item = await ItemService.get_item_by_id(item_id) if item: await item.delete() return True return False ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\services\user_service.py` ```python # app/services/user_service.py from typing import Optional, List from ..models.user import User from ..schemas.user import UserCreate, UserUpdate from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class UserService: @staticmethod def get_password_hash(password: str) -> str: return pwd_context.hash(password) @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password) @staticmethod async def get_all_users(skip: int = 0, limit: int = 100) -> List[User]: return await User.all().offset(skip).limit(limit) @staticmethod async def get_user_by_id(user_id: int) -> Optional[User]: return await User.filter(id=user_id).first() @staticmethod async def get_user_by_username(username: str) -> Optional[User]: return await User.filter(username=username).first() @staticmethod async def get_user_by_email(email: str) -> Optional[User]: return await User.filter(email=email).first() @staticmethod async def create_user(user_data: UserCreate) -> User: hashed_password = UserService.get_password_hash(user_data.password) user = await User.create( username=user_data.username, email=user_data.email, hashed_password=hashed_password ) return user @staticmethod async def update_user(user_id: int, user_data: UserUpdate) -> Optional[User]: user = await UserService.get_user_by_id(user_id) if not user: return None update_data = user_data.model_dump(exclude_unset=True) if "password" in update_data: update_data["hashed_password"] = UserService.get_password_hash( update_data.pop("password") ) await user.update_from_dict(update_data) await user.save() return user @staticmethod async def delete_user(user_id: int) -> bool: user = await UserService.get_user_by_id(user_id) if user: await user.delete() return True return False ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\app\services\__init__.py` ```python ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\scripts\build.py` ```python from module_bank import PythonToSQLite import os from pathlib import Path import shutil import subprocess import sys def run_pyinstaller(): """运行 PyInstaller 打包成可执行文件""" try: # 构建 PyInstaller 命令 cmd = [ sys.executable, "-m", "PyInstaller", "--onefile", "--name", "{{ cookiecutter.project_name }}", "--distpath", ".", "--workpath", "./build_temp", "--specpath", "./specs", "--clean", # 隐藏导入 "--hidden-import", "passlib", "--hidden-import", "passlib.context", "--hidden-import", "passlib.handlers.bcrypt", "--hidden-import", "fastapi", "--hidden-import", "fastapi.middleware.cors", "--hidden-import", "pydantic_settings", "--hidden-import", "tortoise", "--hidden-import", "tortoise.contrib.fastapi", "--hidden-import", "aerich", "--hidden-import", "uvicorn", # 复制包元数据 - 解决 importlib.metadata 问题 "--copy-metadata", "tortoise-orm", "--copy-metadata", "aerich", "--copy-metadata", "fastapi", "--copy-metadata", "uvicorn", "--copy-metadata", "pydantic", "--copy-metadata", "pydantic-settings", "--copy-metadata", "module-bank", "--copy-metadata", "passlib", "--copy-metadata", "sqlite-vfs", "--copy-metadata", "toml", # 收集整个包 "--collect-all", "tortoise-orm", "--collect-all", "aerich", # 禁用多进程优化 # "--multiprocessing-fork", # 明确指定使用 fork 模式 # 添加数据文件 # "--add-data", "resources:resources", # 其他选项 "--icon", "../../public/{{ cookiecutter.project_name }}.ico", "main.py" ] print(f"正在运行 PyInstaller: {' '.join(cmd)}") result = subprocess.run(cmd, cwd="./build", capture_output=True, text=True) if result.returncode == 0: print("PyInstaller 打包成功!") print(result.stdout) else: print("PyInstaller 打包失败!") print("错误信息:", result.stderr) except FileNotFoundError: print("错误:未找到 PyInstaller,请先安装 PyInstaller:pip install pyinstaller") except Exception as e: print(f"PyInstaller 执行出错: {e}") def main(): # 打包模块 resources_path = Path("resources") if not resources_path.exists(): resources_path.mkdir() app_packer = PythonToSQLite("./resources/app.mbank") app_packer.pack_directory("src/{{ cookiecutter.project_name }}/app", "app") app_packer.verify_package_structure() # app_packer.delete_source_code(None) plugin_packer = PythonToSQLite("./resources/plugins.mbank") plugin_packer.pack_directory("src/{{ cookiecutter.project_name }}/plugins", "plugins") plugin_packer.verify_package_structure() # plugin_packer.delete_source_code(None) # 创建 build 目录 build_path = Path("build") if build_path.exists() and build_path.is_dir(): shutil.rmtree("build") os.makedirs("build", exist_ok=True) # 复制 utils 文件夹到 build 文件夹 utils_path = Path("utils") if utils_path.exists(): target_utils_path = Path("build") / "utils" shutil.copytree(utils_path, target_utils_path, dirs_exist_ok=True) print(f"已复制 utils 文件夹到 {target_utils_path}") # 复制 resources 文件夹到 build 文件夹 resources_path = Path("resources") if resources_path.exists(): target_resources_path = Path("build") / "resources" shutil.copytree(resources_path, target_resources_path, dirs_exist_ok=True) print(f"已复制 resources 文件夹到 {target_resources_path}") print("模块打包完成") # 复制 main.py 到 build 文件夹 shutil.copy("public/main.py", "build") # 复制 config.toml 到 build 文件夹 shutil.copy("public/config.toml", "build") # 复制 utils 文件夹到 build 文件夹 shutil.copytree("public/utils", "build/utils") # 运行 PyInstaller run_pyinstaller() # 清理临时文件 shutil.rmtree(r"build/build_temp", ignore_errors=True) shutil.rmtree(r"build/specs", ignore_errors=True) shutil.rmtree(r"build/utils", ignore_errors=True) os.remove(r"build/main.py") # os.remove(r"build/config.toml") print("构建完成") if __name__ == "__main__": main() ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\scripts\clean_pycache.py` ```python import os import shutil # 需要遍历的目录 root_dir = "./" # 遍历目录 for dirpath, dirnames, filenames in os.walk(root_dir): if "__pycache__" in dirnames: # 获取 __pycache__ 目录的全路径 pycache_dir = os.path.join(dirpath, "__pycache__") # 删除目录 shutil.rmtree(pycache_dir) print(f"Removed: {pycache_dir}") ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\scripts\code_content.py` ```python import os import sys def generate_markdown_from_py_files(directory, output_file): with open(output_file, "w", encoding="utf-8") as md_file: for root, dirs, files in os.walk(directory): # 排除 venv 目录 dirs[:] = [d for d in dirs if d != ".venv"] dirs[:] = [d for d in dirs if d != ".vscode"] dirs[:] = [d for d in dirs if d != "Scripts"] dirs[:] = [d for d in dirs if d != "build"] dirs[:] = [d for d in dirs if d != "node_modules"] for file in files: if ( file.endswith(".py") or file.endswith(".rs") or file.endswith(".toml") ): file_path = os.path.join(root, file) md_file.write(f"`{file_path}`\n") md_file.write("```python\n") with open(file_path, "r", encoding="utf-8") as py_file: md_file.write(py_file.read()) md_file.write("\n```\n\n") if __name__ == "__main__": # 指定目录和输出文件名 target_directory = sys.argv[1] # 替换为你的目标目录 output_markdown_file = "output.md" # 输出的 Markdown 文件名 generate_markdown_from_py_files(target_directory, output_markdown_file) print(f"Markdown 文件已生成:{output_markdown_file}") ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\scripts\migrate.py` ```python #!/usr/bin/env python3 """ 数据库迁移脚本 """ import asyncio import sys import os # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) async def main(): from app.core.config import get_settings from aerich import Command settings = get_settings() # Aerich 配置 TORTOISE_ORM = { "connections": { "default": settings.database_url }, "apps": { "models": { "models": ["app.models", "aerich.models"], "default_connection": "default", } }, "use_tz": False, "timezone": "UTC", } command = Command(tortoise_config=TORTOISE_ORM, app="models") # 初始化 Aerich(首次运行) if not os.path.exists("./migrations"): print("🔧 初始化数据库迁移...") await command.init() print("✅ 迁移初始化完成") # 生成迁移文件 print("📝 生成迁移文件...") try: migration_name = sys.argv[1] if len(sys.argv) > 1 else "update" await command.migrate(name=migration_name) print(f"✅ 迁移文件生成成功: {migration_name}") except Exception as e: print(f"⚠️ 生成迁移文件失败: {e}") # 尝试升级 pass # 应用迁移 print("🔄 应用数据库迁移...") try: await command.upgrade() print("✅ 迁移应用成功") except Exception as e: print(f"❌ 迁移应用失败: {e}") if __name__ == "__main__": asyncio.run(main()) ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\scripts\__init__.py` ```python ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\utils\config.py` ```python # config_loader.py - 读取config.toml的示例代码 import toml import os from pathlib import Path from typing import Any, Dict, Optional class ConfigLoader: """TOML配置文件加载器""" def __init__(self, config_path: Optional[str] = None): """ 初始化配置加载器 Args: config_path: 配置文件路径,如果为None则自动查找 """ if config_path is None: # 自动查找配置文件 config_path = self._find_config_file() self.config_path = Path(config_path) self._config: Dict[str, Any] = {} self.load() def _find_config_file(self) -> str: """自动查找配置文件""" possible_paths = [ "config.toml", "config/config.toml", "~/.config/myapp/config.toml", "/etc/myapp/config.toml" ] for path in possible_paths: expanded_path = os.path.expanduser(path) if os.path.exists(expanded_path): return expanded_path raise FileNotFoundError("未找到配置文件") def load(self) -> None: """加载配置文件""" if not self.config_path.exists(): raise FileNotFoundError(f"配置文件不存在: {self.config_path}") with open(self.config_path, 'r', encoding='utf-8') as f: self._config = toml.load(f) def get(self, key: str, default: Any = None) -> Any: """获取配置值,支持点号分隔的路径""" keys = key.split('.') value = self._config try: for k in keys: value = value[k] return value except (KeyError, TypeError): return default def get_section(self, section: str) -> Dict[str, Any]: """获取整个配置节""" return self._config.get(section, {}) def reload(self) -> None: """重新加载配置文件""" self.load() def __getitem__(self, key: str) -> Any: """支持字典式访问""" return self.get(key) def __contains__(self, key: str) -> bool: """检查配置是否存在""" return self.get(key) is not None @property def all(self) -> Dict[str, Any]: """获取所有配置""" return self._config ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\utils\launch.py` ```python import sys from .config import ConfigLoader import argparse import os from module_bank import PythonToSQLite from typing import Callable # 检查是否在 PyInstaller 子进程中 def is_pyinstaller_child(): return len(sys.argv) > 1 and ('_child.py' in sys.argv[0] or sys.argv[1].isdigit()) def parse_args(): """安全的参数解析函数""" if is_pyinstaller_child(): # PyInstaller 子进程,使用默认参数 return argparse.Namespace(config="config.toml") parser = argparse.ArgumentParser(description="chongming 应用启动脚本") parser.add_argument("--config", default="config.toml", help="配置文件路径") return parser.parse_args() args = parse_args() def init_module_bank(config): MODULE_BANK_PATH_LIST = config["production"]["module_system"]["path"] for module_path in MODULE_BANK_PATH_LIST: if os.path.exists(module_path) == False: raise ValueError(f"模块路径不存在: {module_path}") packer_list = [PythonToSQLite(module_path) for module_path in MODULE_BANK_PATH_LIST] for packer in packer_list: modules = packer.list_modules() for module in modules: print(f"{module['module_name']} {'[包]' if module['is_package'] else ''}") packer.install_importer() def launch(run_fun: Callable[[dict, dict], None], run_mode: str): config = ConfigLoader(args.config) # 获取对应运行模式的配置 default_config = config.get("default", {}) run_mode = default_config.get("env", "development") app_config = config.get(run_mode, {}) if run_mode == "production": init_module_bank(config) run_fun(app_config, default_config) ``` `./{{ cookiecutter.project_name }}\src\{{ cookiecutter.project_name }}\utils\__init__.py` ```python ```