banner
小鱼

小鱼's Blog

FastAPI and mongoDB for User Registration and Login

# OAuth2 Implementation of Password Hashing and Bearer JWT Token Verification#

## Introduction to JWT#

JWT stands for JSON Web Tokens.

JWT is a standard for encoding a JSON object into a long string that is compact and hard to understand. The content of a JWT is as follows:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c


image

The JWT string is not encrypted, and anyone can use it to recover the original information.

However, JWT uses a signature mechanism. When accepting a token, the signature can be used to verify the token.

A token with a validity period of one week is created using JWT. The next day, when the user accesses again with the token, they remain logged in.

The token expires after one week, at which point user authentication will fail. A new token can only be obtained by logging in again.

If a user (or a third party) tampers with the token's expiration time, authentication will fail due to a signature mismatch.

For more in-depth information about JWT tokens and how they work, please refer to the JWT official website.

## Installing python-jose#

pip install python-jose[cryptography]

Python-jose requires a compatible cryptography backend. The recommended backend is: pyca/cryptography.

Python-jose supports all features of PyJWT and also supports some additional features that may be used when integrating with other tools.

## Password Hashing#

Hashing refers to converting specific content into a garbled byte sequence (essentially a string).

Every time the exact same content is input (for example, the exact same password), the returned garbled output is always the same.

However, this garbled output cannot be converted back to the input password.

### Installing passlib#

Passlib is a Python package for handling password hashing.

It supports many secure hashing algorithms and accompanying tools.

The recommended algorithm is Bcrypt.

First, install PassLib with Bcrypt:

pip install passlib[bcrypt]

passlib can even read passwords created by security plugins from Django, Flask, and other tools.

For example, sharing data from a Django application to a FastAPI application's database. Or using the same database, gradually migrating the application from Django to FastAPI. Users can log in from either the Django application or the FastAPI application simultaneously.

### Password Hashing and Verification#

First, import passlib:

# Import passlib for password hashing
from passlib.context import CryptContext

Create a password hashing context object:

# Create a password hashing context object; CryptContext is a tool for handling password hashing and verification
# It can accept one or more password hashing schemes as parameters
# Here, the bcrypt algorithm is used
# The deprecated parameter is set to auto, meaning that when a more secure algorithm becomes available, it will automatically use the more secure algorithm
# It has two methods, verify() and hash(), used for verifying passwords and obtaining hashed passwords, respectively
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

Next, create three utility functions:

  1. Hash the user's password:
# Get the hashed password
def get_password_hash(password):
    # Hash the password using the bcrypt algorithm
    return pwd_context.hash(password)
  1. Verify if the received password matches the stored hashed password:
# Verify password
def verify_password(plain_password, hashed_password):
    # Verify the password using the bcrypt algorithm
    return pwd_context.verify(plain_password, hashed_password)
  1. For authentication, use the previous two functions to verify and return the user:
# Authenticate user information using username and password
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

## Handling JWT Tokens#

Import the installed modules.

Create a random key for signing JWT tokens.

Use the following command to generate a secure random key:

openssl rand -hex 32

If you don't have openssl, you can download it from the official website.

Start the Windows OpenSSL Command Prompt.


image

Copy the generated random key, for example:

0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9

Then, copy the generated key to the variable SECRET_KEY. Note that you should not use the key shown in this example.

Create a variable ALGORITHM specifying the JWT token signing algorithm, with the value in this example being "HS256".

Create a variable to set the token expiration time.

Define a Pydantic model for the token endpoint response.

Create a utility function to generate a new access token.

First, import jwt:

from jose import JWTError, jwt

Next, declare:

  1. The random key SECRET_KEY, which is the one generated by OpenSSL above.
  2. The JWT token signing algorithm variable ALGORITHM, which in this example is 'HS256'.
  3. The variable ACCESS_TOKEN_EXPIRE_MINUTES for setting the token expiration time in minutes.
