Advertisement
Guest User

Untitled

a guest
Nov 11th, 2024
2,490
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 12.21 KB | None | 0 0
  1. Creating a Supplier Relationship Management (SRM) system using FastAPI with Domain-Driven Design (DDD) principles is a great way to build a robust and maintainable application. Below is an example of how you might structure such an application.
  2.  
  3. ### Directory Structure
  4.  
  5. ```
  6. srm_app/
  7. ├── app/
  8. │ ├── __init__.py
  9. │ ├── main.py
  10. │ ├── domain/
  11. │ │ ├── __init__.py
  12. │ │ ├── models.py
  13. │ │ └── repositories.py
  14. │ ├── application/
  15. │ │ ├── __init__.py
  16. │ │ └── services.py
  17. │ ├── infrastructure/
  18. │ │ ├── __init__.py
  19. │ │ └── database.py
  20. │ └── presentation/
  21. │ ├── __init__.py
  22. │ └── api.py
  23. └── requirements.txt
  24. ```
  25.  
  26. ### Implementation
  27.  
  28. #### `requirements.txt`
  29. ```plaintext
  30. fastapi
  31. uvicorn
  32. sqlalchemy
  33. databases
  34. pydantic
  35. python-dotenv
  36. ```
  37.  
  38. #### `app/domain/models.py`
  39.  
  40. Here we define our entities, value objects, and aggregates.
  41.  
  42. ```python
  43. from dataclasses import dataclass, field
  44. from typing import List, Optional
  45. import uuid
  46.  
  47. @dataclass(frozen=True)
  48. class Address:
  49. street: str
  50. city: str
  51. state: str
  52. zip_code: str
  53.  
  54. @dataclass(frozen=True)
  55. class ContactInfo:
  56. email: str
  57. phone: str
  58.  
  59. @dataclass(unsafe_hash=True)
  60. class Supplier:
  61. supplier_id: uuid.UUID = field(default_factory=uuid.uuid4, hash=True, compare=False)
  62. name: str
  63. address: Address
  64. contact_info: ContactInfo
  65. products: List[str] = field(default_factory=list)
  66.  
  67. def add_product(self, product_name: str):
  68. if product_name not in self.products:
  69. self.products.append(product_name)
  70. ```
  71.  
  72. #### `app/domain/repositories.py`
  73.  
  74. Here we define the repository interface.
  75.  
  76. ```python
  77. from typing import List, Optional
  78. from app.domain.models import Supplier
  79.  
  80. class SupplierRepository:
  81. async def add(self, supplier: Supplier) -> None:
  82. raise NotImplementedError
  83.  
  84. async def get_by_id(self, supplier_id: str) -> Optional[Supplier]:
  85. raise NotImplementedError
  86.  
  87. async def list_all(self) -> List[Supplier]:
  88. raise NotImplementedError
  89.  
  90. async def update(self, supplier: Supplier) -> None:
  91. raise NotImplementedError
  92.  
  93. async def delete(self, supplier_id: str) -> None:
  94. raise NotImplementedError
  95. ```
  96.  
  97. #### `app/infrastructure/database.py`
  98.  
  99. Here we define the concrete implementation of the repository using SQLAlchemy.
  100.  
  101. ```python
  102. import databases
  103. import sqlalchemy
  104. from sqlalchemy.ext.declarative import declarative_base
  105. from app.domain.models import Supplier, Address, ContactInfo
  106. from app.domain.repositories import SupplierRepository
  107.  
  108. DATABASE_URL = "sqlite:///./test.db"
  109. database = databases.Database(DATABASE_URL)
  110. metadata = sqlalchemy.MetaData()
  111.  
  112. suppliers = sqlalchemy.Table(
  113. "suppliers",
  114. metadata,
  115. sqlalchemy.Column("supplier_id", sqlalchemy.String, primary_key=True),
  116. sqlalchemy.Column("name", sqlalchemy.String),
  117. sqlalchemy.Column("street", sqlalchemy.String),
  118. sqlalchemy.Column("city", sqlalchemy.String),
  119. sqlalchemy.Column("state", sqlalchemy.String),
  120. sqlalchemy.Column("zip_code", sqlalchemy.String),
  121. sqlalchemy.Column("email", sqlalchemy.String),
  122. sqlalchemy.Column("phone", sqlalchemy.String),
  123. sqlalchemy.Column("products", sqlalchemy.JSON)
  124. )
  125.  
  126. engine = sqlalchemy.create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
  127. metadata.create_all(engine)
  128.  
  129. class SQLSupplierRepository(SupplierRepository):
  130. def __init__(self, database: databases.Database):
  131. self.database = database
  132.  
  133. async def add(self, supplier: Supplier) -> None:
  134. query = suppliers.insert().values(
  135. supplier_id=str(supplier.supplier_id),
  136. name=supplier.name,
  137. street=supplier.address.street,
  138. city=supplier.address.city,
  139. state=supplier.address.state,
  140. zip_code=supplier.address.zip_code,
  141. email=supplier.contact_info.email,
  142. phone=supplier.contact_info.phone,
  143. products=supplier.products
  144. )
  145. await self.database.execute(query)
  146.  
  147. async def get_by_id(self, supplier_id: str) -> Optional[Supplier]:
  148. query = suppliers.select().where(suppliers.c.supplier_id == supplier_id)
  149. row = await self.database.fetch_one(query)
  150. if row is None:
  151. return None
  152. address = Address(street=row.street, city=row.city, state=row.state, zip_code=row.zip_code)
  153. contact_info = ContactInfo(email=row.email, phone=row.phone)
  154. return Supplier(supplier_id=uuid.UUID(row.supplier_id), name=row.name, address=address, contact_info=contact_info, products=row.products)
  155.  
  156. async def list_all(self) -> List[Supplier]:
  157. query = suppliers.select()
  158. rows = await self.database.fetch_all(query)
  159. result = []
  160. for row in rows:
  161. address = Address(street=row.street, city=row.city, state=row.state, zip_code=row.zip_code)
  162. contact_info = ContactInfo(email=row.email, phone=row.phone)
  163. supplier = Supplier(supplier_id=uuid.UUID(row.supplier_id), name=row.name, address=address, contact_info=contact_info, products=row.products)
  164. result.append(supplier)
  165. return result
  166.  
  167. async def update(self, supplier: Supplier) -> None:
  168. query = (
  169. suppliers
  170. .update()
  171. .where(suppliers.c.supplier_id == str(supplier.supplier_id))
  172. .values(
  173. name=supplier.name,
  174. street=supplier.address.street,
  175. city=supplier.address.city,
  176. state=supplier.address.state,
  177. zip_code=supplier.address.zip_code,
  178. email=supplier.contact_info.email,
  179. phone=supplier.contact_info.phone,
  180. products=supplier.products
  181. )
  182. )
  183. await self.database.execute(query)
  184.  
  185. async def delete(self, supplier_id: str) -> None:
  186. query = suppliers.delete().where(suppliers.c.supplier_id == supplier_id)
  187. await self.database.execute(query)
  188. ```
  189.  
  190. #### `app/application/services.py`
  191.  
  192. Here we define our application services.
  193.  
  194. ```python
  195. from typing import List
  196. from app.domain.models import Supplier
  197. from app.domain.repositories import SupplierRepository
  198.  
  199. class SupplierService:
  200. def __init__(self, repository: SupplierRepository):
  201. self.repository = repository
  202.  
  203. async def add_supplier(self, name: str, address: Address, contact_info: ContactInfo) -> None:
  204. supplier = Supplier(name=name, address=address, contact_info=contact_info)
  205. await self.repository.add(supplier)
  206.  
  207. async def get_supplier_by_id(self, supplier_id: str) -> Optional[Supplier]:
  208. return await self.repository.get_by_id(supplier_id)
  209.  
  210. async def list_suppliers(self) -> List[Supplier]:
  211. return await self.repository.list_all()
  212.  
  213. async def update_supplier(self, supplier_id: str, name: str, address: Address, contact_info: ContactInfo) -> None:
  214. existing_supplier = await self.repository.get_by_id(supplier_id)
  215. if not existing_supplier:
  216. raise ValueError("Supplier not found")
  217.  
  218. existing_supplier.name = name
  219. existing_supplier.address = address
  220. existing_supplier.contact_info = contact_info
  221.  
  222. await self.repository.update(existing_supplier)
  223.  
  224. async def delete_supplier(self, supplier_id: str) -> None:
  225. existing_supplier = await self.repository.get_by_id(supplier_id)
  226. if not existing_supplier:
  227. raise ValueError("Supplier not found")
  228.  
  229. await self.repository.delete(supplier_id)
  230. ```
  231.  
  232. #### `app/presentation/api.py`
  233.  
  234. Here we define our FastAPI routes.
  235.  
  236. ```python
  237. from fastapi import FastAPI, Depends, HTTPException
  238. from pydantic import BaseModel
  239. from app.infrastructure.database import SQLSupplierRepository, database
  240. from app.application.services import SupplierService
  241. from app.domain.models import Address, ContactInfo
  242.  
  243. app = FastAPI()
  244.  
  245. @app.on_event("startup")
  246. async def startup():
  247. await database.connect()
  248.  
  249. @app.on_event("shutdown")
  250. async def shutdown():
  251. await database.disconnect()
  252.  
  253. def get_repository() -> SQLSupplierRepository:
  254. return SQLSupplierRepository(database)
  255.  
  256. def get_service(repository: SQLSupplierRepository = Depends(get_repository)) -> SupplierService:
  257. return SupplierService(repository)
  258.  
  259. class AddressSchema(BaseModel):
  260. street: str
  261. city: str
  262. state: str
  263. zip_code: str
  264.  
  265. class ContactInfoSchema(BaseModel):
  266. email: str
  267. phone: str
  268.  
  269. class SupplierCreateSchema(BaseModel):
  270. name: str
  271. address: AddressSchema
  272. contact_info: ContactInfoSchema
  273.  
  274. class SupplierResponseSchema(BaseModel):
  275. supplier_id: str
  276. name: str
  277. address: AddressSchema
  278. contact_info: ContactInfoSchema
  279.  
  280. @app.post("/suppliers/", response_model=SupplierResponseSchema)
  281. async def create_supplier(supplier_create: SupplierCreateSchema, service: SupplierService = Depends(get_service)):
  282. await service.add_supplier(
  283. name=supplier_create.name,
  284. address=Address(**supplier_create.address.dict()),
  285. contact_info=ContactInfo(**supplier_create.contact_info.dict())
  286. )
  287. return {"supplier_id": supplier_create.name, **supplier_create.dict()}
  288.  
  289. @app.get("/suppliers/{supplier_id}", response_model=SupplierResponseSchema)
  290. async def read_supplier(supplier_id: str, service: SupplierService = Depends(get_service)):
  291. supplier = await service.get_supplier_by_id(supplier_id)
  292. if not supplier:
  293. raise HTTPException(status_code=404, detail="Supplier not found")
  294. return {"supplier_id": str(supplier.supplier_id), "name": supplier.name, "address": supplier.address.dict(), "contact_info": supplier.contact_info.dict()}
  295.  
  296. @app.get("/suppliers/", response_model=list[SupplierResponseSchema])
  297. async def read_suppliers(service: SupplierService = Depends(get_service)):
  298. suppliers = await service.list_suppliers()
  299. return [{"supplier_id": str(supplier.supplier_id), "name": supplier.name, "address": supplier.address.dict(), "contact_info": supplier.contact_info.dict()} for supplier in suppliers]
  300.  
  301. @app.put("/suppliers/{supplier_id}", response_model=SupplierResponseSchema)
  302. async def update_supplier(supplier_id: str, supplier_create: SupplierCreateSchema, service: SupplierService = Depends(get_service)):
  303. await service.update_supplier(
  304. supplier_id=supplier_id,
  305. name=supplier_create.name,
  306. address=Address(**supplier_create.address.dict()),
  307. contact_info=ContactInfo(**supplier_create.contact_info.dict())
  308. )
  309. return {"supplier_id": supplier_id, **supplier_create.dict()}
  310.  
  311. @app.delete("/suppliers/{supplier_id}")
  312. async def delete_supplier(supplier_id: str, service: SupplierService = Depends(get_service)):
  313. await service.delete_supplier(supplier_id)
  314. return {"detail": "Supplier deleted"}
  315. ```
  316.  
  317. #### `app/main.py`
  318.  
  319. Finally, we define the main entry point for our FastAPI application.
  320.  
  321. ```python
  322. from app.presentation.api import app
  323.  
  324. if __name__ == "__main__":
  325. import uvicorn
  326. uvicorn.run(app, host="0.0.0.0", port=8000)
  327. ```
  328.  
  329. This is a complete example of a FastAPI application using the Hexagonal Architecture (Ports and Adapters). The `app` directory contains all the necessary components:
  330.  
  331. - **domain/models.py**: Defines the domain models.
  332. - **domain/repositories.py**: Defines the repository interfaces.
  333. - **application/services.py**: Contains the application services.
  334. - **infrastructure/database.py**: Implements the repository using SQLAlchemy.
  335. - **presentation/api.py**: Defines the FastAPI routes.
  336. - **main.py**: The main entry point for the application.
  337.  
  338. You can run this application using `uvicorn`:
  339.  
  340. ```bash
  341. uvicorn app.main:app --reload
  342. ```
  343.  
  344. This will start the FastAPI server on `http://0.0.0.0:8000`. You can interact with the API using tools like Postman or directly from your browser by navigating to `http://127.0.0.1:8000/docs` for the Swagger UI.
  345.  
  346. This example demonstrates how to structure a FastAPI application in a way that adheres to the Hexagonal Architecture, promoting separation of concerns and testability.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement