FastAPI Routers
Last updated
Last updated
It can become quite cluttered if you have all your routes in the main.py
file
That's why the best approach is to split the routes into 2 files
One of them should contain all the CRUD operations for posts
The other should contain all the user paths
This is not as simple as just moving the paths to a different file
This will be done by something called routers
from typing import List
from fastapi import Depends, FastAPI, Response, status, HTTPException
from sqlalchemy.orm import Session
from . import models, schemas, utils
from app.database import engine, get_db
# Ask SQLAlchemy to create the tables declared on models.Base, using the shared engine
models.Base.metadata.create_all(bind=engine)
# Application object that every @app route decorator below attaches to
app = FastAPI()
# while True:
# try:
# # Remote install
# conn = psycopg.connect("host='postgres-01.lan' port='5432' dbname='fastapi' user='fastapi' password='Password1'")
# # Local install
# conn = psycopg.connect("host='localhost' port='5432' dbname='fastapi' user='postgres' password='Password1'")
# cursor = conn.cursor()
# print("Database connection successful")
# break
# except Exception as error:
# print("Connecting to database failed")
# print("Error: ", error)
# time.sleep(2)
# Default landing page
@app.get("/")
def root():
    # Fixed greeting payload returned for every request to "/"
    greeting = {"message": "Hello World"}
    return greeting
# List every post stored in the database
@app.get("/posts", response_model=List[schemas.PostResponse])
def get_posts(db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" SELECT * FROM posts """)
    #   posts = cursor.fetchall()
    # SQLAlchemy (ORM): unfiltered query over models.Post
    return db.query(models.Post).all()
# Create a new post and return the stored row
@app.post("/posts", status_code=status.HTTP_201_CREATED, response_model=schemas.PostResponse)
def create_posts(post: schemas.PostCreate, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" INSERT INTO posts (title, content, published) VALUES (%s, %s, %s) RETURNING *""", (post.title, post.content, post.published))
    #   new_post = cursor.fetchone()
    #   conn.commit()  # required for committing the changes
    # SQLAlchemy: unpack the validated request fields into the ORM model
    fields = post.dict()
    created = models.Post(**fields)
    # Stage the row, then commit it to the database
    db.add(created)
    db.commit()
    # Reload the instance after the commit (the ORM counterpart of RETURNING *)
    db.refresh(created)
    return created
# Note that this is singular: it retrieves one single post
# The path has {id} because the user needs to specify the id of the post
@app.get("/posts/{id}", response_model=schemas.PostResponse)
def get_post(id: int, db: Session = Depends(get_db)):
    # 1. id is typed as int so the value is validated and the user cannot pass an arbitrary string
    # 2. The raw SQL variant converts it back to a string and must pass the parameters as a tuple,
    #    i.e. (str(id),) and not (str(id)):
    #   cursor.execute(""" SELECT * FROM posts WHERE id = %s """, (str(id),))
    #   post = cursor.fetchone()
    # SQLAlchemy
    matches = db.query(models.Post).filter(models.Post.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with id {id} was not found")
# Delete a post
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(id: int, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" DELETE FROM posts * WHERE id = %s RETURNING *""", (str(id),))
    #   deleted_post = cursor.fetchone()
    #   conn.commit()
    # SQLAlchemy: keep the query object so the same filter drives both the
    # existence check and the delete
    post_query = db.query(models.Post).filter(models.Post.id == id)
    # None is a singleton, so compare by identity rather than with ==
    if post_query.first() is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with {id} does not exist")
    post_query.delete(synchronize_session=False)
    db.commit()
    # Return an explicit empty response carrying the 204 status
    return Response(status_code=status.HTTP_204_NO_CONTENT)
# Update an existing post and return the updated row
@app.put("/posts/{id}", response_model=schemas.PostResponse)
def update_posts(id: int, post_schema: schemas.PostCreate, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" UPDATE posts SET title = %s, content = %s, published = %s WHERE id = %s RETURNING *""", (post_schema.title, post_schema.content, post_schema.published, str(id)))
    #   updated_post = cursor.fetchone()
    #   conn.commit()
    # SQLAlchemy: one query object is reused for the lookup, the update and the re-read
    post_query = db.query(models.Post).filter(models.Post.id == id)
    # Raise a 404 when no post has this id (identity check against the None singleton)
    if post_query.first() is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with {id} does not exist")
    post_query.update(post_schema.dict(), synchronize_session=False)
    db.commit()
    # Query again so the response reflects the values just written
    return post_query.first()
# Create USER
@app.post("/users", status_code=status.HTTP_201_CREATED, response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Swap the submitted password for its hash before the ORM model is built
    user.password = utils.hash(user.password)
    account = models.User(**user.dict())
    # Stage, commit, then reload the row so the committed state is returned
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
# Get USER by ID
@app.get("/users/{id}", response_model=schemas.UserOut)
def get_user(id: int, db: Session = Depends(get_db)):
    # First row whose id matches, or None when there is no such user
    matches = db.query(models.User).filter(models.User.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id: {id} does not exist")
First we will add a folder called routers
Within this we will create 2 files:
post.py
user.py
Now we will have to move all the routes from main.py
The ones for posts in post.py
The ones for users in user.py
Note: This will create a lot of issues initially, as we will have to redo in each new file all the imports that were originally done in main.py
from fastapi import Depends, FastAPI, Response, status, HTTPException
from .. import models, schemas, utils
from sqlalchemy.orm import Session
from app.database import get_db
# Create USER
@app.post("/users", status_code=status.HTTP_201_CREATED, response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Swap the submitted password for its hash before the ORM model is built
    user.password = utils.hash(user.password)
    account = models.User(**user.dict())
    # Stage, commit, then reload the row so the committed state is returned
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
# Get USER by ID
@app.get("/users/{id}", response_model=schemas.UserOut)
def get_user(id: int, db: Session = Depends(get_db)):
    # First row whose id matches, or None when there is no such user
    matches = db.query(models.User).filter(models.User.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id: {id} does not exist")
from fastapi import Depends, FastAPI, Response, status, HTTPException
from .. import models, schemas
from sqlalchemy.orm import Session
from app.database import get_db
from typing import List
# List every post stored in the database
@app.get("/posts", response_model=List[schemas.PostResponse])
def get_posts(db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" SELECT * FROM posts """)
    #   posts = cursor.fetchall()
    # SQLAlchemy (ORM): unfiltered query over models.Post
    return db.query(models.Post).all()
# Create a new post and return the stored row
@app.post("/posts", status_code=status.HTTP_201_CREATED, response_model=schemas.PostResponse)
def create_posts(post: schemas.PostCreate, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" INSERT INTO posts (title, content, published) VALUES (%s, %s, %s) RETURNING *""", (post.title, post.content, post.published))
    #   new_post = cursor.fetchone()
    #   conn.commit()  # required for committing the changes
    # SQLAlchemy: unpack the validated request fields into the ORM model
    fields = post.dict()
    created = models.Post(**fields)
    # Stage the row, then commit it to the database
    db.add(created)
    db.commit()
    # Reload the instance after the commit (the ORM counterpart of RETURNING *)
    db.refresh(created)
    return created
# Note that this is singular: it retrieves one single post
# The path has {id} because the user needs to specify the id of the post
@app.get("/posts/{id}", response_model=schemas.PostResponse)
def get_post(id: int, db: Session = Depends(get_db)):
    # 1. id is typed as int so the value is validated and the user cannot pass an arbitrary string
    # 2. The raw SQL variant converts it back to a string and must pass the parameters as a tuple,
    #    i.e. (str(id),) and not (str(id)):
    #   cursor.execute(""" SELECT * FROM posts WHERE id = %s """, (str(id),))
    #   post = cursor.fetchone()
    # SQLAlchemy
    matches = db.query(models.Post).filter(models.Post.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with id {id} was not found")
# Delete a post
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(id: int, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" DELETE FROM posts * WHERE id = %s RETURNING *""", (str(id),))
    #   deleted_post = cursor.fetchone()
    #   conn.commit()
    # SQLAlchemy: keep the query object so the same filter drives both the
    # existence check and the delete
    post_query = db.query(models.Post).filter(models.Post.id == id)
    # None is a singleton, so compare by identity rather than with ==
    if post_query.first() is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with {id} does not exist")
    post_query.delete(synchronize_session=False)
    db.commit()
    # Return an explicit empty response carrying the 204 status
    return Response(status_code=status.HTTP_204_NO_CONTENT)
# Update an existing post and return the updated row
@app.put("/posts/{id}", response_model=schemas.PostResponse)
def update_posts(id: int, post_schema: schemas.PostCreate, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" UPDATE posts SET title = %s, content = %s, published = %s WHERE id = %s RETURNING *""", (post_schema.title, post_schema.content, post_schema.published, str(id)))
    #   updated_post = cursor.fetchone()
    #   conn.commit()
    # SQLAlchemy: one query object is reused for the lookup, the update and the re-read
    post_query = db.query(models.Post).filter(models.Post.id == id)
    # Raise a 404 when no post has this id (identity check against the None singleton)
    if post_query.first() is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with {id} does not exist")
    post_query.update(post_schema.dict(), synchronize_session=False)
    db.commit()
    # Query again so the response reflects the values just written
    return post_query.first()
First instinct would be to import the app
object but that is not exactly correct
We will have to make use of routers instead
We will have to import APIRouter
first
from fastapi import Depends, FastAPI, Response, status, HTTPException, APIRouter
# Instantiate the router; without the parentheses `router` would be the APIRouter class itself
router = APIRouter()
Next we will create the router
object
After which we will replace the app
object with the router
object
from fastapi import Depends, FastAPI, Response, status, HTTPException, APIRouter
from .. import models, schemas, utils
from sqlalchemy.orm import Session
from app.database import get_db
# Router object that the user route decorators below attach to
router = APIRouter()
# Create USER
@router.post("/users", status_code=status.HTTP_201_CREATED, response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Swap the submitted password for its hash before the ORM model is built
    user.password = utils.hash(user.password)
    account = models.User(**user.dict())
    # Stage, commit, then reload the row so the committed state is returned
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
# Get USER by ID
@router.get("/users/{id}", response_model=schemas.UserOut)
def get_user(id: int, db: Session = Depends(get_db)):
    # First row whose id matches, or None when there is no such user
    matches = db.query(models.User).filter(models.User.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id: {id} does not exist")
It's more or less the same for the post.py
file as well
from fastapi import Depends, FastAPI, Response, status, HTTPException, APIRouter
from .. import models, schemas
from sqlalchemy.orm import Session
from app.database import get_db
from typing import List
# Router object that the post route decorators below attach to
router = APIRouter()
# List every post stored in the database
@router.get("/posts", response_model=List[schemas.PostResponse])
def get_posts(db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" SELECT * FROM posts """)
    #   posts = cursor.fetchall()
    # SQLAlchemy (ORM): unfiltered query over models.Post
    return db.query(models.Post).all()
# Create a new post and return the stored row
@router.post("/posts", status_code=status.HTTP_201_CREATED, response_model=schemas.PostResponse)
def create_posts(post: schemas.PostCreate, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" INSERT INTO posts (title, content, published) VALUES (%s, %s, %s) RETURNING *""", (post.title, post.content, post.published))
    #   new_post = cursor.fetchone()
    #   conn.commit()  # required for committing the changes
    # SQLAlchemy: unpack the validated request fields into the ORM model
    fields = post.dict()
    created = models.Post(**fields)
    # Stage the row, then commit it to the database
    db.add(created)
    db.commit()
    # Reload the instance after the commit (the ORM counterpart of RETURNING *)
    db.refresh(created)
    return created
# Note that this is singular: it retrieves one single post
# The path has {id} because the user needs to specify the id of the post
@router.get("/posts/{id}", response_model=schemas.PostResponse)
def get_post(id: int, db: Session = Depends(get_db)):
    # 1. id is typed as int so the value is validated and the user cannot pass an arbitrary string
    # 2. The raw SQL variant converts it back to a string and must pass the parameters as a tuple,
    #    i.e. (str(id),) and not (str(id)):
    #   cursor.execute(""" SELECT * FROM posts WHERE id = %s """, (str(id),))
    #   post = cursor.fetchone()
    # SQLAlchemy
    matches = db.query(models.Post).filter(models.Post.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with id {id} was not found")
# Delete a post
@router.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(id: int, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" DELETE FROM posts * WHERE id = %s RETURNING *""", (str(id),))
    #   deleted_post = cursor.fetchone()
    #   conn.commit()
    # SQLAlchemy: keep the query object so the same filter drives both the
    # existence check and the delete
    post_query = db.query(models.Post).filter(models.Post.id == id)
    # None is a singleton, so compare by identity rather than with ==
    if post_query.first() is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with {id} does not exist")
    post_query.delete(synchronize_session=False)
    db.commit()
    # Return an explicit empty response carrying the 204 status
    return Response(status_code=status.HTTP_204_NO_CONTENT)
# Update an existing post and return the updated row
@router.put("/posts/{id}", response_model=schemas.PostResponse)
def update_posts(id: int, post_schema: schemas.PostCreate, db: Session = Depends(get_db)):
    # Raw SQL equivalent:
    #   cursor.execute(""" UPDATE posts SET title = %s, content = %s, published = %s WHERE id = %s RETURNING *""", (post_schema.title, post_schema.content, post_schema.published, str(id)))
    #   updated_post = cursor.fetchone()
    #   conn.commit()
    # SQLAlchemy: one query object is reused for the lookup, the update and the re-read
    post_query = db.query(models.Post).filter(models.Post.id == id)
    # Raise a 404 when no post has this id (identity check against the None singleton)
    if post_query.first() is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"post with {id} does not exist")
    post_query.update(post_schema.dict(), synchronize_session=False)
    db.commit()
    # Query again so the response reflects the values just written
    return post_query.first()
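Finally, the routers have to be registered in main.py; otherwise the API will not work, because the app object does not know about the routes that were moved out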
We will import the post and user from the routers folder
from fastapi import Depends, FastAPI, Response, status, HTTPException
from . import models
from app.database import engine
from .routers import post, user
# Ask SQLAlchemy to create the tables declared on models.Base, using the shared engine
models.Base.metadata.create_all(bind=engine)
# Application object; the only route declared directly on it is the landing page
app = FastAPI()
# Register the routes declared on each router module with the application
app.include_router(post.router)
app.include_router(user.router)
# Default landing page
@app.get("/")
def root():
    # Fixed greeting payload returned for every request to "/"
    greeting = {"message": "Hello World"}
    return greeting
Then we will use the app object to import the post and user routers
# Attach the routes from routers/post.py and routers/user.py to the app
app.include_router(post.router)
app.include_router(user.router)
If we get a request, the above 2 lines will check the post and user files respectively and see if we have a match within those
If a request hits one of the routes in the files, it will work as it did previously with everything in main.py