SECRET_KEY = "0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9" 
ALGORITHM = "HS256" 
ACCESS_TOKEN_EXPIRE_MINUTES = 30

Define a model class to return the Token:

class Token(BaseModel):
    access_token: str
    token_type: str

Import the timedelta type:

from datetime import datetime, timedelta

Create a new function to generate access tokens:

# Function to generate access tokens
# data represents the data to be stored in the token, such as username, user ID, etc.
# expires_delta refers to the expiration time, which is a timedelta type variable
# If not provided, the default expiration time is used, which is hardcoded to 15 minutes
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    # to_encode is the data to be stored in the token, making a copy to prevent modifying the original data
    to_encode = data.copy()

    # expires_delta is a timedelta type variable indicating the expiration time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # If expires_delta is empty, use the default expiration time
        expire = datetime.utcnow() + timedelta(minutes=15)

    # Store the expiration time in the token
    to_encode.update({"exp": expire})
    # Use the jwt.encode() method to generate the token; parameters are user information, random key, and encryption algorithm
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

Modify the login interface to obtain the token:

@app.post("/token", response_model=Token):
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Call the function to verify the user, passing in the username and password
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    # If the user does not exist
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # If the user exists, generate the token
    # Assign the previously defined token expiration time to access_token_expires and pass it to expires_delta
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Call the function to generate the token, passing in user information and expiration time
    access_token = create_access_token(
        userData={"username": user.username}, expires_delta=access_token_expires)
    
    # Wrap the generated token into the Token model class
    token = Token(access_token=access_token, token_type="bearer")

    # Return the token
    return token

## Updating Dependencies#

Update get_current_user to accept the same token as before, but this time using the JWT token.

Decode and verify the received token, then return the current user.

If the token is invalid, return an HTTP error directly.

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Use the token and the defined OpenSSL-generated random key SECRET_KEY and encryption method 
        # Call jwt to decode the token and obtain user information; payload is of type dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Get the username from the token, which is the username passed when generating the token
        username = payload.get("username")
        # If the username does not exist
        if username is None:
            # Raise the previously defined exception
            raise credentials_exception
    except JWTError:
        # If decoding fails, raise the exception
        raise credentials_exception
    # Pass the username to the get_user function to obtain user information
    user = get_user(fake_users_db, username=username)
    if user is None:
        # If the user does not exist, raise the exception
        raise credentials_exception
    return user

In the code:

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

This indicates an exception for failed verification credentials. In this code, if any exception occurs while decoding the JWT token (for example, if the token is invalid, expired, or tampered with), this exception will be raised.

It returns a status_code indicating unauthorized status 401, detail error information, and headers for the token type.

## Obtaining Information of the Currently Logged-in User#

# Get the currently logged-in user who is active and not disabled using get_current_active_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

## Complete Code#

# Create a password hashing context object; CryptContext is a tool for handling password hashing and verification
# It can accept one or more password hashing schemes as parameters
# Here, the bcrypt algorithm is used
# The deprecated parameter is set to auto, meaning that when a more secure algorithm becomes available, it will automatically use the more secure algorithm
# It has two methods, verify() and hash(), used for verifying passwords and obtaining hashed passwords, respectively
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Obtain the token through the username and password in /token, returning the token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT token signing algorithm variable; here, HS256 algorithm is used, along with other algorithms like RS256, ES256, etc.
ALGORITHM = "HS256"
# Use OpenSSL to randomly generate a SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# Token expiration time in minutes
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Define a model class to return the token
class Token(BaseModel):
    access_token: str
    token_type: str

# Define a token data model class
class TokenData(BaseModel):
    username: Union[str, None] = None

# Function to generate access tokens
# userData represents the data to be stored in the token, such as username, user ID, etc.
# expires_delta refers to the expiration time, which is a timedelta type variable
# If not provided, the default expiration time is used, which is hardcoded to 15 minutes
def create_access_token(userData: dict, expires_delta: Union[timedelta, None] = None):
    # to_encode is the data to be stored in the token, making a copy to prevent modifying the original data
    to_encode = userData.copy()

    # expires_delta is a timedelta type variable indicating the expiration time
    # Use datetime.utcnow() to get the current time + expiration time to obtain the token expiration time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # If expires_delta is empty, use the default expiration time
        expire = datetime.utcnow() + timedelta(minutes=15)

    # Store the expiration time in the token
    to_encode.update({"exp": expire})
    # Use jwt.encode() method to generate the token; parameters are user information, random key, and encryption algorithm
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# Simulated database
fake_users_db = {
    "bryce": {
        "username": "bryce",
        "full_name": "bryce Yu",
        "email": "[email protected]",
        # 123123
        "hashed_password": "$2b$12$w2IXuK5lx6mMxNVBBoxRl.jHYWRblX1QjVg6UCtBZ7BzBMOH7RvkW",
        "disabled": False,
    },
    "yu": {
        "username": "yu",
        "full_name": "yu",
        "email": "[email protected]",
        # 123456
        "hashed_password": "$2b$12$BknHh482Zqkt9ODqSSbSiepUBdL1GGoeaxKCnGNXmfUbjbe7LKcxy",
        "disabled": True,
    },
}

# Create a User model class
class UserAuth(BaseModel):
    username: str
    # Initially use Union[str, None], later will change to EmailStr
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

# Create a User database model class
class UserDB(UserAuth):
    hashed_password: str

# Verify password
def verify_password(plain_password, hashed_password):
    # Verify the password using the bcrypt algorithm
    return pwd_context.verify(plain_password, hashed_password)

# Get the hashed password
def get_password_hash(password):
    # Hash the password using the bcrypt algorithm
    return pwd_context.hash(password)

# Authenticate user information using username and password
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# Get user information from the db database by username
def get_user(db, username: str):
    # Currently, it is unsafe to use the username as a token
    if username in db:
        user_dict = db[username]
        return UserDB(**user_dict)

# This method will use the dependency oauth2_scheme to obtain the token and verify the token's validity
# If the token is valid, call the get_user function to obtain the current user's information and return it
# If invalid, raise an exception
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Use the token and the defined OpenSSL-generated random key SECRET_KEY and encryption method 
        # Call jwt to decode the token and obtain user information; payload is of type dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Get the username from the token, which is the username passed when generating the token
        username = payload.get("username")
        # If the username does not exist
        if username is None:
            # Raise the previously defined exception
            raise credentials_exception
    except JWTError:
        # If decoding fails, raise the exception
        raise credentials_exception
    # Pass the username to the get_user function to obtain user information
    user = get_user(fake_users_db, username=username)
    if user is None:
        # If the user does not exist, raise the exception
        raise credentials_exception
    return user


# Get the currently logged-in user who is active and not disabled using get_current_active_user
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # Get the currently logged-in user current_user through get_current_user
    # Then check if the user is disabled
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="User is disabled")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Call the function to verify the user, passing in the username and password
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    # If the user does not exist
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # If the user exists, generate the token
    # Assign the previously defined token expiration time to access_token_expires and pass it to expires_delta
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Call the function to generate the token, passing in user information and expiration time
    access_token = create_access_token(
        userData={"username": user.username}, expires_delta=access_token_expires)
    
    # Wrap the generated token into the Token model class
    token = Token(access_token=access_token, token_type="bearer")

    # Return the token
    return token
    

# Get the currently logged-in user who is active and not disabled using get_current_active_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# Test use, get the hash value of the password
@app.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)

### Process Summary#

  1. Login

First, call the /token method to log in the user, which obtains form data through OAuth2PasswordRequestForm.

The form data contains the username and password.

At this point, the authenticate method is called, passing in the simulated user database, username, and password.

In the authenticate method, the get_user method is used to compare the user with the same username in the simulated database.

If the user is found, the verify_password method is called, passing in the hashed password stored in the found user database and the login password.

In the verify_password method:

The verify method of the CryptContext class instance provided by passlib is called for verification, which will return the verification result. If true, the user obtained from the get_user function will be returned.

