from typing import List
from fastapi import Depends, FastAPI, Response, status, HTTPException
from sqlalchemy.orm import Session
from . import models, schemas, utils
from app.database import engine, get_db
# Create the tables declared on models.Base's metadata using the imported engine.
models.Base.metadata.create_all(bind=engine)
# Application object that the route decorators in this module attach to.
app = FastAPI()
# while True:
# try:
# # Remote install
# conn = psycopg.connect("host='postgres-01.lan' port='5432' dbname='fastapi' user='fastapi' password='Password1'")
# # Local install
# conn = psycopg.connect("host='localhost' port='5432' dbname='fastapi' user='postgres' password='Password1'")
# cursor = conn.cursor()
# print("Database connection successful")
# break
# except Exception as error:
# print("Connecting to database failed")
# print("Error: ", error)
# time.sleep(2)
# Default landing page
@app.get("/")
def root():
    """Landing endpoint: respond with a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting
# Get all posts that are stored in DB/Memory (Depending on backend)
@app.get("/posts", response_model=List[schemas.PostResponse])
def get_posts(db: Session = Depends(get_db)):
    """Return every ``models.Post`` row reachable through the session ``db``.

    Args:
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The list produced by querying ``models.Post`` without any filter.
    """
    # Raw SQL equivalent: SELECT * FROM posts
    return db.query(models.Post).all()
# Creates new post and returns said post
@app.post(
    "/posts",
    status_code=status.HTTP_201_CREATED,
    response_model=schemas.PostResponse,
)
def create_posts(post: schemas.PostCreate, db: Session = Depends(get_db)):
    """Store a new post built from the request body and return the stored row.

    Args:
        post: Request payload; the items of ``post.dict()`` become the
            keyword arguments of ``models.Post``.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The ``models.Post`` instance after it was committed and refreshed.
    """
    # Raw SQL equivalent:
    #   INSERT INTO posts (title, content, published) VALUES (%s, %s, %s) RETURNING *
    fields = post.dict()
    created = models.Post(**fields)
    db.add(created)
    db.commit()
    # Reload the instance from the database after the commit
    # (the ORM counterpart of RETURNING *).
    db.refresh(created)
    return created
# Note that this route is singular: it retrieves one single post
# Has {id} because the user needs to specify the id of the post
@app.get("/posts/{id}", response_model=schemas.PostResponse)
def get_post(id: int, db: Session = Depends(get_db)):
    """Return the post whose ``id`` column equals the path parameter ``id``.

    ``id`` is annotated as ``int`` so a non-numeric path segment is rejected
    before the query runs.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the query yields no row.
    """
    # Raw SQL equivalent: SELECT * FROM posts WHERE id = %s
    # (the parameter must be passed as a one-element tuple: (str(id),))
    matches = db.query(models.Post).filter(models.Post.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"post with id {id} was not found",
    )
# Delete a post
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(id: int, db: Session = Depends(get_db)):
    """Delete the post whose ``id`` column equals the path parameter ``id``.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        An empty ``Response`` with status 204.

    Raises:
        HTTPException: 404 when no post matches ``id``.
    """
    # Raw SQL equivalent: DELETE FROM posts WHERE id = %s RETURNING *
    post_query = db.query(models.Post).filter(models.Post.id == id)

    # Identity comparison is the correct test for the None singleton.
    if post_query.first() is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"post with {id} does not exist",
        )

    # Delete through the query itself; synchronize_session=False skips
    # reconciling objects already loaded in the session.
    post_query.delete(synchronize_session=False)
    db.commit()
    return Response(status_code=status.HTTP_204_NO_CONTENT)
@app.put("/posts/{id}", response_model=schemas.PostResponse)
def update_posts(id: int, post_schema: schemas.PostCreate, db: Session = Depends(get_db)):
    """Overwrite the post identified by ``id`` with the fields of ``post_schema``.

    Args:
        id: Identifier taken from the URL path.
        post_schema: Request payload; ``post_schema.dict()`` supplies the
            column values written by the update.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The post re-read through the same query after the commit.

    Raises:
        HTTPException: 404 when no post matches ``id``.
    """
    # Raw SQL equivalent:
    #   UPDATE posts SET title = %s, content = %s, published = %s WHERE id = %s RETURNING *
    post_query = db.query(models.Post).filter(models.Post.id == id)

    # Identity comparison is the correct test for the None singleton.
    if post_query.first() is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"post with {id} does not exist",
        )

    # Update through the query itself; synchronize_session=False skips
    # reconciling objects already loaded in the session.
    post_query.update(post_schema.dict(), synchronize_session=False)
    db.commit()
    # Query again so the returned object reflects the committed values.
    return post_query.first()
# Create USER
@app.post(
    "/users",
    status_code=status.HTTP_201_CREATED,
    response_model=schemas.UserOut,
)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a user record, storing the hashed form of the submitted password.

    Args:
        user: Request payload; its ``password`` attribute is overwritten in
            place with the value returned by ``utils.hash``.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The ``models.User`` instance after it was committed and refreshed.
    """
    # Swap the submitted password for its hash before the payload is
    # copied into the ORM model.
    user.password = utils.hash(user.password)

    account = models.User(**user.dict())
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
# Get USER by ID
@app.get("/users/{id}", response_model=schemas.UserOut)
def get_user(id: int, db: Session = Depends(get_db)):
    """Return the user whose ``id`` column equals the path parameter ``id``.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the query yields no row.
    """
    matches = db.query(models.User).filter(models.User.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"User with id: {id} does not exist",
    )
