# OAuth2 实现密码哈希与 Bearer JWT 令牌验证#
## JWT 简介#
JWT 即JSON 网络令牌(JSON Web Tokens)。
JWT 是一种将 JSON 对象编码为没有空格,且难以理解的长字符串的标准。JWT 的内容如下所示:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
JWT 字符串没有加密,任何人都能用它恢复原始信息。
但 JWT 使用了签名机制。接受令牌时,可以用签名校验令牌。
使用 JWT 创建有效期为一周的令牌。第二天,用户持令牌再次访问时,仍为登录状态。
令牌于一周后过期,届时,用户身份验证就会失败。只有再次登录,才能获得新的令牌。
如果用户(或第三方)篡改令牌的过期时间,因为签名不匹配会导致身份验证失败。
如需深入了解 JWT 令牌,了解它的工作方式,请参阅 JWT 官网。
## 安装 python-jose#
pip install python-jose[cryptography]
Python-jose 需要安装配套的加密后端。推荐的后端是:pyca/cryptography。
Python-jose 支持 PyJWT 的所有功能,还支持与其它工具集成时可能会用到的一些其它功能。
## 密码哈希化#
哈希 是指把特定内容转换为乱码形式的字节序列(其实就是字符串)。
每次传入完全相同的内容时(比如,完全相同的密码),返回的都是完全相同的乱码。
但这个乱码无法转换回传入的密码。
### 安装 passlib#
Passlib 是处理密码哈希的 Python 包。
它支持很多安全哈希算法及配套工具。
推荐的算法是 Bcrypt。
先安装附带 Bcrypt 的 PassLib
pip install passlib[bcrypt]
passlib
甚至可以读取 Django、Flask 的安全插件等工具创建的密码。
例如,把 Django 应用的数据共享给 FastAPI 应用的数据库。或利用同一个数据库,可以逐步把应用从 Django 迁移到 FastAPI。用户可以同时从 Django 应用或 FastAPI 应用登录。
### 密码哈希与校验#
首先引入 passlib
# 引入 passlib 用于密码哈希化
from passlib.context import CryptContext
创建一个哈希密码上下文对象
# 创建一个密码哈希化的上下文对象,CryptContext 是用于处理密码哈希和验证的工具
# 它能接受一个或者多个密码哈希方案作为参数
# 这里使用 bcrypt 算法
# 并且 deprecated 参数设置为 auto,表示当有更安全的算法出现时,会自动使用更安全的算法
# 他有 verify() 和 hash() 两个方法,分别用于校验密码和获取 hash 后的密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
接着创建三个工具函数
1、将用户的密码哈希化
# 获取 hash 后的密码
def get_password_hash(password):
# 使用 bcrypt 算法对密码进行哈希
return pwd_context.hash(password)
2、校验接收的密码和存储的哈希密码是否一致
# 校验密码
def verify_password(plain_password, hashed_password):
# 使用 bcrypt 算法校验密码
return pwd_context.verify(plain_password, hashed_password)
3、用于身份验证,使用前面两个函数,来校验,并返回用户
# 通过用户名密码获取用户信息进行校验
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
## 处理 JWT 令牌#
导入已安装的模块。
创建用于 JWT 令牌签名的随机密钥。
使用以下命令,生成安全的随机密钥:
openssl rand -hex 32
如果没有 openssl 可以到 [官网](Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions (slproweb.com)) 下载
启动 window OpenSSL Command Prompt
复制生成的随机密钥 例如
0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9
然后,把生成的密钥复制到变量SECRET_KEY,注意,不要使用本例所示的密钥。
创建指定 JWT 令牌签名算法的变量 ALGORITHM,本例中的值为 "HS256"
。
创建设置令牌过期时间的变量。
定义令牌端点响应的 Pydantic 模型。
创建生成新的访问令牌的工具函数。
首先引入 jwt
from jose import JWTError,jwt
接着声明
1、随机密钥 SECRET_KEY 也就是上面 openSSL 生成的
2、JWT 令牌签名算法的变量 ALGORITHM ,本例中使用的是 'HS256'
3、设置令牌过期时间的变量 ACCESS_TOKEN_EXPIRE_MINUTES 分钟数
SECRET_KEY = "0492fa85994a4cc7fa7a0331f4e22dbfdf3555e516fbf371f31679418963d3f9"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
定义一个返回 Token 的模型类
class Token(BaseModel):
access_token: str
token_type: str
引入 timedelta 类型
from datetime import datetime, timedelta
创建一个新的 生成访问令牌 的函数
# 生成访问令牌的函数
# data 表示的是要存进 token 中的数据,比如用户名、用户 ID 等
# expires_delta 指的是过期时间,它是一个 timedelta 类型的变量
# 如果没有传的话,就使用默认的过期时间,这里写死为 15 分钟
def create_access_token(data: dict, expires_delta: Union[timedelta,None] = None):
# to_encode 是要存进 token 中的数据,将其复制一份,防止修改原数据
to_encode = data.copy()
# expires_delta 是 timedelta 类型的变量,指的是过期的时间
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
# 如果 expires_delta 为空,就使用默认的过期时间
expire = datetime.utcnow() + timedelta(minutes=15)
# 将过期时间存进 token 中
to_encode.update({"exp": expire})
# 使用 jwt.encode() 方法生成 token 入参是 用户信息 , 随机的密钥 , 加密算法
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
修改 登陆获取 token 的接口
@app.post("/token", response_model=Token):
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
# 调用校验用户的函数,传入用户名和密码
user = authenticate_user(fake_users_db,form_data.username,form_data.password)
# 如果用户不存在
if not user:
raise HTTPException(
status_code=400,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
# 如果用户存在,就生成 token
# 将之前定义好的 token 过期时间 赋值给 access_token_expires 传到 expires_delta 中
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 调用生成 token 的函数,传入用户信息和过期时间
access_token = create_access_token(
userData = {"username": user.username}, expires_delta=access_token_expires)
# 将生成 token 封装成 Token 模型类
token = Token(access_token=access_token, token_type="bearer")
# 将 token 返回
return token
## 更新依赖项#
更新 get_current_user
以接收与之前相同的令牌,但这里用的是 JWT 令牌。
解码并校验接收到的令牌,然后,返回当前用户。
如果令牌无效,则直接返回 HTTP 错误。
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭证",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 使用 token 和定义好的 OpenSSL生成的随机密钥 SECRET_KEY 以及加密方式
# 调用 jwt 来解码 token,获取用户的信息,payload 的类型是 dict
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从 token 中获取用户名 ,也就是之前生成 token 的时候传入的 username
username = payload.get("username")
# 如果用户名不存在
if username is None:
# 抛出前面定义的异常
raise credentials_exception
except JWTError:
# 如果解码失败,抛出异常
raise credentials_exception
# 将用户名传给 get_user 函数,获取用户信息
user = get_user(fake_users_db, username=username)
if user is None:
# 如果用户不存在,抛出异常
raise credentials_exception
return user
代码中的
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭证",
headers={"WWW-Authenticate": "Bearer"},
)
用于表示验证凭据失败的异常情况。也就是在这段代码中,如果在解码 jwt 令牌时发生任何异常
(比如令牌无效、过期、被篡改等)都会引发这个异常
它返回了 status_code 表示未经授权的状态 401 、 detail 错误的信息 、 headers 令牌的类型
## 获得当前登陆用户的信息#
# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
return current_user
## 完整代码#
# 创建一个密码哈希化的上下文对象,CryptContext 是用于处理密码哈希和验证的工具
# 它能接受一个或者多个密码哈希方案作为参数
# 这里使用 bcrypt 算法
# 并且 deprecated 参数设置为 auto,表示当有更安全的算法出现时,会自动使用更安全的算法
# 他有 verify() 和 hash() 两个方法,分别用于校验密码和获取 hash 后的密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 通过 /token 中的 username 和 password 获取 token, 返回 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 令牌签名算法的变量,这里使用 HS256 算法,还有其他算法,比如 RS256、ES256 等
ALGORITHM = "HS256"
# 使用 openSSL 随机生成一个 SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# token的过期时间,单位为分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 定义一个返回 token 模型类
class Token(BaseModel):
access_token: str
token_type: str
# 定义一个 token 数据模型类
class TokenData(BaseModel):
username: Union[str, None] = None
# 生成访问令牌的函数
# userData 表示的是要存进 token 中的数据,比如用户名、用户 ID 等
# expires_delta 指的是过期时间,它是一个 timedelta 类型的变量
# 如果没有传的话,就使用默认的过期时间,这里写死为 15 分钟
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
# to_encode 是要存进 token 中的数据,将其复制一份,防止修改原数据
to_encode = userData.copy()
# expires_delta 是 timedelta 类型的变量,指的是过期的时间
# 使用 datetime.utcnow() 获取当前时间 + 过期时间 得到 token 到期的时间
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
# 如果 expires_delta 为空,就使用默认的过期时间
expire = datetime.utcnow() + timedelta(minutes=15)
# 将到期时间存进 token 中
to_encode.update({"exp": expire})
# 使用 jwt.encode() 方法生成 token 入参是 用户信息 , 随机的密钥 , 加密算法
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 模拟数据库
fake_users_db = {
"bryce": {
"username": "bryce",
"full_name": "bryce Yu",
"email": "[email protected]",
# 123123
"hashed_password": "$2b$12$w2IXuK5lx6mMxNVBBoxRl.jHYWRblX1QjVg6UCtBZ7BzBMOH7RvkW",
"disabled": False,
},
"yu": {
"username": "yu",
"full_name": "yu",
"email": "[email protected]",
# 123456
"hashed_password": "$2b$12$BknHh482Zqkt9ODqSSbSiepUBdL1GGoeaxKCnGNXmfUbjbe7LKcxy",
"disabled": True,
},
}
# 创建一个 用户 模型类
class UserAuth(BaseModel):
username: str
# 先使用 Union[str, None],后面会改成 EmailStr
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
# 创建一个 用户 数据库 模型类
class UserDB(UserAuth):
hashed_password: str
# 校验密码
def verify_password(plain_password, hashed_password):
# 使用 bcrypt 算法校验密码
return pwd_context.verify(plain_password, hashed_password)
# 获取 hash 后的密码
def get_password_hash(password):
# 使用 bcrypt 算法对密码进行哈希
return pwd_context.hash(password)
# 通过用户名密码获取用户信息进行校验
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 从 db 数据库中,获取用户名为 username 的用户信息
def get_user(db, username: str):
# 现在不安全的使用 用户名 作为 token
if username in db:
user_dict = db[username]
return UserDB(**user_dict)
# 这个方法会就是通过依赖 oauth2_scheme 来获取 token,并校验 token 的有效性
# 如果 token 有效,调用 get_user 函数,获取当前的用户信息,返回用户信息
# 如果无效,就抛出异常
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭证",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 使用 token 和定义好的 OpenSSL生成的随机密钥 SECRET_KEY 以及加密方式
# 调用 jwt 来解码 token,获取用户的信息,payload 的类型是 dict
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从 token 中获取用户名 ,也就是之前生成 token 的时候传入的 username
username = payload.get("username")
# 如果用户名不存在
if username is None:
# 抛出前面定义的异常
raise credentials_exception
except JWTError:
# 如果解码失败,抛出异常
raise credentials_exception
# 将用户名传给 get_user 函数,获取用户信息
user = get_user(fake_users_db, username=username)
if user is None:
# 如果用户不存在,抛出异常
raise credentials_exception
return user
# 获取没有被禁用的用户,这个函数会 调用依赖 get_current_user 函数,获得当前登陆的用户
# 接着判断这个用户是否被禁用,如果被禁用,就抛出异常,如果没有被禁用,就返回用户信息
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
# 通过 get_current_user 获取当前登陆的用户 current_user
# 接着判断用户是否被禁用
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户被禁用")
return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
# 调用校验用户的函数,传入用户名和密码
user = authenticate_user(fake_users_db,form_data.username,form_data.password)
# 如果用户不存在
if not user:
raise HTTPException(
status_code=400,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
# 如果用户存在,就生成 token
# 将之前定义好的 token 过期时间 赋值给 access_token_expires 传到 expires_delta 中
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 调用生成 token 的函数,传入用户信息和过期时间
access_token = create_access_token(
userData = {"username": user.username}, expires_delta=access_token_expires)
# 将生成 token 封装成 Token 模型类
token = Token(access_token=access_token, token_type="bearer")
# 将 token 返回
return token
# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@app.get("/users/me")
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
return current_user
# 测试使用,获取密码的哈希值
@app.get("/password/")
async def read_password(password: str):
return get_password_hash(password)
### 流程总结#
1、登陆
首先调用 /token 方法进行用户登陆,方法通过 OAuth2PasswordRequestForm 获得表单数据
表单数据中存有 username 和 password,
此时调用 authenticate 方法,传入 模拟的用户数据库,以及用户名和密码
在 authenticate 方法中,使用 get_user 方法比较 模拟数据库中用户名相同的用户
如果找到了用户,调用 verify_password 方法,传入找到的用户数据库存放的哈希密码和登陆密码
verify_password 方法中
调用 passlib 提供的上下文加密 CryptContext 类实例对象
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
的 verify 方法进行校验,会返回校验结果,如果为 true,就会把 get_user 函数获取到的 user 返回
接着设置好 token 过期时间,通过 create_access_token 函数,传入用户信息
create_access_token 中会将用户信息和过期时间内整合成一个 dict 调用 jwt.encode 生成 token
接着把生成的 token 返回
2、获取当前的登陆用户
调用 /users/me 接口,消息头中需要添加 Authorization 值,值为 'Bearer ' + 登陆返回的 token
接口函数调用依赖 get_current_active_user 来获取没有被禁用的当前的用户
get_current_active_user 会通过 get_current_user 函数获取当前用户,进一步判断有没有被禁用
get_current_user 会通过依赖 oauth2_scheme 获取消息头中的 token
并使用 jwt.decode 解码 token 从而获得登录时存进去的用户信息,比如用户名
使用用户名,从数据库中得到 用户的数据,并返回
注意!请求中 Authorization
响应头的值以 Bearer
开头。
### 接口测试#
表单调用 /token
Get 请求获取当前登陆用户
# 连接 mongoDB#
## 引入 pymongo#
首先需要安装 python 对 mongoDB 操作的库
pip install pymongo
接着在 文件中 引入
# 引入 mongodb 数据库
from pymongo import MongoClient
## 配置 mongoDB 连接#
# 创建 mongodb 数据库连接
client = MongoClient('mongodb://fastapi:fastapi@localhost:27017/fastapi')
# 创建数据库, 数据库的操作都使用 db 对象来完成
# 比如 db.users.insert_one() 表示在 users 集合中插入一条数据
db = client['fastapi']
## 注册用户使用 mongoDB 新增#
新增一个用户注册的接口
代码如下
@app.post("/users/register")
async def register_user(user: UserInDB):
userdict = user.dict()
userdb = db.users.find_one({"username": user.username})
if userdb:
raise HTTPException(
status_code=400,
detail="用户已存在",
)
hashdPassword = get_password_hash(user.password)
userdict.update({"hashed_password": hashdPassword})
# 将用户信息转换为 json 格式
json_compatible_item_data = jsonable_encoder(userdict)
# 将用户信息存进数据库
db.users.insert_one(json_compatible_item_data)
# 返回用户信息
return user
## 登陆时使用查询用户#
修改之前的 get_user 函数
# 从 db 数据库中,获取用户名为 username 的用户信息
def get_user( username: str):
userdb = db.users.find_one({"username": username})
if userdb:
return UserDB(**userdb)
## 修改用户信息#
创建用户更新模型,因为可以不更新密码,所以密码可以为空
# 创建一个 用户 更新 模型类
# 因为可以不更新密码,所以密码可以为空
class UserUpdate(UserAuth):
password: Union[str, None] = None
以及一个 UserOut 模型,用来返回,不返回密码
# 用来返回的用户模型类
class UserOut(UserAuth):
pass
同时获取当前登陆用户的模型也要修改
# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@app.get("/users/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
return current_user
@app.put("/users/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
# 将当前登陆用户的信息转换为 dict
current_user_dict = current_user.dict()
# 判断要更新的用户名和当前登陆的用户名是否一致
if userInDB.username != current_user_dict.get("username"):
raise HTTPException(
status_code=400,
detail="无权修改",
)
# 将要更新的用户信息转换为 dict
userdict = userInDB.dict()
# 如果密码不为空,就更新密码
if userInDB.password:
hashdPassword = get_password_hash(userInDB.password)
userdict.update({"hashed_password": hashdPassword})
else:
# 如果密码为空,就使用当前登陆用户的密码
userdict.update({"hashed_password": current_user_dict.get("hashed_password")})
# 将用户信息中的密码删除,不能将明文密码存进数据库
userdict.pop("password")
# 将用户信息转换为 json 格式
json_compatible_item_data = jsonable_encoder(userdict)
# 将用户信息存进数据库
db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
# 返回用户信息
return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}
## 查看 mongo 库数据#
使用 mongo compass 查看
# 进一步整理#
使用 router 的方式来实现接口
文件的目录将会是
├── MyApi
│ ├── app # 「app」是一个 Python 包
│ ├── __init__.py # 这个文件使「app」成为一个 Python 包
│ ├── main.py # 「main」模块,例如 import app.main
│ ├── dependencies.py # 「dependencies」模块,例如 import app.dependencies
│ └── routers # 「routers」是一个「Python 子包」
│ ├── __init__.py # 使「routers」成为一个「Python 子包」
│ └── users.py # 「users」子模块,例如 import app.routers.users
注意文件夹中要放入一个 __init__.py
空文件,用来告诉 python 这是文件目录
每个目录或子目录中都有一个。
这就是能将代码从一个文件导入到另一个文件的原因。
例如 在 app/main.py 中 可以直接使用
# 绝对路径
from app.routers import items
# 相对路径
from .routers import items
## routers 路由#
首先在 app 文件夹中
创建一个 routers 文件夹,用来存放路由
在文件夹中新建一个 users.py
文件中首先引入
from fastapi import APIRouter
对此路由进行配置
# 对路由进行配置
router = APIRouter(
# 定义访问路由的前缀,前缀后面不能有斜杠
# 在这里设置了前缀,就不用在每个路径操作中都写上前缀了
prefix="/users",
# 为了在文档中显示自定义的标签,我们可以使用 tags 参数
tags=["users"],
# 为了在文档中显示自定义的响应,我们可以使用 responses 参数定义返回值
responses={404: {"description": "Not found"}},
)
接着就可以将直接 user 相关的路由全部移动到这里,将 app 改为 router
@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
# 调用校验用户的函数,传入用户名和密码
user = authenticate_user(form_data.username,form_data.password)
# 如果用户不存在
if not user:
raise HTTPException(
status_code=400,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
# 如果用户存在,就生成 token
# 将之前定义好的 token 过期时间 赋值给 access_token_expires 传到 expires_delta 中
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 调用生成 token 的函数,传入用户信息和过期时间
access_token = create_access_token(
userData = {"username": user.username}, expires_delta=access_token_expires)
# 将生成 token 封装成 Token 模型类
token = Token(access_token=access_token, token_type="bearer")
# 将 token 返回
return token
# 通过 get_current_active_user 获取当前登陆并且没有被禁用的用户 current_user
@router.get("/me",response_model=UserOut)
async def read_users_me(current_user: UserAuth = Depends(get_current_active_user)):
return current_user
# 测试使用,获取密码的哈希值
@router.get("/password/")
async def read_password(password: str):
return get_password_hash(password)
@router.post("/register",response_model=UserOut)
async def register_user(user: UserInDB):
userdict = user.dict()
userdb = db.users.find_one({"username": user.username})
if userdb:
raise HTTPException(
status_code=400,
detail="用户已存在",
)
hashdPassword = get_password_hash(user.password)
userdict.update({"hashed_password": hashdPassword})
# 将用户信息中的密码删除,不能将明文密码存进数据库
userdict.pop("password")
# 将用户信息转换为 json 格式
json_compatible_item_data = jsonable_encoder(userdict)
# 将用户信息存进数据库
db.users.insert_one(json_compatible_item_data)
# 返回用户信息
return user
@router.put("/update",response_model=UserOut)
async def update_user(userInDB: UserUpdate,current_user: UserAuth = Depends(get_current_active_user)):
# 将当前登陆用户的信息转换为 dict
current_user_dict = current_user.dict()
# 判断要更新的用户名和当前登陆的用户名是否一致
if userInDB.username != current_user_dict.get("username"):
raise HTTPException(
status_code=400,
detail="无权修改",
)
# 将要更新的用户信息转换为 dict
userdict = userInDB.dict()
# 如果密码不为空,就更新密码
if userInDB.password:
hashdPassword = get_password_hash(userInDB.password)
userdict.update({"hashed_password": hashdPassword})
else:
# 如果密码为空,就使用当前登陆用户的密码
userdict.update({"hashed_password": current_user_dict.get("hashed_password")})
# 将用户信息中的密码删除,不能将明文密码存进数据库
userdict.pop("password")
# 将用户信息转换为 json 格式
json_compatible_item_data = jsonable_encoder(userdict)
# 将用户信息存进数据库
db.users.update_one({"username": userInDB.username}, {"$set": json_compatible_item_data})
# 返回用户信息
return {"username": userInDB.username,"email": userInDB.email,"full_name": userInDB.full_name}
这时候会出现很多错误,因为之前 import 的库都不存在
## dependencies 依赖项#
当存在有很多地方都要用到的相同的依赖项的时候
可以全部放在 dependencies 中统一存放
在 /app 目录下 ,新增一个 dependencies.py 文件
就可以讲,之前 users 中那些 import 和 一直自定义的变量 全部放到 dependencies.py 中
例如:
# Path: dependencies.py
# 这个文件是用来处理依赖的,
# 比如需要一些在应用程序的好几个地方所使用的依赖项,
# 那么就可以将这些依赖项放在这个文件中
from fastapi import Depends, HTTPException, status
# 引入 OAuth2PasswordBearer 类,用于声明 oauth2_scheme 变量,用于身份验证
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
# 引入 Union
from typing import Union
# 引入 BaseModel,用来解析请求体
from pydantic import BaseModel,HttpUrl,EmailStr
# 引入时间模块
from datetime import datetime,timedelta
# 引入 JSONCompatibleEncoder 用于将模型转换为 JSON
from fastapi.encoders import jsonable_encoder
# 引入 passlib 用于密码哈希化
from passlib.context import CryptContext
# 引入 JWT 用于生成 token
from jose import JWTError,jwt
# 引入 mongodb 数据库
from pymongo import MongoClient
# 创建 mongodb 数据库连接
client = MongoClient('mongodb://fastapi:[email protected]:27017/fastapi')
# 创建数据库, 数据库的操作都使用 db 对象来完成
# 比如 db.users.insert_one() 表示在 users 集合中插入一条数据
db = client['fastapi']
# 创建一个密码哈希化的上下文对象,CryptContext 是用于处理密码哈希和验证的工具
# 它能接受一个或者多个密码哈希方案作为参数
# 这里使用 bcrypt 算法
# 并且 deprecated 参数设置为 auto,表示当有更安全的算法出现时,会自动使用更安全的算法
# 他有 verify() 和 hash() 两个方法,分别用于校验密码和获取 hash 后的密码
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 通过 /token 中的 username 和 password 获取 token, 返回 token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 令牌签名算法的变量,这里使用 HS256 算法,还有其他算法,比如 RS256、ES256 等
ALGORITHM = "HS256"
# 使用 openSSL 随机生成一个 SECRET_KEY
SECRET_KEY = "905214810165f5b010b85f3fa7985724aa2bfba9b132a333e82b7fc6b524170e"
# token的过期时间,单位为分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 定义一个返回 token 模型类
class Token(BaseModel):
access_token: str
token_type: str
# 定义一个 token 数据模型类
class TokenData(BaseModel):
username: Union[str, None] = None
# 创建一个 用户 模型类
class UserAuth(BaseModel):
username: str
# 先使用 Union[str, None],后面会改成 EmailStr
email: Union[EmailStr, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = False
# 创建一个 用户 数据库 模型类
class UserDB(UserAuth):
hashed_password: str
class UserInDB(UserAuth):
password: str
# 创建一个 用户 更新 模型类
# 因为可以不更新密码,所以密码可以为空
class UserUpdate(UserAuth):
password: Union[str, None] = None
# 用来返回的用户模型类
class UserOut(UserAuth):
pass
# 生成访问令牌的函数
# userData 表示的是要存进 token 中的数据,比如用户名、用户 ID 等
# expires_delta 指的是过期时间,它是一个 timedelta 类型的变量
# 如果没有传的话,就使用默认的过期时间,这里写死为 15 分钟
def create_access_token(userData: dict, expires_delta: Union[timedelta,None] = None):
# to_encode 是要存进 token 中的数据,将其复制一份,防止修改原数据
to_encode = userData.copy()
# expires_delta 是 timedelta 类型的变量,指的是过期的时间
# 使用 datetime.utcnow() 获取当前时间 + 过期时间 得到 token 到期的时间
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
# 如果 expires_delta 为空,就使用默认的过期时间
expire = datetime.utcnow() + timedelta(minutes=15)
# 将到期时间存进 token 中
to_encode.update({"exp": expire})
# 使用 jwt.encode() 方法生成 token 入参是 用户信息 , 随机的密钥 , 加密算法
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 校验密码
def verify_password(plain_password, hashed_password):
# 使用 bcrypt 算法校验密码
return pwd_context.verify(plain_password, hashed_password)
# 获取 hash 后的密码
def get_password_hash(password):
# 使用 bcrypt 算法对密码进行哈希
return pwd_context.hash(password)
# 通过用户名密码获取用户信息进行校验
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 从 db 数据库中,获取用户名为 username 的用户信息
def get_user( username: str):
userdb = db.users.find_one({"username": username})
if userdb:
return UserDB(**userdb)
# 这个方法会就是通过依赖 oauth2_scheme 来获取 token,并校验 token 的有效性
# 如果 token 有效,调用 get_user 函数,获取当前的用户信息,返回用户信息
# 如果无效,就抛出异常
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭证",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 使用 token 和定义好的 OpenSSL生成的随机密钥 SECRET_KEY 以及加密方式
# 调用 jwt 来解码 token,获取用户的信息,payload 的类型是 dict
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从 token 中获取用户名 ,也就是之前生成 token 的时候传入的 username
username = payload.get("username")
# 如果用户名不存在
if username is None:
# 抛出前面定义的异常
raise credentials_exception
except JWTError:
# 如果解码失败,抛出异常
raise credentials_exception
# 将用户名传给 get_user 函数,获取用户信息
user = get_user(username=username)
if user is None:
# 如果用户不存在,抛出异常
raise credentials_exception
return user
# 获取没有被禁用的用户,这个函数会 调用依赖 get_current_user 函数,获得当前登陆的用户
# 接着判断这个用户是否被禁用,如果被禁用,就抛出异常,如果没有被禁用,就返回用户信息
async def get_current_active_user(current_user: UserAuth = Depends(get_current_user)):
# 通过 get_current_user 获取当前登陆的用户 current_user
# 接着判断用户是否被禁用
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户被禁用")
return current_user
接着在 /routers/users.py 中 引入 dependencies 即可
from fastapi import APIRouter, Depends
# 引入 dependencies.py 中的依赖
from dependencies import *
## main.py#
这时候在 /app 目录中新增一个 main.py 文件
from fastapi import FastAPI
# 引入 CORS 中间件,用于跨域
from fastapi.middleware.cors import CORSMiddleware
# 从 routers 中导入 users
from .routers import users
app = FastAPI()
# 跨域设置
app.add_middleware(
CORSMiddleware,
# 允许所有的域名访问
allow_origins=["*"],
# 允许所有的请求携带 cookie
allow_credentials=True,
# 允许所有的请求方法
allow_methods=["*"],
# 允许所有的请求头
allow_headers=["*"],
)
# 引入 router 中的 users 路由
app.include_router(users.router)
## 启动项目#
进入 MyApi 目录中执行
uvicorn app.main:app --reload
即可启动程序
此时 从 openApi 的文档中也能看到
Tags 生效了