Next, set the token expiration time and call the create_access_token function, passing in user information.

In create_access_token, user information and expiration time are integrated into a dict and the jwt.encode method is called to generate the token.

Then, the generated token is returned.

  1. Obtain the currently logged-in user

Call the /users/me interface, and the Authorization value needs to be added to the request header, with the value being 'Bearer ' + the token returned upon login.

The interface function calls the dependency get_current_active_user to obtain the currently logged-in user who is not disabled.

get_current_active_user will obtain the current user through the get_current_user function and further check if they are disabled.

get_current_user will use the dependency oauth2_scheme to obtain the token from the request header.

And use jwt.decode to decode the token to obtain the user information stored during login, such as the username.

Using the username, the user data is obtained from the database and returned.

Note! The value of the Authorization response header in the request starts with Bearer.

### Interface Testing#

Form call to /token


image

Get request to obtain the currently logged-in user


image

# Connecting to MongoDB#

## Importing pymongo#

First, you need to install the library for Python to operate on MongoDB:

pip install pymongo

Then, import it in the file:

# Import MongoDB database
from pymongo import MongoClient

## Configuring MongoDB Connection#

# Create MongoDB database connection
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')

# Create the database; all database operations are done using the db object
# For example, db.users.insert_one() indicates inserting a piece of data into the users collection
db = client['fastapi']

## Register User Using MongoDB#

Add a new user registration interface.

The code is as follows:

@app.post("/users/register")
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="User already exists",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # Convert user information to JSON format
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.insert_one(json_compatible_item_data)
    # Return user information
    return user

## Query User During Login#

Modify the previous get_user function:

# Get user information from the db database by username
def get_user(username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)

## Modify User Information#

Create a user update model, as the password can be empty since it may not be updated:

# Create a User Update model class
# The password can be empty since it may not be updated
class UserUpdate(UserAuth):
    password: Union[str, None] = None

And a UserOut model to return without the password:

# User model class for returning
class UserOut(UserAuth):
    pass

The model for obtaining the currently logged-in user also needs to be modified:

# Get the currently logged-in user who is active and not disabled using get_current_active_user
@app.get("/users/me", response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
@app.put("/users/update", response_model=UserOut)
async def update_user(userInDB: UserUpdate, current_user: UserAuth = Depends(get_current_active_user)):
    # Convert the currently logged-in user's information to dict
    current_user_dict = current_user.dict()

    # Check if the username to be updated matches the currently logged-in username
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="Unauthorized modification",
        )
    
    # Convert the user information to be updated to dict
    userdict = userInDB.dict()

    # If the password is not empty, update the password
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # If the password is empty, use the currently logged-in user's password
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")

    # Convert user information to JSON format
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # Return user information
    return {"username": userInDB.username, "email": userInDB.email, "full_name": userInDB.full_name}

## Viewing MongoDB Data#

Use MongoDB Compass to view:


image

# Further Organization#

Use the router method to implement the interface.

The directory structure will be:

├── MyApi
│	├── app							# "app" is a Python package
│   	├── __init__.py				# This file makes "app" a Python package
│   	├── main.py					# "main" module, e.g., import app.main
│   	├── dependencies.py		    # "dependencies" module, e.g., import app.dependencies
│   	└── routers					# "routers" is a "Python subpackage"
│   	    ├── __init__.py			# Makes "routers" a "Python subpackage"
│   	    └── users.py			# "users" submodule, e.g., import app.routers.users

Note that an empty __init__.py file should be placed in each folder to indicate to Python that this is a directory.

Each directory or subdirectory has one.

This is why code can be imported from one file to another.

For example, in app/main.py, you can directly use:

# Absolute path
from app.routers import items
# Relative path
from .routers import items

## Router Configuration#

First, create a routers folder in the app folder to store the routes.

In the folder, create a new users.py file.

First, import:

from fastapi import APIRouter

Configure the router:

