Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Creating a Supplier Relationship Management (SRM) system using FastAPI with Domain-Driven Design (DDD) principles is a great way to build a robust and maintainable application. Below is an example of how you might structure such an application.
- ### Directory Structure
- ```
- srm_app/
- ├── app/
- │ ├── __init__.py
- │ ├── main.py
- │ ├── domain/
- │ │ ├── __init__.py
- │ │ ├── models.py
- │ │ └── repositories.py
- │ ├── application/
- │ │ ├── __init__.py
- │ │ └── services.py
- │ ├── infrastructure/
- │ │ ├── __init__.py
- │ │ └── database.py
- │ └── presentation/
- │ ├── __init__.py
- │ └── api.py
- └── requirements.txt
- ```
- ### Implementation
- #### `requirements.txt`
- ```plaintext
- fastapi
- uvicorn
- sqlalchemy
- databases
- pydantic
- python-dotenv
- ```
- #### `app/domain/models.py`
- Here we define our entities, value objects, and aggregates.
- ```python
- from dataclasses import dataclass, field
- from typing import List, Optional
- import uuid
- @dataclass(frozen=True)
- class Address:
- street: str
- city: str
- state: str
- zip_code: str
- @dataclass(frozen=True)
- class ContactInfo:
- email: str
- phone: str
- @dataclass(unsafe_hash=True)
- class Supplier:
- supplier_id: uuid.UUID = field(default_factory=uuid.uuid4, hash=True, compare=False)
- name: str
- address: Address
- contact_info: ContactInfo
- products: List[str] = field(default_factory=list)
- def add_product(self, product_name: str):
- if product_name not in self.products:
- self.products.append(product_name)
- ```
- #### `app/domain/repositories.py`
- Here we define the repository interface.
- ```python
- from typing import List, Optional
- from app.domain.models import Supplier
- class SupplierRepository:
- async def add(self, supplier: Supplier) -> None:
- raise NotImplementedError
- async def get_by_id(self, supplier_id: str) -> Optional[Supplier]:
- raise NotImplementedError
- async def list_all(self) -> List[Supplier]:
- raise NotImplementedError
- async def update(self, supplier: Supplier) -> None:
- raise NotImplementedError
- async def delete(self, supplier_id: str) -> None:
- raise NotImplementedError
- ```
- #### `app/infrastructure/database.py`
- Here we define the concrete implementation of the repository using SQLAlchemy.
- ```python
- import databases
- import sqlalchemy
- from sqlalchemy.ext.declarative import declarative_base
- from app.domain.models import Supplier, Address, ContactInfo
- from app.domain.repositories import SupplierRepository
- DATABASE_URL = "sqlite:///./test.db"
- database = databases.Database(DATABASE_URL)
- metadata = sqlalchemy.MetaData()
- suppliers = sqlalchemy.Table(
- "suppliers",
- metadata,
- sqlalchemy.Column("supplier_id", sqlalchemy.String, primary_key=True),
- sqlalchemy.Column("name", sqlalchemy.String),
- sqlalchemy.Column("street", sqlalchemy.String),
- sqlalchemy.Column("city", sqlalchemy.String),
- sqlalchemy.Column("state", sqlalchemy.String),
- sqlalchemy.Column("zip_code", sqlalchemy.String),
- sqlalchemy.Column("email", sqlalchemy.String),
- sqlalchemy.Column("phone", sqlalchemy.String),
- sqlalchemy.Column("products", sqlalchemy.JSON)
- )
- engine = sqlalchemy.create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
- metadata.create_all(engine)
- class SQLSupplierRepository(SupplierRepository):
- def __init__(self, database: databases.Database):
- self.database = database
- async def add(self, supplier: Supplier) -> None:
- query = suppliers.insert().values(
- supplier_id=str(supplier.supplier_id),
- name=supplier.name,
- street=supplier.address.street,
- city=supplier.address.city,
- state=supplier.address.state,
- zip_code=supplier.address.zip_code,
- email=supplier.contact_info.email,
- phone=supplier.contact_info.phone,
- products=supplier.products
- )
- await self.database.execute(query)
- async def get_by_id(self, supplier_id: str) -> Optional[Supplier]:
- query = suppliers.select().where(suppliers.c.supplier_id == supplier_id)
- row = await self.database.fetch_one(query)
- if row is None:
- return None
- address = Address(street=row.street, city=row.city, state=row.state, zip_code=row.zip_code)
- contact_info = ContactInfo(email=row.email, phone=row.phone)
- return Supplier(supplier_id=uuid.UUID(row.supplier_id), name=row.name, address=address, contact_info=contact_info, products=row.products)
- async def list_all(self) -> List[Supplier]:
- query = suppliers.select()
- rows = await self.database.fetch_all(query)
- result = []
- for row in rows:
- address = Address(street=row.street, city=row.city, state=row.state, zip_code=row.zip_code)
- contact_info = ContactInfo(email=row.email, phone=row.phone)
- supplier = Supplier(supplier_id=uuid.UUID(row.supplier_id), name=row.name, address=address, contact_info=contact_info, products=row.products)
- result.append(supplier)
- return result
- async def update(self, supplier: Supplier) -> None:
- query = (
- suppliers
- .update()
- .where(suppliers.c.supplier_id == str(supplier.supplier_id))
- .values(
- name=supplier.name,
- street=supplier.address.street,
- city=supplier.address.city,
- state=supplier.address.state,
- zip_code=supplier.address.zip_code,
- email=supplier.contact_info.email,
- phone=supplier.contact_info.phone,
- products=supplier.products
- )
- )
- await self.database.execute(query)
- async def delete(self, supplier_id: str) -> None:
- query = suppliers.delete().where(suppliers.c.supplier_id == supplier_id)
- await self.database.execute(query)
- ```
- #### `app/application/services.py`
- Here we define our application services.
- ```python
- from typing import List
- from app.domain.models import Supplier
- from app.domain.repositories import SupplierRepository
- class SupplierService:
- def __init__(self, repository: SupplierRepository):
- self.repository = repository
- async def add_supplier(self, name: str, address: Address, contact_info: ContactInfo) -> None:
- supplier = Supplier(name=name, address=address, contact_info=contact_info)
- await self.repository.add(supplier)
- async def get_supplier_by_id(self, supplier_id: str) -> Optional[Supplier]:
- return await self.repository.get_by_id(supplier_id)
- async def list_suppliers(self) -> List[Supplier]:
- return await self.repository.list_all()
- async def update_supplier(self, supplier_id: str, name: str, address: Address, contact_info: ContactInfo) -> None:
- existing_supplier = await self.repository.get_by_id(supplier_id)
- if not existing_supplier:
- raise ValueError("Supplier not found")
- existing_supplier.name = name
- existing_supplier.address = address
- existing_supplier.contact_info = contact_info
- await self.repository.update(existing_supplier)
- async def delete_supplier(self, supplier_id: str) -> None:
- existing_supplier = await self.repository.get_by_id(supplier_id)
- if not existing_supplier:
- raise ValueError("Supplier not found")
- await self.repository.delete(supplier_id)
- ```
- #### `app/presentation/api.py`
- Here we define our FastAPI routes.
- ```python
- from fastapi import FastAPI, Depends, HTTPException
- from pydantic import BaseModel
- from app.infrastructure.database import SQLSupplierRepository, database
- from app.application.services import SupplierService
- from app.domain.models import Address, ContactInfo
- app = FastAPI()
- @app.on_event("startup")
- async def startup():
- await database.connect()
- @app.on_event("shutdown")
- async def shutdown():
- await database.disconnect()
- def get_repository() -> SQLSupplierRepository:
- return SQLSupplierRepository(database)
- def get_service(repository: SQLSupplierRepository = Depends(get_repository)) -> SupplierService:
- return SupplierService(repository)
- class AddressSchema(BaseModel):
- street: str
- city: str
- state: str
- zip_code: str
- class ContactInfoSchema(BaseModel):
- email: str
- phone: str
- class SupplierCreateSchema(BaseModel):
- name: str
- address: AddressSchema
- contact_info: ContactInfoSchema
- class SupplierResponseSchema(BaseModel):
- supplier_id: str
- name: str
- address: AddressSchema
- contact_info: ContactInfoSchema
- @app.post("/suppliers/", response_model=SupplierResponseSchema)
- async def create_supplier(supplier_create: SupplierCreateSchema, service: SupplierService = Depends(get_service)):
- await service.add_supplier(
- name=supplier_create.name,
- address=Address(**supplier_create.address.dict()),
- contact_info=ContactInfo(**supplier_create.contact_info.dict())
- )
- return {"supplier_id": supplier_create.name, **supplier_create.dict()}
- @app.get("/suppliers/{supplier_id}", response_model=SupplierResponseSchema)
- async def read_supplier(supplier_id: str, service: SupplierService = Depends(get_service)):
- supplier = await service.get_supplier_by_id(supplier_id)
- if not supplier:
- raise HTTPException(status_code=404, detail="Supplier not found")
- return {"supplier_id": str(supplier.supplier_id), "name": supplier.name, "address": supplier.address.dict(), "contact_info": supplier.contact_info.dict()}
- @app.get("/suppliers/", response_model=list[SupplierResponseSchema])
- async def read_suppliers(service: SupplierService = Depends(get_service)):
- suppliers = await service.list_suppliers()
- return [{"supplier_id": str(supplier.supplier_id), "name": supplier.name, "address": supplier.address.dict(), "contact_info": supplier.contact_info.dict()} for supplier in suppliers]
- @app.put("/suppliers/{supplier_id}", response_model=SupplierResponseSchema)
- async def update_supplier(supplier_id: str, supplier_create: SupplierCreateSchema, service: SupplierService = Depends(get_service)):
- await service.update_supplier(
- supplier_id=supplier_id,
- name=supplier_create.name,
- address=Address(**supplier_create.address.dict()),
- contact_info=ContactInfo(**supplier_create.contact_info.dict())
- )
- return {"supplier_id": supplier_id, **supplier_create.dict()}
- @app.delete("/suppliers/{supplier_id}")
- async def delete_supplier(supplier_id: str, service: SupplierService = Depends(get_service)):
- await service.delete_supplier(supplier_id)
- return {"detail": "Supplier deleted"}
- ```
- #### `app/main.py`
- Finally, we define the main entry point for our FastAPI application.
- ```python
- from app.presentation.api import app
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
- ```
- This is a complete example of a FastAPI application using the Hexagonal Architecture (Ports and Adapters). The `app` directory contains all the necessary components:
- - **domain/models.py**: Defines the domain models.
- - **domain/repositories.py**: Defines the repository interfaces.
- - **application/services.py**: Contains the application services.
- - **infrastructure/database.py**: Implements the repository using SQLAlchemy.
- - **presentation/api.py**: Defines the FastAPI routes.
- - **main.py**: The main entry point for the application.
- You can run this application using `uvicorn`:
- ```bash
- uvicorn app.main:app --reload
- ```
- This will start the FastAPI server on `http://0.0.0.0:8000`. You can interact with the API using tools like Postman or directly from your browser by navigating to `http://127.0.0.1:8000/docs` for the Swagger UI.
- This example demonstrates how to structure a FastAPI application in a way that adheres to the Hexagonal Architecture, promoting separation of concerns and testability.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement