banner
小鱼

小鱼's Blog

FastAPI 与 mongoDB 实现用户注册登陆

# OAuth2 实现密码哈希与 Bearer JWT 令牌验证#

## JWT 简介#

JWT 即JSON 网络令牌(JSON Web Tokens)。

JWT 是一种将 JSON 对象编码为没有空格,且难以理解的长字符串的标准。JWT 的内容如下所示:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c


image

JWT 字符串没有加密,任何人都能用它恢复原始信息。

但 JWT 使用了签名机制。接受令牌时,可以用签名校验令牌。

使用 JWT 创建有效期为一周的令牌。第二天,用户持令牌再次访问时,仍为登录状态。

令牌于一周后过期,届时,用户身份验证就会失败。只有再次登录,才能获得新的令牌。

如果用户(或第三方)篡改令牌的过期时间,因为签名不匹配会导致身份验证失败。

如需深入了解 JWT 令牌,了解它的工作方式,请参阅 JWT 官网

## 安装 python-jose#

pip install python-jose[cryptography]

Python-jose 需要安装配套的加密后端。推荐的后端是:pyca/cryptography

Python-jose 支持 PyJWT 的所有功能,还支持与其它工具集成时可能会用到的一些其它功能。

## 密码哈希化#

哈希 是指把特定内容转换为乱码形式的字节序列(其实就是字符串)。

每次传入完全相同的内容时(比如,完全相同的密码),返回的都是完全相同的乱码。

但这个乱码无法转换回传入的密码。

### 安装 passlib#

Passlib 是处理密码哈希的 Python 包。

它支持很多安全哈希算法及配套工具。

推荐的算法是 Bcrypt

先安装附带 Bcrypt 的 PassLib

pip install passlib[bcrypt]

passlib 甚至可以读取 Django、Flask 的安全插件等工具创建的密码。

例如,把 Django 应用的数据共享给 FastAPI 应用的数据库。或利用同一个数据库,可以逐步把应用从 Django 迁移到 FastAPI。用户可以同时从 Django 应用或 FastAPI 应用登录。

### 密码哈希与校验#

首先引入 passlib

# 引入 passlib 用于密码哈希化
from passlib.context import CryptContext

创建一个哈希密码上下文对象

# 创建一个密码哈希化的上下文对象,CryptContext 是用于处理密码哈希和验证的工具
# 它能接受一个或者多个密码哈希方案作为参数
# 这里使用 bcrypt 算法
# 并且 deprecated 参数设置为 auto,表示当有更安全的算法出现时,会自动使用更安全的算法
# 他有 verify() 和 hash() 两个方法,分别用于校验密码和获取 hash 后的密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

接着创建三个工具函数

1、将用户的密码哈希化

# 获取 hash 后的密码
def get_password_hash(password):
    # 使用 bcrypt 算法对密码进行哈希
    return pwd_context.hash(password)

2、校验接收的密码和存储的哈希密码是否一致

# 校验密码
def verify_password(plain_password, hashed_password):
    # 使用 bcrypt 算法校验密码
    return pwd_context.verify(plain_password, hashed_password)

3、用于身份验证,使用前面两个函数,来校验,并返回用户

# 通过用户名密码获取用户信息进行校验
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

## 处理 JWT 令牌#

导入已安装的模块。

创建用于 JWT 令牌签名的随机密钥。

使用以下命令,生成安全的随机密钥:

openssl rand -hex 32

如果没有 openssl 可以到 [官网](Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions (slproweb.com)) 下载

启动 window OpenSSL Command Prompt


image

复制生成的随机密钥 例如

0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9

然后,把生成的密钥复制到变量SECRET_KEY,注意,不要使用本例所示的密钥。

创建指定 JWT 令牌签名算法的变量 ALGORITHM,本例中的值为 "HS256"

创建设置令牌过期时间的变量。

定义令牌端点响应的 Pydantic 模型。