# Configure the router
router = APIRouter(
    # Define the prefix for accessing the route; there should be no slash after the prefix
    # By setting the prefix here, you don't need to write the prefix in every path operation
    prefix="/users",
    # To display custom tags in the documentation, we can use the tags parameter
    tags=["users"],
    # To display custom responses in the documentation, we can use the responses parameter to define return values
    responses={404: {"description": "Not found"}},
)

Then, you can move all user-related routes here and change app to router:

@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Call the function to verify the user, passing in the username and password
    user = authenticate_user(form_data.username, form_data.password)
    # If the user does not exist
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="Invalid username or password",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # If the user exists, generate the token
    # Assign the previously defined token expiration time to access_token_expires and pass it to expires_delta
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Call the function to generate the token, passing in user information and expiration time
    access_token = create_access_token(
        userData={"username": user.username}, expires_delta=access_token_expires)
    
    # Wrap the generated token into the Token model class
    token = Token(access_token=access_token, token_type="bearer")

    # Return the token
    return token
    

# Get the currently logged-in user who is active and not disabled using get_current_active_user
@router.get("/me", response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# Test use, get the hash value of the password
@router.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)


@router.post("/register", response_model=UserOut)
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="User already exists",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")

    # Convert user information to JSON format
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.insert_one(json_compatible_item_data)
    # Return user information
    return user

@router.put("/update", response_model=UserOut)
async def update_user(userInDB: UserUpdate, current_user: UserAuth = Depends(get_current_active_user)):
    # Convert the currently logged-in user's information to dict
    current_user_dict = current_user.dict()

    # Check if the username to be updated matches the currently logged-in username
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="Unauthorized modification",
        )
    
    # Convert the user information to be updated to dict
    userdict = userInDB.dict()

    # If the password is not empty, update the password
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # If the password is empty, use the currently logged-in user's password
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # Remove the password from user information; plaintext passwords cannot be stored in the database
    userdict.pop("password")

    # Convert user information to JSON format
    json_compatible_item_data = jsonable_encoder(userdict)
    # Store user information in the database
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # Return user information
    return {"username": userInDB.username, "email": userInDB.email, "full_name": userInDB.full_name}

At this point, many errors will occur because the previously imported libraries no longer exist.

## Dependencies#

When there are many places that require the same dependencies, you can store them all in dependencies for unified management.

In the /app directory, create a new dependencies.py file.

You can place all the imports and custom variables from users in dependencies.py.

For example:

# Path: dependencies.py

# This file is used to handle dependencies,
# For example, if some dependencies are used in several places in the application,
# You can place these dependencies in this file
from fastapi import Depends, HTTPException, status
# Import OAuth2PasswordBearer class to declare oauth2_scheme variable for authentication
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Import Union
from typing import Union
# Import BaseModel for parsing request bodies
from pydantic import BaseModel, HttpUrl, EmailStr
# Import time module
from datetime import datetime, timedelta
# Import JSONCompatibleEncoder to convert models to JSON
from fastapi.encoders import jsonable_encoder
# Import passlib for password hashing
from passlib.context import CryptContext
# Import JWT for generating tokens
from jose import JWTError, jwt
# Import MongoDB database
from pymongo import MongoClient


# Create MongoDB database connection
client = MongoClient('mongodb://fastapi:[email protected]:27017/fastapi')

# Create the database; all database operations are done using the db object
# For example, db.users.insert_one() indicates inserting a piece of data into the users collection
db = client['fastapi']


# Create a password hashing context object; CryptContext is a tool for handling password hashing and verification
# It can accept one or more password hashing schemes as parameters
# Here, the bcrypt algorithm is used
# The deprecated parameter is set to auto, meaning that when a more secure algorithm becomes available, it will automatically use the more secure algorithm
# It has two methods, verify() and hash(), used for verifying passwords and obtaining hashed passwords, respectively
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Obtain the token through the username and password in /token, returning the token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT token signing algorithm variable; here, HS256 algorithm is used, along with other algorithms like RS256, ES256, etc.
ALGORITHM = "HS256"
# Use OpenSSL to randomly generate a SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# Token expiration time in minutes
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Define a model class to return the token
class Token(BaseModel):
    access_token: str
    token_type: str

