Press "Enter" to skip to content

LLM SaaS部分1的FastAPI模板——身份验证和文件上传

最近,FastAPI 在 Python 后端开发者社区中引起了很大的关注,得益于其简单性、异步性和原生的 Swagger UI。

在 GitHub 上,Quivr 是那些受欢迎的低代码/无代码开源项目之一,它具有众多的星标(在撰写本文时为 24.2k)和结构良好的代码库。首先,我想向这个仓库及其贡献者致以敬意,感谢他们为 Python 社区打造如此好的参考项目所做的出色工作。

GitHub – StanGirard/quivr: 🧠 你超强大的第二大脑 🧠 你的个人效率…

🧠 你超强大的第二大脑 🧠 你的个人效率助手,可以与你的文件(PDF、CSV)及应用交互…

github.com

从这个仓库中有很多好东西我们需要仔细研究,而且,我们想为任何未来的低代码/无代码场景建立一个基于这个仓库的模板。因此,我决定将其拆分为两篇文章。在这篇文章中,我们将重点关注以下内容:

  • 高级架构
  • 使用 FastAPI 进行 Supabase Auth
  • 使用 Supabase 进行文件上传

在第 2 部分中,我们将涵盖以下内容:

  • 用于长时间运行过程的 Celery Worker 和消息队列
  • Postgres 中的 pg-vector 插件

在第 3 部分中,我们将涵盖以下内容:

  • 用于 ChatGPT 的 FastAPI,例如流式负载
  • Stripe 支付
  • API 测试
  • 任何未来用例的模板

高级架构

Source: 作者手绘

后端架构由三个主要部分组成:Supabase 数据库、FastAPI 后端服务器和 Celery 服务器。Celery 用于长时间运行的后台任务,例如对大型 PDF 文档进行嵌入。在 FastAPI 和 Celery 服务器之间,使用 Redis 作为消息代理。从 FastAPI/Celery 到 Supabase 的通信通过 Supabase 客户端(Python SDK)进行。

Supabase Auth

Supabase 是一个开源的 Firebase 替代品。它本质上是一个 Postgres 数据库,但它具有其他内置功能,例如身份验证、边缘函数、大型二进制对象存储、pg-vector 等,相比于从头开始使用 Postgres 数据库,使用 Supabase 可以简化开发工作。