创建生成新的访问令牌的工具函数。

首先引入 jwt

from jose import JWTError,jwt

接着声明
1、随机密钥 SECRET_KEY 也就是上面 openSSL 生成的
2、JWT 令牌签名算法的变量 ALGORITHM ,本例中使用的是 'HS256'
3、设置令牌过期时间的变量 ACCESS_TOKEN_EXPIRE_MINUTES 分钟数

SECRET_KEY = "0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9" 
ALGORITHM = "HS256" 
ACCESS_TOKEN_EXPIRE_MINUTES = 30

定义一个返回 Token 的模型类

class Token(BaseModel):
    access_token: str
    token_type: str

引入 timedelta 类型

from datetime import datetime, timedelta

创建一个新的 生成访问令牌 的函数

# 生成访问令牌的函数
# data 表示的是要存进 token 中的数据,比如用户名、用户 ID 等
# expires_delta 指的是过期时间,它是一个 timedelta 类型的变量
# 如果没有传的话,就使用默认的过期时间,这里写死为 15 分钟
def create_access_token(data: dict, expires_delta: Union[timedelta,None] = None):
    # to_encode 是要存进 token 中的数据,将其复制一份,防止修改原数据
    to_encode = data.copy()

    # expires_delta 是 timedelta 类型的变量,指的是过期的时间
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 如果 expires_delta 为空,就使用默认的过期时间
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 将过期时间存进 token 中
    to_encode.update({"exp": expire})
    # 使用 jwt.encode() 方法生成 token 入参是 用户信息 , 随机的密钥 , 加密算法
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

修改 登陆获取 token 的接口

@app.post("/token", response_model=Token):
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 调用校验用户的函数,传入用户名和密码
    user = authenticate_user(fake_users_db,form_data.username,form_data.password)
    # 如果用户不存在
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # 如果用户存在,就生成 token
    # 将之前定义好的 token 过期时间 赋值给 access_token_expires 传到 expires_delta 中
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 调用生成 token 的函数,传入用户信息和过期时间
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 将生成 token 封装成 Token 模型类
    token = Token(access_token=access_token, token_type="bearer")

    # 将 token 返回
    return token

## 更新依赖项#

更新 get_current_user 以接收与之前相同的令牌,但这里用的是 JWT 令牌。

解码并校验接收到的令牌,然后,返回当前用户。

如果令牌无效,则直接返回 HTTP 错误。

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 使用 token 和定义好的 OpenSSL生成的随机密钥 SECRET_KEY 以及加密方式 
        # 调用 jwt 来解码 token,获取用户的信息,payload 的类型是 dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从 token 中获取用户名 ,也就是之前生成 token 的时候传入的 username
        username = payload.get("username")
        # 如果用户名不存在
        if username is None:
            # 抛出前面定义的异常
            raise credentials_exception
    except JWTError:
        # 如果解码失败,抛出异常
        raise credentials_exception
    # 将用户名传给 get_user 函数,获取用户信息
    user = get_user(fake_users_db, username=username)
    if user is None:
        # 如果用户不存在,抛出异常
        raise credentials_exception
    return user

代码中的

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )

用于表示验证凭据失败的异常情况。也就是在这段代码中,如果在解码 jwt 令牌时发生任何异常
(比如令牌无效、过期、被篡改等)都会引发这个异常

它返回了 status_code 表示未经授权的状态 401 、 detail 错误的信息 、 headers 令牌的类型

## 获得当前登陆用户的信息#

# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

## 完整代码#

# 创建一个密码哈希化的上下文对象,CryptContext 是用于处理密码哈希和验证的工具
# 它能接受一个或者多个密码哈希方案作为参数
# 这里使用 bcrypt 算法
# 并且 deprecated 参数设置为 auto,表示当有更安全的算法出现时,会自动使用更安全的算法
# 他有 verify() 和 hash() 两个方法,分别用于校验密码和获取 hash 后的密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 通过 /token 中的 username 和 password 获取 token, 返回 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 令牌签名算法的变量,这里使用 HS256 算法,还有其他算法,比如 RS256、ES256 等
ALGORITHM = "HS256"
# 使用 openSSL 随机生成一个 SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# token的过期时间,单位为分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 定义一个返回 token 模型类
class Token(BaseModel):
    access_token: str
    token_type: str