# Define a token data model class
class TokenData(BaseModel):
    username: Union[str, None] = None


# Create a User model class
class UserAuth(BaseModel):
    username: str
    # Initially use Union[str, None], later will change to EmailStr
    email: Union[EmailStr, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = False

# Create a User database model class
class UserDB(UserAuth):
    hashed_password: str

class UserInDB(UserAuth):
    password: str

# Create a User Update model class
# The password can be empty since it may not be updated
class UserUpdate(UserAuth):
    password: Union[str, None] = None

# User model class for returning
class UserOut(UserAuth):
    pass


# Function to generate access tokens
# userData represents the data to be stored in the token, such as username, user ID, etc.
# expires_delta refers to the expiration time, which is a timedelta type variable
# If not provided, the default expiration time is used, which is hardcoded to 15 minutes
def create_access_token(userData: dict, expires_delta: Union[timedelta, None] = None):
    # to_encode is the data to be stored in the token, making a copy to prevent modifying the original data
    to_encode = userData.copy()

    # expires_delta is a timedelta type variable indicating the expiration time
    # Use datetime.utcnow() to get the current time + expiration time to obtain the token expiration time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # If expires_delta is empty, use the default expiration time
        expire = datetime.utcnow() + timedelta(minutes=15)

    # Store the expiration time in the token
    to_encode.update({"exp": expire})
    # Use jwt.encode() method to generate the token; parameters are user information, random key, and encryption algorithm
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# Verify password
def verify_password(plain_password, hashed_password):
    # Verify the password using the bcrypt algorithm
    return pwd_context.verify(plain_password, hashed_password)

# Get the hashed password
def get_password_hash(password):
    # Hash the password using the bcrypt algorithm
    return pwd_context.hash(password)


# Authenticate user information using username and password
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# Get user information from the db database by username
def get_user(username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)


# This method will use the dependency oauth2_scheme to obtain the token and verify the token's validity
# If the token is valid, call the get_user function to obtain the current user's information and return it
# If invalid, raise an exception
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Use the token and the defined OpenSSL-generated random key SECRET_KEY and encryption method 
        # Call jwt to decode the token and obtain user information; payload is of type dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Get the username from the token, which is the username passed when generating the token
        username = payload.get("username")
        # If the username does not exist
        if username is None:
            # Raise the previously defined exception
            raise credentials_exception
    except JWTError:
        # If decoding fails, raise the exception
        raise credentials_exception
    # Pass the username to the get_user function to obtain user information
    user = get_user(username=username)
    if user is None:
        # If the user does not exist, raise the exception
        raise credentials_exception
    return user



# Get the currently logged-in user who is active and not disabled using get_current_active_user
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # Get the currently logged-in user current_user through get_current_user
    # Then check if the user is disabled
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="User is disabled")
    return current_user

Then, in /routers/users.py, import dependencies:

from fastapi import APIRouter, Depends

# Import dependencies from dependencies.py
from dependencies import *

## main.py#

At this point, create a new main.py file in the /app directory:

from fastapi import FastAPI
# Import CORS middleware for cross-origin requests
from fastapi.middleware.cors import CORSMiddleware
# Import users from routers
from .routers import users

app = FastAPI()
  

# CORS settings
app.add_middleware(
    CORSMiddleware,
    # Allow all domain names to access
    allow_origins=["*"],
    # Allow all requests to carry cookies
    allow_credentials=True,
    # Allow all request methods
    allow_methods=["*"],
    # Allow all request headers
    allow_headers=["*"],
)

# Include the users routes from the router
app.include_router(users.router)

## Starting the Project#

Navigate to the MyApi directory and execute:

uvicorn app.main:app --reload

This will start the program.

At this point, you can also see that the Tags are effective in the OpenAPI documentation.


image

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.