modify: 优化清理调度器
This commit is contained in:
parent
45817e5542
commit
bfa97e760e
1
.gitignore
vendored
1
.gitignore
vendored
@ -5,6 +5,7 @@ build/
|
|||||||
dist/
|
dist/
|
||||||
wheels/
|
wheels/
|
||||||
*.egg-info
|
*.egg-info
|
||||||
|
.python-version
|
||||||
|
|
||||||
# Virtual environments
|
# Virtual environments
|
||||||
.venv
|
.venv
|
||||||
|
|||||||
@ -33,7 +33,7 @@ error_log_handler.setFormatter(
|
|||||||
logging.getLogger().addHandler(error_log_handler)
|
logging.getLogger().addHandler(error_log_handler)
|
||||||
|
|
||||||
__title__ = "queue_sqlite"
|
__title__ = "queue_sqlite"
|
||||||
__version__ = "0.2.0"
|
__version__ = "0.2.1"
|
||||||
__author__ = "chakcy"
|
__author__ = "chakcy"
|
||||||
__email__ = "947105045@qq.com"
|
__email__ = "947105045@qq.com"
|
||||||
__license__ = "MIT"
|
__license__ = "MIT"
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
from queue_sqlite_core import ShardedQueueOperation
|
from queue_sqlite_core import ShardedQueueOperation
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
class CleanupScheduler:
|
class CleanupScheduler:
|
||||||
@ -28,6 +29,7 @@ class CleanupScheduler:
|
|||||||
self.is_running = False
|
self.is_running = False
|
||||||
self.cleanup_thread = None
|
self.cleanup_thread = None
|
||||||
self.remove_days = remove_days
|
self.remove_days = remove_days
|
||||||
|
self.max_db_size_mb = 500
|
||||||
|
|
||||||
# for i in range(self.queue_operation.shard_num):
|
# for i in range(self.queue_operation.shard_num):
|
||||||
# 清理过期但未处理的消息
|
# 清理过期但未处理的消息
|
||||||
@ -40,6 +42,7 @@ class CleanupScheduler:
|
|||||||
while self.is_running:
|
while self.is_running:
|
||||||
try:
|
try:
|
||||||
self.queue_operation.clean_expired_messages()
|
self.queue_operation.clean_expired_messages()
|
||||||
|
self.remove_days = self._check_and_optimize_db(self.remove_days)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"清理消息错误: {str(e)}")
|
logging.error(f"清理消息错误: {str(e)}")
|
||||||
@ -50,6 +53,37 @@ class CleanupScheduler:
|
|||||||
break
|
break
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
def _check_and_optimize_db(self, remove_days):
|
||||||
|
"""检查数据库大小并优化"""
|
||||||
|
try:
|
||||||
|
# 获取数据库文件大小
|
||||||
|
db_size_mb = self._get_database_size()
|
||||||
|
|
||||||
|
if db_size_mb > self.max_db_size_mb:
|
||||||
|
logging.info(f"数据库大小 {db_size_mb}MB 超过限制,执行优化...")
|
||||||
|
print(f"数据库大小 {db_size_mb}MB 超过限制,执行优化...")
|
||||||
|
|
||||||
|
if remove_days > 1:
|
||||||
|
remove_days = remove_days - 1
|
||||||
|
|
||||||
|
self.queue_operation.clean_old_messages(remove_days)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"数据库优化错误: {str(e)}")
|
||||||
|
|
||||||
|
return remove_days
|
||||||
|
|
||||||
|
def _get_database_size(self) -> int:
|
||||||
|
"""获取数据库文件大小"""
|
||||||
|
# 实现获取数据库文件大小的逻辑
|
||||||
|
total_size = 0
|
||||||
|
db_path = self.queue_operation.db_dir
|
||||||
|
for root, _, files in os.walk(db_path):
|
||||||
|
for file in files:
|
||||||
|
file_path = os.path.join(root, file)
|
||||||
|
total_size += os.path.getsize(file_path)
|
||||||
|
return total_size // (1024 * 1024)
|
||||||
|
|
||||||
def start_cleanup(self):
|
def start_cleanup(self):
|
||||||
if self.is_running:
|
if self.is_running:
|
||||||
return
|
return
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user