# 定义一个 token 数据模型类
class TokenData(BaseModel):
    username: Union[str, None] = None

# 生成访问令牌的函数
# userData 表示的是要存进 token 中的数据,比如用户名、用户 ID 等
# expires_delta 指的是过期时间,它是一个 timedelta 类型的变量
# 如果没有传的话,就使用默认的过期时间,这里写死为 15 分钟
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
    # to_encode 是要存进 token 中的数据,将其复制一份,防止修改原数据
    to_encode = userData.copy()

    # expires_delta 是 timedelta 类型的变量,指的是过期的时间
    # 使用 datetime.utcnow() 获取当前时间 + 过期时间 得到 token 到期的时间
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 如果 expires_delta 为空,就使用默认的过期时间
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 将到期时间存进 token 中
    to_encode.update({"exp": expire})
    # 使用 jwt.encode() 方法生成 token 入参是 用户信息 , 随机的密钥 , 加密算法
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 模拟数据库
fake_users_db = {
    "bryce": {
        "username": "bryce",
        "full_name": "bryce Yu",
        "email": "[email protected]",
        # 123123
        "hashed_password": "$2b$12$w2IXuK5lx6mMxNVBBoxRl.jHYWRblX1QjVg6UCtBZ7BzBMOH7RvkW",
        "disabled": False,
    },
    "yu": {
        "username": "yu",
        "full_name": "yu",
        "email": "[email protected]",
        # 123456
        "hashed_password": "$2b$12$BknHh482Zqkt9ODqSSbSiepUBdL1GGoeaxKCnGNXmfUbjbe7LKcxy",
        "disabled": True,
    },
}

# 创建一个 用户 模型类
class UserAuth(BaseModel):
    username: str
    # 先使用 Union[str, None],后面会改成 EmailStr
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

#  创建一个 用户 数据库 模型类
class UserDB(UserAuth):
    hashed_password: str

# 校验密码
def verify_password(plain_password, hashed_password):
    # 使用 bcrypt 算法校验密码
    return pwd_context.verify(plain_password, hashed_password)

# 获取 hash 后的密码
def get_password_hash(password):
    # 使用 bcrypt 算法对密码进行哈希
    return pwd_context.hash(password)

# 通过用户名密码获取用户信息进行校验
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# 从 db 数据库中,获取用户名为 username 的用户信息
def get_user(db, username: str):
    # 现在不安全的使用 用户名 作为 token
    if username in db:
        user_dict = db[username]
        return UserDB(**user_dict)

# 这个方法会就是通过依赖 oauth2_scheme 来获取 token,并校验 token 的有效性
# 如果 token 有效,调用 get_user 函数,获取当前的用户信息,返回用户信息
# 如果无效,就抛出异常
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 使用 token 和定义好的 OpenSSL生成的随机密钥 SECRET_KEY 以及加密方式 
        # 调用 jwt 来解码 token,获取用户的信息,payload 的类型是 dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从 token 中获取用户名 ,也就是之前生成 token 的时候传入的 username
        username = payload.get("username")
        # 如果用户名不存在
        if username is None:
            # 抛出前面定义的异常
            raise credentials_exception
    except JWTError:
        # 如果解码失败,抛出异常
        raise credentials_exception
    # 将用户名传给 get_user 函数,获取用户信息
    user = get_user(fake_users_db, username=username)
    if user is None:
        # 如果用户不存在,抛出异常
        raise credentials_exception
    return user


