最近,FastAPI 在 Python 后端开发者社区中引起了很大的关注,得益于其简单性、异步性和原生的 Swagger UI。
在 GitHub 上,Quivr 是那些受欢迎的低代码/无代码开源项目之一,它具有众多的星标(在撰写本文时为 24.2k)和结构良好的代码库。首先,我想向这个仓库及其贡献者致以敬意,感谢他们为 Python 社区打造如此好的参考项目所做的出色工作。
GitHub – StanGirard/quivr: 🧠 你超强大的第二大脑 🧠 你的个人效率…
🧠 你超强大的第二大脑 🧠 你的个人效率助手,可以与你的文件(PDF、CSV)及应用交互…
github.com
从这个仓库中有很多好东西我们需要仔细研究,而且,我们想为任何未来的低代码/无代码场景建立一个基于这个仓库的模板。因此,我决定将其拆分为两篇文章。在这篇文章中,我们将重点关注以下内容:
- 高级架构
- 使用 FastAPI 进行 Supabase Auth
- 使用 Supabase 进行文件上传
在第 2 部分中,我们将涵盖以下内容:
- 用于长时间运行过程的 Celery Worker 和消息队列
- Postgres 中的 pg-vector 插件
在第 3 部分中,我们将涵盖以下内容:
- 用于 ChatGPT 的 FastAPI,例如流式负载
- Stripe 支付
- API 测试
- 任何未来用例的模板
高级架构
后端架构由三个主要部分组成:Supabase 数据库、FastAPI 后端服务器和 Celery 服务器。Celery 用于长时间运行的后台任务,例如对大型 PDF 文档进行嵌入。在 FastAPI 和 Celery 服务器之间,使用 Redis 作为消息代理。从 FastAPI/Celery 到 Supabase 的通信通过 Supabase 客户端(Python SDK)进行。
Supabase Auth
Supabase 是一个开源的 Firebase 替代品。它本质上是一个 Postgres 数据库,但它具有其他内置功能,例如身份验证、边缘函数、大型二进制对象存储、pg-vector 等,相比于从头开始使用 Postgres 数据库,使用 Supabase 可以简化开发工作。
使用 supabase auth,你可以从 supabase 客户端库中简单调用 signUp() 和 signIn() 函数。以下示例展示了 JavaScript 中的用法(来源:https://supabase.com/docs/guides/auth/social-login)。
async function signUpNewUser() { const { data, error } = await supabase.auth.signUp({ email: 'example@email.com', password: 'example-password', options: { redirectTo: 'https//example.com/welcome' } })}
async function signInWithEmail() { const { data, error } = await supabase.auth.signInWithPassword({ email: 'example@email.com', password: 'example-password', options: { redirectTo: 'https//example.com/welcome' } })}
async function signOut() { const { error } = await supabase.auth.signOut()}
这是前端代码,那么后端该怎么做呢?
好问题。通过前端和 Supabase 之间的交互,Supabase 实际上会创建一张名为 auth.users 的表格。该表格位于 Supabase 仪表盘的“身份验证”部分。
未来需要与auth.users进行引用的任何表格,只需执行
CREATE TABLE IF NOT EXISTS user_daily_usage( user_id UUID REFERENCES auth.users (id), email TEXT, date TEXT, daily_requests_count INT, PRIMARY KEY (user_id, date));
然后我们需要为一些后端API对用户进行身份验证,如果前端直接使用Supabase.auth,后端如何对用户请求进行身份验证以用于其他API调用?
要解释这一点,我们需要了解JWT(JavaScript Web Token)的工作原理。
您可以在https://jwt.io/上测试JWT的编码和解码。用户在与auth服务器进行注册/登录后,将获得一个JWT,因此如果用户在令牌过期之前的短时间内再次加载网站,他/她就不需要再次输入密码。
要为用户生成JWT,您需要使用“sub”作为用户ID(从auth.users自动分配给用户的UUID)和用于注册的电子邮件。
因此,要解码JWT,auth服务器或后端都需要使用256位的密钥。如果使用Supabase auth,它在管理员仪表板中称为“Anon key”。它将是后端用于解码JWT的相同密钥。
FastAPI后端的auth模块可能如下所示:
import osfrom typing import Optionalfrom auth.jwt_token_handler import decode_access_token, verify_tokenfrom fastapi import Depends, HTTPException, Requestfrom fastapi.security import HTTPAuthorizationCredentials, HTTPBearerfrom models import UserIdentityclass AuthBearer(HTTPBearer): def __init__(self, auto_error: bool = True): super().__init__(auto_error=auto_error) async def __call__( self, request: Request, ): credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__( request ) self.check_scheme(credentials) token = credentials.credentials # pyright: ignore reportPrivateUsage=none return await self.authenticate( token, ) def check_scheme(self, credentials): if credentials and credentials.scheme != "Bearer": raise HTTPException(status_code=401, detail="Token must be Bearer") elif not credentials: raise HTTPException( status_code=403, detail="Authentication credentials missing" ) async def authenticate( self, token: str, ) -> UserIdentity: if os.environ.get("AUTHENTICATE") == "false": return self.get_test_user() elif verify_token(token): return decode_access_token(token) else: raise HTTPException(status_code=401, detail="Invalid token or api key.") def get_test_user(self) -> UserIdentity: return UserIdentity( email="test@example.com", id="696dda89-d395-4601-af3d-e1c66de3df1a" # type: ignore ) # replace with test user informationdef get_current_user(user: UserIdentity = Depends(AuthBearer())) -> UserIdentity: return user
import osfrom datetime import datetime, timedeltafrom typing import Optionalfrom jose import jwtfrom jose.exceptions import JWTErrorfrom models import UserIdentitySECRET_KEY = os.environ.get("JWT_SECRET_KEY")ALGORITHM = "HS256"if not SECRET_KEY: raise ValueError("JWT_SECRET_KEY environment variable not set")def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtdef decode_access_token(token: str) -> UserIdentity: try: payload = jwt.decode( token, SECRET_KEY, algorithms=[ALGORITHM], options={"verify_aud": False} ) except JWTError: return None # pyright: ignore reportPrivateUsage=none return UserIdentity( email=payload.get("email"), id=payload.get("sub"), # pyright: ignore reportPrivateUsage=none )def verify_token(token: str): payload = decode_access_token(token) return payload is not None
使用 Supabase 进行文件上传
您将能够直接调用 Supabase 客户端库来上传文件。可以编写一个实用函数如下:
import jsonfrom multiprocessing import get_loggerfrom langchain.pydantic_v1 import Fieldfrom langchain.schema import Documentfrom supabase.client import Client, create_clientimport osfrom dotenv import load_dotenvload_dotenv()logger = get_logger()def get_supabase_client() -> Client: supabase_client: Client = create_client( os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY") ) return supabase_clientdef upload_file_storage(file, file_identifier: str): supabase_client: Client = get_supabase_client() # res = supabase_client.storage.create_bucket("quivr") response = None try: response = supabase_client.storage.from_(os.getenv("SUPABASE_BUCKET")).upload(file_identifier, file) return response except Exception as e: logger.error(e) raise e
然后是 FastAPI 的路由
import osfrom typing import Optionalfrom uuid import UUIDfrom auth import AuthBearer, get_current_userfrom fastapi import APIRouter, Depends, HTTPException, Query, Request, UploadFilefrom logger import get_loggerfrom models import UserIdentity, UserUsagefrom repository.files.upload_file import upload_file_storagefrom repository.user_identity import get_user_identitylogger = get_logger(__name__)upload_router = APIRouter()@upload_router.get("/upload/healthz", tags=["Health"])async def healthz(): return {"status": "ok"}@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])async def upload_file( request: Request, uploadFile: UploadFile, chat_id: Optional[UUID] = Query(None, description="聊天的 ID"), current_user: UserIdentity = Depends(get_current_user),): file_content = await uploadFile.read() filename_with_user_id = str(current_user.id) + "/" + str(uploadFile.filename) try: fileInStorage = upload_file_storage(file_content, filename_with_user_id) logger.info(f"文件 {fileInStorage} 上传成功") except Exception as e: if "The resource already exists" in str(e): raise HTTPException( status_code=403, detail=f"文件 {uploadFile.filename} 已存在于存储中。", ) else: raise HTTPException( status_code=500, detail="文件上传失败。" ) return {"message": "文件处理已开始。"}
待续…(第二部分)
如果您不熟悉 FastAPI,这可能有点麻烦。但在第三部分结束时,我将分享整个 GitHub 仓库,这将更加清晰。请继续关注。