add module bank

This commit is contained in:
chakcy 2026-01-23 14:45:49 +08:00
commit 10fc3f7869
12 changed files with 744 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.13

251
README.md Normal file
View File

@ -0,0 +1,251 @@
# Python Module Bank - SQLite 模块打包系统
一个将Python模块打包到SQLite数据库并支持从数据库直接导入模块的工具系统。
## 🌟 特性
- **单文件分发** - 将所有模块打包到单个SQLite数据库文件
- **源代码保护** - 模块以编译后的字节码形式存储
- **动态导入** - 运行时直接从数据库加载模块,无需文件系统
- **完整包支持** - 支持包结构和子模块导入
- **CLI工具** - 提供完整的命令行接口
- **导入钩子** - 无缝集成Python导入系统
## 📦 安装
### 从源码安装
```bash
git clone <repository-url>
cd python-module-bank
pip install -e .
```
### 依赖要求
- Python 3.7+
- 无需额外依赖(仅使用标准库)
## 🚀 快速开始
### 1. 创建示例模块
```python
# my_module.py
def hello():
print("Hello from my_module!")
return "success"
```
### 2. 打包模块到数据库
```python
# pack_example.py
from module_bank import PythonToSQLite
packer = PythonToSQLite("my_modules.db")
packer.pack_module("my_module.py", "my_module")
packer.pack_directory("my_package/")
```
### 3. 从数据库导入
```python
from module_bank import PythonToSQLite
# 安装导入器
packer = PythonToSQLite("my_modules.db")
finder = packer.install_importer()
# 现在可以从数据库导入模块了!
import my_module
import my_package.package_module
my_module.hello()
my_package.package_module.hello()
```
## 📖 详细使用
### 命令行工具
```python
# 打包模块或目录
python -m module_bank.cli pack my_package --db modules.db
# 列出数据库中的模块
python -m module_bank.cli list --db modules.db
# 安装导入器并进入交互模式
python -m module_bank.cli install --db modules.db
```
### 编程接口
#### 打包模块
```python
from module_bank import PythonToSQLite
packer = PythonToSQLite("modules.db")
# 打包单个模块
packer.pack_module("module.py", "module_name")
# 打包整个目录(自动识别包结构)
packer.pack_directory("my_package/")
# 验证包结构
packer.verify_package_structure()
```
#### 导入模块
```python
from module_bank import PythonToSQLite
import sys
packer = PythonToSQLite("modules.db")
# 安装导入器到sys.meta_path
finder = packer.install_importer()
# 列出所有可用模块
modules = packer.list_modules()
for module in modules:
print(f"{module['module_name']} {'[包]' if module['is_package'] else ''}")
# 导入数据库中的模块
import my_package
import my_package.submodule
```
## 🏗️ 架构设计
### 核心组件
```python
src/module_bank/
├── python_to_sqlite.py # 主打包类
├── sqlite_module_importer.py # 数据库存储管理器
├── sqlite_meta_path_finder.py # 元路径查找器
├── sqlite_module_loader.py # 模块加载器
├── cli.py # 命令行接口
└── __init__.py # 模块导出
```
### 数据流
```text
1. 打包阶段:
.py文件 → 编译为字节码 → 存储到SQLite数据库
2. 导入阶段:
导入请求 → MetaPathFinder查找 → ModuleLoader加载 → 执行模块
```
### 数据库模式
```sql
CREATE TABLE python_modules (
module_name TEXT PRIMARY KEY,
source_code TEXT, -- 源代码(可选)
bytecode BLOB, -- 编译后的字节码
is_package BOOLEAN, -- 是否是包
metadata TEXT, -- 元数据JSON格式
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
```
## 🔧 高级功能
### 排除模式
```python
# 打包时排除特定文件
packer.pack_directory(
"my_project/",
exclude_patterns=["*_test.py", "*.pyc", "__pycache__"]
)
```
### 元数据存储
```python
# 为模块添加元数据
packer.importer.add_module(
"my_module",
source_code,
is_package=False,
metadata={"version": "1.0", "author": "me"}
)
```
### 混合导入
```python
# 可以同时使用文件系统和数据库导入
# 数据库导入器优先级更高
import sys
from module_bank import PythonToSQLite
packer = PythonToSQLite("modules.db")
finder = packer.install_importer() # 插入到meta_path开头
# 如果需要文件系统优先,可以调整插入位置
sys.meta_path.append(finder)
```
## ⚠️ 注意事项
### 安全性
- 模块字节码直接执行,确保数据库来源可信
- 生产环境建议添加代码签名验证
### 兼容性
- 字节码不跨Python版本兼容
- 不支持C扩展模块
- 不支持需要文件系统资源的模块如__file__依赖
### 性能
- **启动时**:有一次性数据库查询和反序列化开销
- **运行时**与传统导入性能相同使用sys.modules缓存
- **最佳适用**:长期运行的服务、桌面应用
### 更新模块
```python
# 重新打包会自动更新
packer.pack_module("updated_module.py", "module_name")
```
### 删除模块
```sql
-- 直接从数据库删除
DELETE FROM python_modules WHERE module_name = 'module_to_remove';
```
### 备份与恢复
```bash
# 数据库是单个文件,易于备份
cp modules.db modules.backup.db
# 恢复
cp modules.backup.db modules.db
```
## 📚 应用场景
1. 商业软件分发 - 保护源代码知识产权
2. 插件系统 - 动态加载数据库中的插件模块
3. 教育平台 - 安全分发练习代码
4. 微服务 - 打包多个服务模块到单个文件
5. 嵌入式系统 - 减少文件系统依赖
---
**注意**: 本工具主要用于模块分发和部署场景,不适合开发阶段的频繁修改。

