# app/services/user_service.py
from typing import Optional, List

from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from passlib.context import CryptContext

# Shared hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class UserService:
    """Static helpers for password hashing and CRUD access to ``User``."""

    @staticmethod
    def get_password_hash(password: str) -> str:
        """Return the hash of *password* produced by ``pwd_context``."""
        return pwd_context.hash(password)

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Return whether *plain_password* verifies against *hashed_password*."""
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    async def get_all_users(skip: int = 0, limit: int = 100) -> List[User]:
        """Return at most *limit* users, skipping the first *skip*."""
        return await User.all().offset(skip).limit(limit)

    @staticmethod
    async def get_user_by_id(user_id: int) -> Optional[User]:
        """Return the first user whose ``id`` equals *user_id*, or ``None``."""
        return await User.filter(id=user_id).first()

    @staticmethod
    async def get_user_by_username(username: str) -> Optional[User]:
        """Return the first user whose ``username`` matches, or ``None``."""
        return await User.filter(username=username).first()

    @staticmethod
    async def get_user_by_email(email: str) -> Optional[User]:
        """Return the first user whose ``email`` matches, or ``None``."""
        return await User.filter(email=email).first()

    @staticmethod
    async def create_user(user_data: UserCreate) -> User:
        """Create and return a user from *user_data*.

        The plain-text password is never stored; only its hash is passed
        to ``User.create`` as ``hashed_password``.
        """
        hashed_password = UserService.get_password_hash(user_data.password)
        user = await User.create(
            username=user_data.username,
            email=user_data.email,
            hashed_password=hashed_password,
        )
        return user

    @staticmethod
    async def update_user(user_id: int, user_data: UserUpdate) -> Optional[User]:
        """Apply the explicitly-set fields of *user_data* to a user.

        Returns:
            The saved user, or ``None`` when no user has ``id == user_id``.
        """
        user = await UserService.get_user_by_id(user_id)
        if user is None:
            return None

        # exclude_unset=True keeps only the fields the caller provided.
        update_data = user_data.model_dump(exclude_unset=True)
        if "password" in update_data:
            # Never persist the plain-text value: always drop the key.
            password = update_data.pop("password")
            # A password explicitly sent as None cannot be hashed; treat
            # it as "leave the password unchanged" instead of failing.
            if password is not None:
                update_data["hashed_password"] = UserService.get_password_hash(
                    password
                )

        await user.update_from_dict(update_data)
        await user.save()
        return user

    @staticmethod
    async def delete_user(user_id: int) -> bool:
        """Delete the user with *user_id*.

        Returns:
            ``True`` if a user was found and deleted, ``False`` otherwise.
        """
        user = await UserService.get_user_by_id(user_id)
        if user:
            await user.delete()
            return True
        return False


# app/services/item_service.py
from typing import Optional, List

from app.models.item import Item
from app.schemas.item import ItemCreate, ItemUpdate


class ItemService:
    """Static helpers for CRUD access to ``Item``."""

    @staticmethod
    async def get_all_items(
        skip: int = 0,
        limit: int = 100,
        owner_id: Optional[int] = None,
    ) -> List[Item]:
        """Return at most *limit* items, skipping the first *skip*.

        Args:
            skip: Number of leading rows to skip.
            limit: Maximum number of rows to return.
            owner_id: When not ``None``, only items with this ``owner_id``
                are returned.
        """
        query = Item.all()
        # Compare against None rather than truthiness so that an owner id
        # of 0 still restricts the query instead of returning every item.
        if owner_id is not None:
            query = query.filter(owner_id=owner_id)
        return await query.offset(skip).limit(limit)

    @staticmethod
    async def get_item_by_id(item_id: int) -> Optional[Item]:
        """Return the first item with ``id == item_id``, or ``None``.

        The query requests the ``owner`` relation to be prefetched.
        """
        return await Item.filter(id=item_id).first().prefetch_related("owner")

    @staticmethod
    async def create_item(
        item_data: ItemCreate, owner_id: Optional[int] = None
    ) -> Item:
        """Create and return an item from *item_data* with the given owner."""
        item = await Item.create(
            **item_data.model_dump(),
            owner_id=owner_id,
        )
        return item

    @staticmethod
    async def update_item(item_id: int, item_data: ItemUpdate) -> Optional[Item]:
        """Apply the explicitly-set fields of *item_data* to an item.

        Returns:
            The saved item, or ``None`` when no item has ``id == item_id``.
        """
        item = await ItemService.get_item_by_id(item_id)
        if item is None:
            return None

        # exclude_unset=True keeps only the fields the caller provided.
        update_data = item_data.model_dump(exclude_unset=True)
        await item.update_from_dict(update_data)
        await item.save()
        return item

    @staticmethod
    async def delete_item(item_id: int) -> bool:
        """Delete the item with *item_id*.

        Returns:
            ``True`` if an item was found and deleted, ``False`` otherwise.
        """
        item = await ItemService.get_item_by_id(item_id)
        if item:
            await item.delete()
            return True
        return False