使用 supabase auth,你可以从 supabase 客户端库中简单调用 signUp() 和 signIn() 函数。以下示例展示了 JavaScript 中的用法(来源:https://supabase.com/docs/guides/auth/social-login)。

async function signUpNewUser() {  const { data, error } = await supabase.auth.signUp({    email: 'example@email.com',    password: 'example-password',    options: {      redirectTo: 'https//example.com/welcome'    }  })}

async function signInWithEmail() {  const { data, error } = await supabase.auth.signInWithPassword({    email: 'example@email.com',    password: 'example-password',    options: {      redirectTo: 'https//example.com/welcome'    }  })}

async function signOut() {  const { error } = await supabase.auth.signOut()}

这是前端代码,那么后端该怎么做呢?

好问题。通过前端和 Supabase 之间的交互,Supabase 实际上会创建一张名为 auth.users 的表格。该表格位于 Supabase 仪表盘的“身份验证”部分。

Source: 作者截图

未来需要与auth.users进行引用的任何表格,只需执行

CREATE TABLE IF NOT EXISTS user_daily_usage(    user_id UUID REFERENCES auth.users (id),    email TEXT,    date TEXT,    daily_requests_count INT,    PRIMARY KEY (user_id, date));

然后我们需要为一些后端API对用户进行身份验证,如果前端直接使用Supabase.auth,后端如何对用户请求进行身份验证以用于其他API调用?

要解释这一点,我们需要了解JWT(JavaScript Web Token)的工作原理。

Source: author’s drawing

您可以在https://jwt.io/上测试JWT的编码和解码。用户在与auth服务器进行注册/登录后,将获得一个JWT,因此如果用户在令牌过期之前的短时间内再次加载网站,他/她就不需要再次输入密码。

Source: author’s screenshot

要为用户生成JWT,您需要使用“sub”作为用户ID(从auth.users自动分配给用户的UUID)和用于注册的电子邮件。

因此,要解码JWT,auth服务器或后端都需要使用256位的密钥。如果使用Supabase auth,它在管理员仪表板中称为“Anon key”。它将是后端用于解码JWT的相同密钥。

FastAPI后端的auth模块可能如下所示:

import osfrom typing import Optionalfrom auth.jwt_token_handler import decode_access_token, verify_tokenfrom fastapi import Depends, HTTPException, Requestfrom fastapi.security import HTTPAuthorizationCredentials, HTTPBearerfrom models import UserIdentityclass AuthBearer(HTTPBearer):    def __init__(self, auto_error: bool = True):        super().__init__(auto_error=auto_error)    async def __call__(        self,        request: Request,    ):        credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__(            request        )        self.check_scheme(credentials)        token = credentials.credentials  # pyright: ignore reportPrivateUsage=none        return await self.authenticate(            token,        )    def check_scheme(self, credentials):        if credentials and credentials.scheme != "Bearer":            raise HTTPException(status_code=401, detail="Token must be Bearer")        elif not credentials:            raise HTTPException(                status_code=403, detail="Authentication credentials missing"            )    async def authenticate(        self,        token: str,    ) -> UserIdentity:        if os.environ.get("AUTHENTICATE") == "false":            return self.get_test_user()        elif verify_token(token):            return decode_access_token(token)        else:            raise HTTPException(status_code=401, detail="Invalid token or api key.")    def get_test_user(self) -> UserIdentity:        return UserIdentity(            email="test@example.com", id="696dda89-d395-4601-af3d-e1c66de3df1a"  # type: ignore        )  # replace with test user informationdef get_current_user(user: UserIdentity = Depends(AuthBearer())) -> UserIdentity:    return user

import osfrom datetime import datetime, timedeltafrom typing import Optionalfrom jose import jwtfrom jose.exceptions import JWTErrorfrom models import UserIdentitySECRET_KEY = os.environ.get("JWT_SECRET_KEY")ALGORITHM = "HS256"if not SECRET_KEY:    raise ValueError("JWT_SECRET_KEY environment variable not set")def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):    to_encode = data.copy()    if expires_delta:        expire = datetime.utcnow() + expires_delta    else:        expire = datetime.utcnow() + timedelta(minutes=15)    to_encode.update({"exp": expire})    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwtdef decode_access_token(token: str) -> UserIdentity:    try:        payload = jwt.decode(            token, SECRET_KEY, algorithms=[ALGORITHM], options={"verify_aud": False}        )    except JWTError:        return None  # pyright: ignore reportPrivateUsage=none    return UserIdentity(        email=payload.get("email"),        id=payload.get("sub"),  # pyright: ignore reportPrivateUsage=none    )def verify_token(token: str):    payload = decode_access_token(token)    return payload is not None

使用 Supabase 进行文件上传

您将能够直接调用 Supabase 客户端库来上传文件。可以编写一个实用函数如下:

import jsonfrom multiprocessing import get_loggerfrom langchain.pydantic_v1 import Fieldfrom langchain.schema import Documentfrom supabase.client import Client, create_clientimport osfrom dotenv import load_dotenvload_dotenv()logger = get_logger()def get_supabase_client() -> Client:    supabase_client: Client = create_client(        os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")    )    return supabase_clientdef upload_file_storage(file, file_identifier: str):    supabase_client: Client = get_supabase_client()    # res = supabase_client.storage.create_bucket("quivr")    response = None    try:        response = supabase_client.storage.from_(os.getenv("SUPABASE_BUCKET")).upload(file_identifier, file)        return response    except Exception as e:        logger.error(e)        raise e

然后是 FastAPI 的路由

import osfrom typing import Optionalfrom uuid import UUIDfrom auth import AuthBearer, get_current_userfrom fastapi import APIRouter, Depends, HTTPException, Query, Request, UploadFilefrom logger import get_loggerfrom models import UserIdentity, UserUsagefrom repository.files.upload_file import upload_file_storagefrom repository.user_identity import get_user_identitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Health"])async def healthz():    return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])async def upload_file(    request: Request,    uploadFile: UploadFile,    chat_id: Optional[UUID] = Query(None, description="聊天的 ID"),    current_user: UserIdentity = Depends(get_current_user),):    file_content = await uploadFile.read()    filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename)    try:        fileInStorage = upload_file_storage(file_content, filename_with_user_id)        logger.info(f"文件 {fileInStorage} 上传成功")    except Exception as e:        if "The resource already exists" in str(e):            raise HTTPException(                status_code=403,                detail=f"文件 {uploadFile.filename} 已存在于存储中。",            )        else:            raise HTTPException(                status_code=500, detail="文件上传失败。"            )    return {"message": "文件处理已开始。"}

待续…(第二部分)

如果您不熟悉 FastAPI,这可能有点麻烦。但在第三部分结束时,我将分享整个 GitHub 仓库,这将更加清晰。请继续关注。

Leave a Reply

Your email address will not be published. Required fields are marked *