from fastapi import Depends, FastAPI, Response, status, HTTPException
from .. import models, schemas, utils
from sqlalchemy.orm import Session
from app.database import get_db
# Create USER
@app.post(
    "/users",
    status_code=status.HTTP_201_CREATED,
    response_model=schemas.UserOut,
)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a user record, storing the hashed form of the submitted password.

    Args:
        user: Request payload; its ``password`` attribute is overwritten in
            place with the value returned by ``utils.hash``.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The ``models.User`` instance after it was committed and refreshed.
    """
    # Swap the submitted password for its hash before the payload is
    # copied into the ORM model.
    user.password = utils.hash(user.password)

    account = models.User(**user.dict())
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
# Get USER by ID
@app.get("/users/{id}", response_model=schemas.UserOut)
def get_user(id: int, db: Session = Depends(get_db)):
    """Return the user whose ``id`` column equals the path parameter ``id``.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the query yields no row.
    """
    matches = db.query(models.User).filter(models.User.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"User with id: {id} does not exist",
    )
from fastapi import Depends, FastAPI, Response, status, HTTPException
from .. import models, schemas
from sqlalchemy.orm import Session
from app.database import get_db
from typing import List
# Get all posts that are stored in DB/Memory (Depending on backend)
@app.get("/posts", response_model=List[schemas.PostResponse])
def get_posts(db: Session = Depends(get_db)):
    """Return every ``models.Post`` row reachable through the session ``db``.

    Args:
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The list produced by querying ``models.Post`` without any filter.
    """
    # Raw SQL equivalent: SELECT * FROM posts
    return db.query(models.Post).all()
# Creates new post and returns said post
@app.post(
    "/posts",
    status_code=status.HTTP_201_CREATED,
    response_model=schemas.PostResponse,
)
def create_posts(post: schemas.PostCreate, db: Session = Depends(get_db)):
    """Store a new post built from the request body and return the stored row.

    Args:
        post: Request payload; the items of ``post.dict()`` become the
            keyword arguments of ``models.Post``.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The ``models.Post`` instance after it was committed and refreshed.
    """
    # Raw SQL equivalent:
    #   INSERT INTO posts (title, content, published) VALUES (%s, %s, %s) RETURNING *
    fields = post.dict()
    created = models.Post(**fields)
    db.add(created)
    db.commit()
    # Reload the instance from the database after the commit
    # (the ORM counterpart of RETURNING *).
    db.refresh(created)
    return created
# Note that this route is singular: it retrieves one single post
# Has {id} because the user needs to specify the id of the post
@app.get("/posts/{id}", response_model=schemas.PostResponse)
def get_post(id: int, db: Session = Depends(get_db)):
    """Return the post whose ``id`` column equals the path parameter ``id``.

    ``id`` is annotated as ``int`` so a non-numeric path segment is rejected
    before the query runs.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the query yields no row.
    """
    # Raw SQL equivalent: SELECT * FROM posts WHERE id = %s
    # (the parameter must be passed as a one-element tuple: (str(id),))
    matches = db.query(models.Post).filter(models.Post.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"post with id {id} was not found",
    )
# Delete a post
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(id: int, db: Session = Depends(get_db)):
    """Delete the post whose ``id`` column equals the path parameter ``id``.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        An empty ``Response`` with status 204.

    Raises:
        HTTPException: 404 when no post matches ``id``.
    """
    # Raw SQL equivalent: DELETE FROM posts WHERE id = %s RETURNING *
    post_query = db.query(models.Post).filter(models.Post.id == id)

    # Identity comparison is the correct test for the None singleton.
    if post_query.first() is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"post with {id} does not exist",
        )

    # Delete through the query itself; synchronize_session=False skips
    # reconciling objects already loaded in the session.
    post_query.delete(synchronize_session=False)
    db.commit()
    return Response(status_code=status.HTTP_204_NO_CONTENT)
@app.put("/posts/{id}", response_model=schemas.PostResponse)
def update_posts(id: int, post_schema: schemas.PostCreate, db: Session = Depends(get_db)):
    """Overwrite the post identified by ``id`` with the fields of ``post_schema``.

    Args:
        id: Identifier taken from the URL path.
        post_schema: Request payload; ``post_schema.dict()`` supplies the
            column values written by the update.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The post re-read through the same query after the commit.

    Raises:
        HTTPException: 404 when no post matches ``id``.
    """
    # Raw SQL equivalent:
    #   UPDATE posts SET title = %s, content = %s, published = %s WHERE id = %s RETURNING *
    post_query = db.query(models.Post).filter(models.Post.id == id)

    # Identity comparison is the correct test for the None singleton.
    if post_query.first() is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"post with {id} does not exist",
        )

    # Update through the query itself; synchronize_session=False skips
    # reconciling objects already loaded in the session.
    post_query.update(post_schema.dict(), synchronize_session=False)
    db.commit()
    # Query again so the returned object reflects the committed values.
    return post_query.first()
from fastapi import Depends, FastAPI, Response, status, HTTPException, APIRouter
from .. import models, schemas, utils
from sqlalchemy.orm import Session
from app.database import get_db
# Router that the user endpoints below register on via @router decorators.
router = APIRouter()
# Create USER
@router.post(
    "/users",
    status_code=status.HTTP_201_CREATED,
    response_model=schemas.UserOut,
)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a user record, storing the hashed form of the submitted password.

    Args:
        user: Request payload; its ``password`` attribute is overwritten in
            place with the value returned by ``utils.hash``.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The ``models.User`` instance after it was committed and refreshed.
    """
    # Swap the submitted password for its hash before the payload is
    # copied into the ORM model.
    user.password = utils.hash(user.password)

    account = models.User(**user.dict())
    db.add(account)
    db.commit()
    db.refresh(account)
    return account