21
pyproject.toml Normal file
View File

@ -0,0 +1,21 @@
[project]
name = "module_bank"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = []
[[tool.uv.index]]
default = true
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
[build-system]
build-backend = "hatchling.build"
requires = ["hatchling"]
[tool.hatch.build.targets.wheel]
packages = ["src/module_bank"]
[project.scripts]
mb = "module_bank.cli:main"

13
scripts/clean_pycache.py Normal file
View File

@ -0,0 +1,13 @@
import os
import shutil
# 需要遍历的目录
root_dir = "./"
# 遍历目录
for dirpath, dirnames, filenames in os.walk(root_dir):
if "__pycache__" in dirnames:
# 获取 __pycache__ 目录的全路径
pycache_dir = os.path.join(dirpath, "__pycache__")
# 删除目录
shutil.rmtree(pycache_dir)
print(f"Removed: {pycache_dir}")

29
scripts/code_content.py Normal file
View File

@ -0,0 +1,29 @@
import os
import sys
def generate_markdown_from_py_files(directory, output_file):
with open(output_file, "w", encoding="utf-8") as md_file:
for root, dirs, files in os.walk(directory):
# 排除 venv 目录
dirs[:] = [d for d in dirs if d != ".venv"]
dirs[:] = [d for d in dirs if d != ".vscode"]
dirs[:] = [d for d in dirs if d != "scripts"]
dirs[:] = [d for d in dirs if d != "build"]
for file in files:
if file.endswith(".py") or file.endswith(".rs"):
file_path = os.path.join(root, file)
md_file.write(f"`{file_path}`\n")
md_file.write("```python\n")
with open(file_path, "r", encoding="utf-8") as py_file:
md_file.write(py_file.read())
md_file.write("\n```\n\n")
if __name__ == "__main__":
# 指定目录和输出文件名
target_directory = sys.argv[1] # 替换为你的目标目录
output_markdown_file = "output.md" # 输出的 Markdown 文件名
generate_markdown_from_py_files(target_directory, output_markdown_file)
print(f"Markdown 文件已生成:{output_markdown_file}")

View File