# 获取没有被禁用的用户,这个函数会 调用依赖 get_current_user 函数,获得当前登陆的用户
# 接着判断这个用户是否被禁用,如果被禁用,就抛出异常,如果没有被禁用,就返回用户信息
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # 通过 get_current_user 获取当前登陆的用户 current_user
    # 接着判断用户是否被禁用
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用户被禁用")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 调用校验用户的函数,传入用户名和密码
    user = authenticate_user(fake_users_db,form_data.username,form_data.password)
    # 如果用户不存在
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # 如果用户存在,就生成 token
    # 将之前定义好的 token 过期时间 赋值给 access_token_expires 传到 expires_delta 中
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 调用生成 token 的函数,传入用户信息和过期时间
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 将生成 token 封装成 Token 模型类
    token = Token(access_token=access_token, token_type="bearer")

    # 将 token 返回
    return token
    

# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# 测试使用,获取密码的哈希值
@app.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)

### 流程总结#

1、登陆

首先调用 /token 方法进行用户登陆,方法通过 OAuth2PasswordRequestForm 获得表单数据

表单数据中存有 username 和 password,

此时调用 authenticate 方法,传入 模拟的用户数据库,以及用户名和密码

在 authenticate 方法中,使用 get_user 方法比较 模拟数据库中用户名相同的用户

如果找到了用户,调用 verify_password 方法,传入找到的用户数据库存放的哈希密码和登陆密码

verify_password 方法中

调用 passlib 提供的上下文加密 CryptContext 类实例对象

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

的 verify 方法进行校验,会返回校验结果,如果为 true,就会把 get_user 函数获取到的 user 返回

接着设置好 token 过期时间,通过 create_access_token 函数,传入用户信息

create_access_token 中会将用户信息和过期时间内整合成一个 dict 调用 jwt.encode 生成 token

接着把生成的 token 返回

2、获取当前的登陆用户

调用 /users/me 接口,消息头中需要添加 Authorization 值,值为 'Bearer ' + 登陆返回的 token

接口函数调用依赖 get_current_active_user 来获取没有被禁用的当前的用户

get_current_active_user 会通过 get_current_user 函数获取当前用户,进一步判断有没有被禁用

get_current_user 会通过依赖 oauth2_scheme 获取消息头中的 token

并使用 jwt.decode 解码 token 从而获得登录时存进去的用户信息,比如用户名

使用用户名,从数据库中得到 用户的数据,并返回

注意!请求中 Authorization 响应头的值以 Bearer 开头。

### 接口测试#

表单调用 /token


image

Get 请求获取当前登陆用户


image

# 连接 mongoDB#

## 引入 pymongo#

首先需要安装 python 对 mongoDB 操作的库

pip install pymongo

接着在 文件中 引入

# 引入 mongodb 数据库
from pymongo import MongoClient

## 配置 mongoDB 连接#

# 创建 mongodb 数据库连接
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')


# 创建数据库, 数据库的操作都使用 db 对象来完成
# 比如 db.users.insert_one() 表示在 users 集合中插入一条数据
db = client['fastapi']

## 注册用户使用 mongoDB 新增#

新增一个用户注册的接口

代码如下

@app.post("/users/register")
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="用户已存在",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # 将用户信息转换为 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 将用户信息存进数据库
    db.users.insert_one(json_compatible_item_data)
    # 返回用户信息
    return user

## 登陆时使用查询用户#

修改之前的 get_user 函数

# 从 db 数据库中,获取用户名为 username 的用户信息
def get_user( username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)

## 修改用户信息#

创建用户更新模型,因为可以不更新密码,所以密码可以为空

# 创建一个 用户 更新 模型类
# 因为可以不更新密码,所以密码可以为空
class UserUpdate(UserAuth):
    password: Union[str, None] = None

以及一个 UserOut 模型,用来返回,不返回密码

# 用来返回的用户模型类
class UserOut(UserAuth):
    pass

同时获取当前登陆用户的模型也要修改

# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@app.get("/users/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user
@app.put("/users/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
    # 将当前登陆用户的信息转换为 dict
    current_user_dict = current_user.dict()

    # 判断要更新的用户名和当前登陆的用户名是否一致
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="无权修改",
        )
    
    # 将要更新的用户信息转换为 dict
    userdict = userInDB.dict()

    # 如果密码不为空,就更新密码
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # 如果密码为空,就使用当前登陆用户的密码
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # 将用户信息中的密码删除,不能将明文密码存进数据库
    userdict.pop("password")

    # 将用户信息转换为 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 将用户信息存进数据库
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # 返回用户信息
    return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}

## 查看 mongo 库数据#

使用 mongo compass 查看


image

# 进一步整理#

使用 router 的方式来实现接口

文件的目录将会是

├── MyApi
│	├── app							# 「app」是一个 Python 包
│   	├── __init__.py				# 这个文件使「app」成为一个 Python 包
│   	├── main.py					# 「main」模块,例如 import app.main
│   	├── dependencies.py		    # 「dependencies」模块,例如 import app.dependencies
│   	└── routers					# 「routers」是一个「Python 子包」
│   	    ├── __init__.py			# 使「routers」成为一个「Python 子包」
│   	    └── users.py			# 「users」子模块,例如 import app.routers.users

注意文件夹中要放入一个 __init__.py 空文件,用来告诉 python 这是文件目录

每个目录或子目录中都有一个。

这就是能将代码从一个文件导入到另一个文件的原因。

例如 在 app/main.py 中 可以直接使用

# 绝对路径
from app.routers import items
# 相对路径
from .routers import items

## routers 路由#

首先在 app 文件夹中

创建一个 routers 文件夹,用来存放路由

在文件夹中新建一个 users.py

文件中首先引入

from fastapi import APIRouter

对此路由进行配置

# 对路由进行配置
router = APIRouter(
    # 定义访问路由的前缀,前缀后面不能有斜杠
    # 在这里设置了前缀,就不用在每个路径操作中都写上前缀了
    prefix="/users",
    # 为了在文档中显示自定义的标签,我们可以使用 tags 参数
    tags=["users"],
    # 为了在文档中显示自定义的响应,我们可以使用 responses 参数定义返回值
    responses={404: {"description": "Not found"}},
)

接着就可以将直接 user 相关的路由全部移动到这里,将 app 改为 router

@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # 调用校验用户的函数,传入用户名和密码
    user = authenticate_user(form_data.username,form_data.password)
    # 如果用户不存在
    if not user:
        raise HTTPException(
            status_code=400, 
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
            )
    # 如果用户存在,就生成 token
    # 将之前定义好的 token 过期时间 赋值给 access_token_expires 传到 expires_delta 中
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 调用生成 token 的函数,传入用户信息和过期时间
    access_token = create_access_token(
        userData = {"username": user.username}, expires_delta=access_token_expires)
    
    # 将生成 token 封装成 Token 模型类
    token = Token(access_token=access_token, token_type="bearer")

    # 将 token 返回
    return token
    

# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@router.get("/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
    return current_user

# 测试使用,获取密码的哈希值
@router.get("/password/")
async def read_password(password: str):
    return get_password_hash(password)


@router.post("/register",response_model=UserOut)
async def register_user(user: UserInDB):
    userdict = user.dict()
    userdb = db.users.find_one({"username": user.username})
    if userdb:
        raise HTTPException(
            status_code=400,
            detail="用户已存在",
        )
    hashdPassword = get_password_hash(user.password)
    userdict.update({"hashed_password": hashdPassword})

    # 将用户信息中的密码删除,不能将明文密码存进数据库
    userdict.pop("password")

    # 将用户信息转换为 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 将用户信息存进数据库
    db.users.insert_one(json_compatible_item_data)
    # 返回用户信息
    return user

@router.put("/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
    # 将当前登陆用户的信息转换为 dict
    current_user_dict = current_user.dict()

    # 判断要更新的用户名和当前登陆的用户名是否一致
    if userInDB.username != current_user_dict.get("username"):
        raise HTTPException(
            status_code=400,
            detail="无权修改",
        )
    
    # 将要更新的用户信息转换为 dict
    userdict = userInDB.dict()

    # 如果密码不为空,就更新密码
    if userInDB.password:
        hashdPassword = get_password_hash(userInDB.password)
        userdict.update({"hashed_password": hashdPassword})
    else:
        # 如果密码为空,就使用当前登陆用户的密码
        userdict.update({"hashed_password": current_user_dict.get("hashed_password")})

    # 将用户信息中的密码删除,不能将明文密码存进数据库
    userdict.pop("password")

    # 将用户信息转换为 json 格式
    json_compatible_item_data = jsonable_encoder(userdict)
    # 将用户信息存进数据库
    db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
    # 返回用户信息
    return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}

这时候会出现很多错误,因为之前 import 的库都不存在

## dependencies 依赖项#

当存在有很多地方都要用到的相同的依赖项的时候

可以全部放在 dependencies 中统一存放

在 /app 目录下 ,新增一个 dependencies.py 文件

就可以讲,之前 users 中那些 import 和 一直自定义的变量 全部放到 dependencies.py 中

例如:

# Path: dependencies.py

# 这个文件是用来处理依赖的,
# 比如需要一些在应用程序的好几个地方所使用的依赖项,
# 那么就可以将这些依赖项放在这个文件中
from fastapi import Depends, HTTPException, status
# 引入 OAuth2PasswordBearer 类,用于声明 oauth2_scheme 变量,用于身份验证
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
# 引入 Union
from typing import Union
# 引入 BaseModel,用来解析请求体
from pydantic import BaseModel,HttpUrl,EmailStr
# 引入时间模块
from datetime import datetime,timedelta
# 引入 JSONCompatibleEncoder 用于将模型转换为 JSON
from fastapi.encoders import jsonable_encoder
# 引入 passlib 用于密码哈希化
from passlib.context import CryptContext
# 引入 JWT 用于生成 token
from jose import JWTError,jwt
# 引入 mongodb 数据库
from pymongo import MongoClient


# 创建 mongodb 数据库连接
client = MongoClient('mongodb://fastapi:[email protected]:27017/fastapi')

# 创建数据库, 数据库的操作都使用 db 对象来完成
# 比如 db.users.insert_one() 表示在 users 集合中插入一条数据
db = client['fastapi']


# 创建一个密码哈希化的上下文对象,CryptContext 是用于处理密码哈希和验证的工具
# 它能接受一个或者多个密码哈希方案作为参数
# 这里使用 bcrypt 算法
# 并且 deprecated 参数设置为 auto,表示当有更安全的算法出现时,会自动使用更安全的算法
# 他有 verify() 和 hash() 两个方法,分别用于校验密码和获取 hash 后的密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 通过 /token 中的 username 和 password 获取 token, 返回 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 令牌签名算法的变量,这里使用 HS256 算法,还有其他算法,比如 RS256、ES256 等
ALGORITHM = "HS256"
# 使用 openSSL 随机生成一个 SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# token的过期时间,单位为分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 定义一个返回 token 模型类
class Token(BaseModel):
    access_token: str
    token_type: str

# 定义一个 token 数据模型类
class TokenData(BaseModel):
    username: Union[str, None] = None


# 创建一个 用户 模型类
class UserAuth(BaseModel):
    username: str
    # 先使用 Union[str, None],后面会改成 EmailStr
    email: Union[EmailStr, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = False

#  创建一个 用户 数据库 模型类
class UserDB(UserAuth):
    hashed_password: str

class UserInDB(UserAuth):
    password: str

# 创建一个 用户 更新 模型类
# 因为可以不更新密码,所以密码可以为空
class UserUpdate(UserAuth):
    password: Union[str, None] = None

# 用来返回的用户模型类
class UserOut(UserAuth):
    pass


# 生成访问令牌的函数
# userData 表示的是要存进 token 中的数据,比如用户名、用户 ID 等
# expires_delta 指的是过期时间,它是一个 timedelta 类型的变量
# 如果没有传的话,就使用默认的过期时间,这里写死为 15 分钟
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
    # to_encode 是要存进 token 中的数据,将其复制一份,防止修改原数据
    to_encode = userData.copy()

    # expires_delta 是 timedelta 类型的变量,指的是过期的时间
    # 使用 datetime.utcnow() 获取当前时间 + 过期时间 得到 token 到期的时间
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # 如果 expires_delta 为空,就使用默认的过期时间
        expire = datetime.utcnow() + timedelta(minutes=15)

    # 将到期时间存进 token 中
    to_encode.update({"exp": expire})
    # 使用 jwt.encode() 方法生成 token 入参是 用户信息 , 随机的密钥 , 加密算法
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 校验密码
def verify_password(plain_password, hashed_password):
    # 使用 bcrypt 算法校验密码
    return pwd_context.verify(plain_password, hashed_password)

# 获取 hash 后的密码
def get_password_hash(password):
    # 使用 bcrypt 算法对密码进行哈希
    return pwd_context.hash(password)


# 通过用户名密码获取用户信息进行校验
def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# 从 db 数据库中,获取用户名为 username 的用户信息
def get_user( username: str):
    userdb = db.users.find_one({"username": username})
    if userdb:
        return UserDB(**userdb)


# 这个方法会就是通过依赖 oauth2_scheme 来获取 token,并校验 token 的有效性
# 如果 token 有效,调用 get_user 函数,获取当前的用户信息,返回用户信息
# 如果无效,就抛出异常
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭证",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 使用 token 和定义好的 OpenSSL生成的随机密钥 SECRET_KEY 以及加密方式 
        # 调用 jwt 来解码 token,获取用户的信息,payload 的类型是 dict
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从 token 中获取用户名 ,也就是之前生成 token 的时候传入的 username
        username = payload.get("username")
        # 如果用户名不存在
        if username is None:
            # 抛出前面定义的异常
            raise credentials_exception
    except JWTError:
        # 如果解码失败,抛出异常
        raise credentials_exception
    # 将用户名传给 get_user 函数,获取用户信息
    user = get_user(username=username)
    if user is None:
        # 如果用户不存在,抛出异常
        raise credentials_exception
    return user



# 获取没有被禁用的用户,这个函数会 调用依赖 get_current_user 函数,获得当前登陆的用户
# 接着判断这个用户是否被禁用,如果被禁用,就抛出异常,如果没有被禁用,就返回用户信息
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
    # 通过 get_current_user 获取当前登陆的用户 current_user
    # 接着判断用户是否被禁用
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用户被禁用")
    return current_user

接着在 /routers/users.py 中 引入 dependencies 即可

from fastapi import APIRouter, Depends

# 引入 dependencies.py 中的依赖
from dependencies import *

## main.py#

这时候在 /app 目录中新增一个 main.py 文件

from fastapi import FastAPI
# 引入 CORS 中间件,用于跨域
from fastapi.middleware.cors import CORSMiddleware
# 从 routers 中导入 users
from .routers import users

app = FastAPI()
  

# 跨域设置
app.add_middleware(
    CORSMiddleware,
    # 允许所有的域名访问
    allow_origins=["*"],
    # 允许所有的请求携带 cookie
    allow_credentials=True,
    # 允许所有的请求方法
    allow_methods=["*"],
    # 允许所有的请求头
    allow_headers=["*"],
)

# 引入 router 中的 users 路由
app.include_router(users.router)

## 启动项目#

进入 MyApi 目录中执行

uvicorn app.main:app --reload

即可启动程序

此时 从 openApi 的文档中也能看到

Tags 生效了


image

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。