# Get USER by ID
@router.get("/users/{id}", response_model=schemas.UserOut)
def get_user(id: int, db: Session = Depends(get_db)):
    """Return the user whose ``id`` column equals the path parameter ``id``.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the query yields no row.
    """
    matches = db.query(models.User).filter(models.User.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"User with id: {id} does not exist",
    )
from fastapi import Depends, FastAPI, Response, status, HTTPException, APIRouter
from .. import models, schemas
from sqlalchemy.orm import Session
from app.database import get_db
from typing import List
# Router that the post endpoints below register on via @router decorators.
router = APIRouter()
# Get all posts that are stored in DB/Memory (Depending on backend)
@router.get("/posts", response_model=List[schemas.PostResponse])
def get_posts(db: Session = Depends(get_db)):
    """Return every ``models.Post`` row reachable through the session ``db``.

    Args:
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The list produced by querying ``models.Post`` without any filter.
    """
    # Raw SQL equivalent: SELECT * FROM posts
    return db.query(models.Post).all()
# Creates new post and returns said post
@router.post(
    "/posts",
    status_code=status.HTTP_201_CREATED,
    response_model=schemas.PostResponse,
)
def create_posts(post: schemas.PostCreate, db: Session = Depends(get_db)):
    """Store a new post built from the request body and return the stored row.

    Args:
        post: Request payload; the items of ``post.dict()`` become the
            keyword arguments of ``models.Post``.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The ``models.Post`` instance after it was committed and refreshed.
    """
    # Raw SQL equivalent:
    #   INSERT INTO posts (title, content, published) VALUES (%s, %s, %s) RETURNING *
    fields = post.dict()
    created = models.Post(**fields)
    db.add(created)
    db.commit()
    # Reload the instance from the database after the commit
    # (the ORM counterpart of RETURNING *).
    db.refresh(created)
    return created