@ -0,0 +1,11 @@
from .python_to_sqlite import PythonToSQLite
from .sqlite_module_importer import SQLiteModuleImporter
from .sqlite_module_loader import SQLiteModuleLoader
from .sqlite_meta_path_finder import SQLiteMetaPathFinder
__all__ = [
"PythonToSQLite",
"SQLiteModuleImporter",
"SQLiteModuleLoader",
"SQLiteMetaPathFinder",
]

66
src/module_bank/cli.py Normal file
View File

@ -0,0 +1,66 @@
# cli.py
import argparse
from pathlib import Path
def main():
parser = argparse.ArgumentParser(description="Python模块SQLite打包工具")
subparsers = parser.add_subparsers(dest="command", help="命令")
# 打包命令
pack_parser = subparsers.add_parser("pack", help="打包模块到SQLite")
pack_parser.add_argument("source", help="源文件或目录")
pack_parser.add_argument("--db", default="modules.db", help="数据库文件路径")
pack_parser.add_argument("--name", help="模块名(默认从文件名推断)")
# 列出命令
list_parser = subparsers.add_parser("list", help="列出数据库中的模块")
list_parser.add_argument("--db", default="modules.db", help="数据库文件路径")
# 安装命令
install_parser = subparsers.add_parser("install", help="安装SQLite导入器")
install_parser.add_argument("--db", default="modules.db", help="数据库文件路径")
args = parser.parse_args()
if args.command == "pack":
from .python_to_sqlite import PythonToSQLite
packer = PythonToSQLite(args.db)
if Path(args.source).is_dir():
packer.pack_directory(args.source)
print(f"已打包目录: {args.source}")
else:
packer.pack_module(args.source, args.name)
print(f"已打包模块: {args.source}")
elif args.command == "list":
from .python_to_sqlite import PythonToSQLite
packer = PythonToSQLite(args.db)
modules = packer.list_modules()
print(f"数据库中的模块 ({args.db}):")
for module in modules:
package_flag = " [包]" if module["is_package"] else ""
print(f" - {module['module_name']}{package_flag}")
elif args.command == "install":
from .python_to_sqlite import PythonToSQLite
packer = PythonToSQLite(args.db)
packer.install_importer()
print(f"已安装SQLite导入器可以从数据库导入模块了")
# 保持程序运行以便交互使用
import code
code.interact(local=locals())
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,119 @@
import sys
from pathlib import Path
from typing import List
from .sqlite_module_importer import SQLiteModuleImporter
from .sqlite_meta_path_finder import SQLiteMetaPathFinder
class PythonToSQLite:
"""Python到SQLite的打包工具"""
def __init__(self, db_path: str = "python_modules.db"):
self.db_path = db_path
self.importer = SQLiteModuleImporter(db_path)
def pack_directory(
self,
directory_path: str,
include_patterns: List[str] = None,
exclude_patterns: List[str] = None,
):
"""打包整个目录 - 修复版"""
directory = Path(directory_path)
package_name = directory.name # 包名
if include_patterns is None:
include_patterns = ["*.py"]
if exclude_patterns is None:
exclude_patterns = ["__pycache__", "*.pyc"]
# 遍历目录
for py_file in directory.rglob("*.py"):
# 检查排除模式
if any(py_file.match(pattern) for pattern in exclude_patterns):
continue
# 检查包含模式
if any(py_file.match(pattern) for pattern in include_patterns):
# 计算相对于包的路径
rel_path = py_file.relative_to(directory)
module_parts = list(rel_path.parts)
module_parts[-1] = module_parts[-1][:-3] # 去掉.py
if module_parts[-1] == "__init__":
# 这是包
if len(module_parts) > 1:
# 子包例如subpackage/__init__.py
module_name = f"{package_name}.{'.'.join(module_parts[:-1])}"
else:
# 主包例如__init__.py
module_name = package_name
print(f"打包包: {module_name} (来自: {py_file})")
self.importer.add_module(
module_name,
py_file.read_text(encoding="utf-8"),
is_package=True,
)
else:
# 普通模块
module_name = f"{package_name}.{'.'.join(module_parts)}"
print(f"打包模块: {module_name} (来自: {py_file})")
self.importer.add_module(
module_name,
py_file.read_text(encoding="utf-8"),
is_package=False,
)
def pack_module(self, module_path: str, module_name: str = None):
"""打包单个模块"""
module_path = Path(module_path)
if module_name is None:
module_name = module_path.stem
with open(module_path, "r", encoding="utf-8") as f:
source_code = f.read()
print(f"打包独立模块: {module_name} (来自: {module_path})")
self.importer.add_module(module_name, source_code, is_package=False)
def install_importer(self):
"""安装导入器到sys.meta_path"""
finder = SQLiteMetaPathFinder(self.db_path)
# 添加到meta_path开头优先于文件系统查找
sys.meta_path.insert(0, finder)
return finder
def list_modules(self):
"""列出数据库中的所有模块"""
cursor = self.importer.connection.execute(
"SELECT module_name, is_package FROM python_modules ORDER BY module_name"
)
return [dict(row) for row in cursor.fetchall()]
def verify_package_structure(self):
"""验证包的完整性"""
cursor = self.importer.connection.execute(
"SELECT module_name, is_package FROM python_modules"
)
modules = {row["module_name"]: row["is_package"] for row in cursor.fetchall()}
print("数据库中的模块结构:")
for module_name, is_package in sorted(modules.items()):
package_flag = " [包]" if is_package else ""
print(f" - {module_name}{package_flag}")
# 检查包的完整性
for module_name, is_package in modules.items():
if "." in module_name and not is_package:
parent_package = module_name.rsplit(".", 1)[0]
if parent_package not in modules:
print(
f"警告:模块 {module_name} 的父包 {parent_package} 不在数据库中"
)
elif not modules[parent_package]:
print(
f"警告:模块 {module_name} 的父包 {parent_package} 不是一个包"
)

View File

@ -0,0 +1,53 @@
import sqlite3
import importlib
import importlib.abc
import importlib.util
from .sqlite_module_loader import SQLiteModuleLoader
class SQLiteMetaPathFinder(importlib.abc.MetaPathFinder):
"""SQLite元路径查找器"""
def __init__(self, db_path: str):
self.db_path = db_path
self.connection = sqlite3.connect(db_path)
self.connection.row_factory = sqlite3.Row
# 缓存模块查找结果
self.module_cache = {}
self._load_module_cache()
def _load_module_cache(self):
"""加载所有模块到缓存"""
cursor = self.connection.execute(
"SELECT module_name, is_package FROM python_modules"
)
for row in cursor.fetchall():
self.module_cache[row["module_name"]] = row["is_package"]
def find_spec(self, fullname, path=None, target=None):
"""查找模块规范"""
# 检查模块是否在数据库中
if fullname in self.module_cache:
# 创建模块规范
spec = importlib.util.spec_from_loader(
fullname,
SQLiteModuleLoader(self.connection, fullname, self.db_path),
origin=f"sqlite://{self.db_path}",
is_package=self.module_cache[fullname],
)
return spec
# 如果不是完整模块名,检查是否可能是包的子模块
# 例如my_package.package_module
if "." in fullname:
# 检查完整模块名是否在数据库中
if fullname in self.module_cache:
spec = importlib.util.spec_from_loader(
fullname,
SQLiteModuleLoader(self.connection, fullname, self.db_path),
origin=f"sqlite://{self.db_path}",
is_package=self.module_cache[fullname],
)
return spec
return None

View File

@ -0,0 +1,107 @@
import sqlite3
import marshal
from pathlib import Path
from typing import Dict
class SQLiteModuleImporter:
"""从SQLite数据库导入Python模块的导入器"""
def __init__(self, db_path: str):
self.db_path = db_path
self.connection = sqlite3.connect(db_path)
self.connection.row_factory = sqlite3.Row
self._create_tables()
def _create_tables(self):
"""创建存储模块的数据库表"""
self.connection.execute(
"""
CREATE TABLE IF NOT EXISTS python_modules (
module_name TEXT PRIMARY KEY,
source_code TEXT,
bytecode BLOB,
is_package BOOLEAN,
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
self.connection.commit()
def add_module(
self,
module_name: str,
source_code: str,
is_package: bool = False,
metadata: Dict = None,
):
"""添加模块到数据库"""
# 编译为字节码
bytecode = compile(source_code, f"<sqlite_module:{module_name}>", "exec")
bytecode_blob = marshal.dumps(bytecode)
metadata_str = str(metadata) if metadata else "{}"
self.connection.execute(
"""
INSERT OR REPLACE INTO python_modules
(module_name, source_code, bytecode, is_package, metadata)
VALUES (?, ?, ?, ?, ?)
""",
(module_name, source_code, bytecode_blob, is_package, metadata_str),
)
self.connection.commit()
def add_module_from_file(self, file_path: str, module_name: str = None):
"""从文件添加模块"""
file_path = Path(file_path)
if module_name is None:
# 从文件名推导模块名
module_name = file_path.stem
with open(file_path, "r", encoding="utf-8") as f:
source_code = f.read()
self.add_module(module_name, source_code)
def add_package(self, package_path: str, package_name: str = None):
"""添加整个包到数据库"""
package_path = Path(package_path)
if package_name is None:
package_name = package_path.name
# 添加包目录下的所有.py文件
for py_file in package_path.rglob("*.py"):
# 计算模块名
rel_path = py_file.relative_to(package_path)
module_parts = list(rel_path.parts)
module_parts[-1] = module_parts[-1][:-3] # 去掉.py
if module_parts[-1] == "__init__":
# 包本身
module_name = package_name
if len(module_parts) > 1:
# 子包
module_name = f"{package_name}.{'.'.join(module_parts[:-1])}"
self.add_module(
module_name,
py_file.read_text(encoding="utf-8"),
is_package=True,
)
else:
# 包内的模块
if len(module_parts) == 1:
# 直接子模块
module_name = f"{package_name}.{module_parts[0]}"
else:
# 子包内的模块
module_name = f"{package_name}.{'.'.join(module_parts)}"
self.add_module(
module_name,
py_file.read_text(encoding="utf-8"),
is_package=False,
)

View File

@ -0,0 +1,63 @@
import sqlite3
import importlib
import importlib.abc
import marshal
import types
class SQLiteModuleLoader(importlib.abc.Loader):
"""SQLite模块加载器"""
def __init__(
self, db_connection: sqlite3.Connection, module_name: str, db_path: str = None
):
self.db_connection = db_connection
self.module_name = module_name
self.db_path = db_path
self.db_connection.row_factory = sqlite3.Row
def create_module(self, spec):
"""创建模块对象"""
# 获取模块信息
cursor = self.db_connection.execute(
"SELECT is_package FROM python_modules WHERE module_name = ?",
(self.module_name,),
)
row = cursor.fetchone()
if row and row["is_package"]:
# 创建包模块
module = types.ModuleType(self.module_name)
module.__path__ = [] # 包需要有__path__属性
module.__package__ = self.module_name
return module
else:
# 创建普通模块
return None # 使用默认创建方式
def exec_module(self, module):
"""执行模块代码"""
cursor = self.db_connection.execute(
"SELECT bytecode, source_code, is_package FROM python_modules WHERE module_name = ?",
(self.module_name,),
)
row = cursor.fetchone()
if not row:
raise ImportError(f"No module named '{self.module_name}' in database")
# 从字节码加载
bytecode = marshal.loads(row["bytecode"])
# 在模块的命名空间中执行字节码
exec(bytecode, module.__dict__)
# 如果是包设置__path__属性
if row["is_package"]:
module.__path__ = []
module.__package__ = self.module_name
# 设置模块的__file__属性为数据库路径
module.__file__ = f"sqlite://{self.db_path}/{self.module_name}"
return module