chongming/app/services/item_service.py
2026-01-26 11:45:09 +08:00

47 lines
1.5 KiB
Python

# app/services/item_service.py
from typing import Optional, List
from app.models.item import Item
from app.schemas.item import ItemCreate, ItemUpdate
class ItemService:
    """Async CRUD helpers for the ``Item`` model.

    Every method is a ``@staticmethod``; the class only serves as a
    namespace and holds no state.
    """

    @staticmethod
    async def get_all_items(
        skip: int = 0,
        limit: int = 100,
        owner_id: Optional[int] = None,
    ) -> List[Item]:
        """Return one page of items, optionally restricted to one owner.

        Args:
            skip: Number of rows to skip (passed to ``offset``).
            limit: Maximum number of rows to return (passed to ``limit``).
            owner_id: When not ``None``, only items whose ``owner_id``
                equals this value are returned.

        Returns:
            The awaited result of the paginated query.
        """
        query = Item.all()
        # Compare against None explicitly: a plain truthiness test would
        # drop the filter for owner_id == 0 and return every owner's items.
        if owner_id is not None:
            query = query.filter(owner_id=owner_id)
        return await query.offset(skip).limit(limit)

    @staticmethod
    async def get_item_by_id(item_id: int) -> Optional[Item]:
        """Fetch the first item whose ``id`` is ``item_id``.

        The ``owner`` relation is prefetched on the query.

        Returns:
            The matching item, or ``None`` when no row matches.
        """
        return await Item.filter(id=item_id).first().prefetch_related("owner")

    @staticmethod
    async def create_item(
        item_data: ItemCreate,
        owner_id: Optional[int] = None,
    ) -> Item:
        """Create and return a new item.

        Args:
            item_data: Payload whose ``model_dump()`` output is passed to
                ``Item.create`` as keyword arguments.
            owner_id: Value stored as the new item's ``owner_id``; may be
                ``None``.
        """
        return await Item.create(
            **item_data.model_dump(),
            owner_id=owner_id,
        )

    @staticmethod
    async def update_item(item_id: int, item_data: ItemUpdate) -> Optional[Item]:
        """Apply a partial update to an existing item.

        Only fields explicitly set on ``item_data`` are applied
        (``model_dump(exclude_unset=True)``).

        Returns:
            The saved item, or ``None`` when ``item_id`` does not exist.
        """
        item = await ItemService.get_item_by_id(item_id)
        if not item:
            return None
        update_data = item_data.model_dump(exclude_unset=True)
        await item.update_from_dict(update_data)
        await item.save()
        return item

    @staticmethod
    async def delete_item(item_id: int) -> bool:
        """Delete the item with the given id.

        Returns:
            ``True`` when an item was found and deleted, ``False`` when no
            item with ``item_id`` exists.
        """
        item = await ItemService.get_item_by_id(item_id)
        if not item:
            return False
        await item.delete()
        return True