# Note that this route is singular: it retrieves one single post
# Has {id} because the user needs to specify the id of the post
@router.get("/posts/{id}", response_model=schemas.PostResponse)
def get_post(id: int, db: Session = Depends(get_db)):
    """Return the post whose ``id`` column equals the path parameter ``id``.

    ``id`` is annotated as ``int`` so a non-numeric path segment is rejected
    before the query runs.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the query yields no row.
    """
    # Raw SQL equivalent: SELECT * FROM posts WHERE id = %s
    # (the parameter must be passed as a one-element tuple: (str(id),))
    matches = db.query(models.Post).filter(models.Post.id == id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"post with id {id} was not found",
    )
# Delete a post
@router.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(id: int, db: Session = Depends(get_db)):
    """Delete the post whose ``id`` column equals the path parameter ``id``.

    Args:
        id: Identifier taken from the URL path.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        An empty ``Response`` with status 204.

    Raises:
        HTTPException: 404 when no post matches ``id``.
    """
    # Raw SQL equivalent: DELETE FROM posts WHERE id = %s RETURNING *
    post_query = db.query(models.Post).filter(models.Post.id == id)

    # Identity comparison is the correct test for the None singleton.
    if post_query.first() is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"post with {id} does not exist",
        )

    # Delete through the query itself; synchronize_session=False skips
    # reconciling objects already loaded in the session.
    post_query.delete(synchronize_session=False)
    db.commit()
    return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.put("/posts/{id}", response_model=schemas.PostResponse)
def update_posts(id: int, post_schema: schemas.PostCreate, db: Session = Depends(get_db)):
    """Overwrite the post identified by ``id`` with the fields of ``post_schema``.

    Args:
        id: Identifier taken from the URL path.
        post_schema: Request payload; ``post_schema.dict()`` supplies the
            column values written by the update.
        db: SQLAlchemy session provided by the ``get_db`` dependency.

    Returns:
        The post re-read through the same query after the commit.

    Raises:
        HTTPException: 404 when no post matches ``id``.
    """
    # Raw SQL equivalent:
    #   UPDATE posts SET title = %s, content = %s, published = %s WHERE id = %s RETURNING *
    post_query = db.query(models.Post).filter(models.Post.id == id)

    # Identity comparison is the correct test for the None singleton.
    if post_query.first() is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"post with {id} does not exist",
        )

    # Update through the query itself; synchronize_session=False skips
    # reconciling objects already loaded in the session.
    post_query.update(post_schema.dict(), synchronize_session=False)
    db.commit()
    # Query again so the returned object reflects the committed values.
    return post_query.first()
from fastapi import Depends, FastAPI, Response, status, HTTPException
from . import models
from app.database import engine
from .routers import post, user
# Create the tables declared on models.Base's metadata using the imported engine.
models.Base.metadata.create_all(bind=engine)
# Application object; the routers imported from .routers are attached to it below.
app = FastAPI()
# Register the endpoints exposed by the post and user router modules.
app.include_router(post.router)
app.include_router(user.router)
# Default landing page
@app.get("/")
def root():
    """Landing endpoint: respond with a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting