如何稳定调用ChatGPT API以构建强大的应用程序
LLM现在无处不在,尤其是ChatGPT。许多应用程序都建立在其之上,如果你还没有尝试过,那么你应该试一试。

在ChatGPT之上构建应用程序通常需要进行多次并行调用。不幸的是,你并不是唯一一个这样做的人。由于有很多应用程序每天执行数百万次请求(顺便为他们的工程团队喝彩),API经常会返回“请求太多”的错误。因此,我们需要一种良好的方式在进行多次并行调用时处理此类错误。
在这个小的Python教程中,我们将涵盖以下两个重要主题,以有效地调用ChatGPT API:
- 并行执行多个调用
- 在调用失败时重试
1. 并行执行多个调用
执行调用的最简单方式是同步执行,也就是发送请求并等待响应到达以继续程序。我们可以简单地按以下方式执行:
import requestsheaders = { "Content-Type": "application/json", "Authorization": f"Bearer {OPENAI_API_KEY}"}response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={ "model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "ping"}], "temperature": 0}).json()print(response_json["choices"][0]["message"]["content"])
Pong!
如果我们在一个简单的系统中工作,这种方式是可以的。然而,如果我们希望并行调用API或其他资源,比如数据库,我们可以使用异步方式以获得更快的响应。
异步执行任务将触发每个动作并等待它们并行完成,从而减少等待时间。
一个基本的做法是创建不同的线程来处理每个请求,然而,使用异步调用的方法会更好。
异步调用通常更高效,因为你可以指定应用程序应该等待的确切位置,而在传统的线程中,系统将自动将线程置于等待状态,这可能是次优的。
下面我们展示了使用同步调用和异步调用的区别的示例。
# 同步调用import timedef delay_print(msg): print(msg, end=" ") time.sleep(1)def sync_print(): for i in range(10): delay_print(i)start_time = time.time()sync_print()print("\n", time.time() - start_time, "seconds.")
0 1 2 3 4 5 6 7 8 9 10.019574642181396 seconds.
# 异步调用import asyncioasync def delay_print_async(msg): print(msg, end=" ") await asyncio.sleep(1)async def async_print(): asyncio.gather(*[delay_print_async(i) for i in range(10)])start_time = time.time()await async_print()print("\n", time.time() - start_time, "seconds.")
0.0002448558807373047 seconds.0 1 2 3 4 5 6 7 8 9
asyncio.gather方法将触发传递给它的所有异步调用,并在它们准备就绪后返回它们的结果。
不幸的是,使用requests库无法执行异步调用。为了做到这一点,你可以使用aiohttp库。下面是如何使用aiohttp执行异步调用的示例。
import aiohttpasync def get_completion(content): async with aiohttp.ClientSession() as session: async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={ "model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": content}], "temperature": 0 }) as resp: response_json = await resp.json() return response_json["choices"][0]['message']["content"]await get_completion("Ping")
Pong!
如前所述,要执行异步请求,我们需要使用asyncio.gather方法。
async def get_completion_list(content_list): return await asyncio.gather(*[get_completion(content) for content in content_list])await get_completion_list(["ping", "pong"]*5)
['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
虽然这个方法可以工作,但是以这种方式进行调用并不理想,因为我们为每次调用都重新创建了会话对象。我们可以通过重用同一个会话对象来节省资源和时间,如下所示:
async def get_completion(content, session): async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={ "model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": content}], "temperature": 0 }) as resp: response_json = await resp.json() return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list): async with aiohttp.ClientSession() as session: return await asyncio.gather(*[get_completion(content, session) for content in content_list])await get_completion_list(["ping", "pong"]*5)
简单吧?有了这个,你可以轻松地执行多个调用。然而,一个问题是以这种方式进行无限调用通常不是一个好的做法,因为你可能会过载系统,并被处罚,导致你在一段时间内无法执行其他请求(相信我,你会的)。因此,限制同时进行的调用数量是个好主意。你可以使用asyncio.Semaphore类来轻松实现这一点。
Semaphore类创建了一个上下文管理器,用于管理当前正在进行的异步调用的数量。如果达到最大数量,它将阻塞,直到一些调用完成。
async def get_completion(content, session, semaphore): async with semaphore: await asyncio.sleep(1) async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={ "model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": content}], "temperature": 0 }) as resp: response_json = await resp.json() return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls): semaphore = asyncio.Semaphore(value=max_parallel_calls) async with aiohttp.ClientSession() as session: return await asyncio.gather(*[get_completion(content, session, semaphore) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")print(completion_list)
Time elapsed: 1.8094507199984946 seconds.['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
这里还有一个可选的内容,就是报告调用的进度。你可以通过创建一个小的类来保存进度,并在所有调用之间共享。可以按照以下方式进行:
class ProgressLog: def __init__(self, total): self.total = total self.done = 0 def increment(self): self.done = self.done + 1 def __repr__(self): return f"Done runs {self.done}/{self.total}."async def get_completion(content, session, semaphore, progress_log): async with semaphore: await asyncio.sleep(1) async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={ "model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": content}], "temperature": 0 }) as resp: response_json = await resp.json() progress_log.increment() print(progress_log) return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls): semaphore = asyncio.Semaphore(value=max_parallel_calls) progress_log = ProgressLog(len(content_list)) async with aiohttp.ClientSession() as session: return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")print(completion_list)
Done runs 1/10.Done runs 2/10.Done runs 3/10.Done runs 4/10.Done runs 5/10.Done runs 6/10.Done runs 7/10.Done runs 8/10.Done runs 9/10.Done runs 10/10.Time elapsed: 1.755018908999773 seconds.['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
完成了关于如何执行多个异步请求的部分。通过这种方式,您可以执行多个异步调用,限制每次调用的数量并报告进度。然而,仍然有一些问题需要处理。
所做的请求可能因多种原因而失败,例如服务器超载,连接中断,错误请求等。这些可能会生成异常或返回不可预测的响应,因此我们需要处理这些情况并自动重试失败的调用。
2. 在失败的情况下重试调用
为了处理失败的调用,我们将使用tenacity库。Tenacity提供了函数装饰器,如果函数调用生成异常,它将自动重试。
from tenacity import ( retry, stop_after_attempt, wait_random_exponential,)
为了为我们的调用提供重试功能,我们需要放置@retry装饰器。在不添加其他参数的情况下使用它将使函数在失败后立即和无限次数地重试。这对某些原因来说并不好。
其中之一是我们的函数调用可能由于服务器超载而失败,因此在再次尝试之前等待一段时间是合理的。为了指示等待的时间,我们将使用指数退避的方法,使用参数wait=wait_random_exponential(min=min_value, max=max_value)。这将增加函数失败时的等待时间。
一个可选的功能是在每次重试发生时记录消息。我们可以通过为参数before_sleep提供某个函数来实现。在这里,我们将使用print函数,然而,更好的方法是使用logging模块并将logging.error或logging.debug函数传递给此参数。
为了演示,我们将生成随机异常。
import randomclass ProgressLog: def __init__(self, total): self.total = total self.done = 0 def increment(self): self.done = self.done + 1 def __repr__(self): return f"Done runs {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)async def get_completion(content, session, semaphore, progress_log): async with semaphore: #await asyncio.sleep(1) if random.random() < 0.2: raise Exception("随机异常") async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={ "model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": content}], "temperature": 0 }) as resp: response_json = await resp.json() progress_log.increment() print(progress_log) return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls): semaphore = asyncio.Semaphore(value=max_parallel_calls) progress_log = ProgressLog(len(content_list)) async with aiohttp.ClientSession() as session: return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("经过的时间:", time.perf_counter() - start_time, "秒。")print(completion_list)
<RetryCallState 133364377433616: 尝试 #1; 睡眠了 0.74; 上次结果:失败 (异常随机异常)><RetryCallState 133364377424496: 尝试 #1; 睡眠了 0.79; 上次结果:失败 (异常随机异常)>已完成的运行 1/10.已完成的运行 2/10.已完成的运行 3/10.已完成的运行 4/10.已完成的运行 5/10.已完成的运行 6/10.已完成的运行 7/10.已完成的运行 8/10.已完成的运行 9/10.已完成的运行 10/10.经过的时间: 1.1305301820011664 秒。['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']
这将使我们的函数在重试之前等待一段时间。然而,失败的原因可能是系统性的,例如服务器停机或错误的有效负载。在这种情况下,我们希望我们的重试次数有限。我们可以使用参数stop=stop_after_attempt(n)来实现。
import random
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"已完成 {self.done}/{self.total} 次运行."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.9:
raise Exception("随机异常")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("已用时间: ", time.perf_counter() - start_time, "秒.")
print(completion_list)
<RetryCallState 133364608660048: 尝试次数 #1; 睡眠时间 0.1 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377435680: 尝试次数 #1; 睡眠时间 0.71 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377421472: 尝试次数 #1; 睡眠时间 0.17 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377424256: 尝试次数 #1; 睡眠时间 0.37 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377430928: 尝试次数 #1; 睡眠时间 0.87 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377420752: 尝试次数 #1; 睡眠时间 0.42 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377422576: 尝试次数 #1; 睡眠时间 0.47 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377431312: 尝试次数 #1; 睡眠时间 0.11 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377425840: 尝试次数 #1; 睡眠时间 0.69 秒; 最终结果: 失败 (异常 随机异常)>
<RetryCallState 133364377424592: 尝试次数 #1; 睡眠时间 0.89 秒; 最终结果: 失败 (异常 随机异常)>
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
49 try:
---> 50 result = await fn(*args, **kwargs)
51 except BaseException: # noqa: B9025 frames
Exception: 随机异常
The above exception was the direct cause of the following exception:
RetryError Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
324 if self.reraise:
325 raise retry_exc.reraise()
--> 326 raise retry_exc from fut.exception()
327
328 if self.wait:
RetryError: RetryError[<Future at 0x794b5057a590 state=finished raised Exception>]
通过设置这个参数,一旦尝试次数达到最大值,就会引发RetryError异常。然而,有时候我们希望继续运行而不生成异常,只需将调用返回的值设置为None,以便稍后处理。为了做到这一点,我们可以使用回调函数retry_error_callback,在出现RetryError错误时返回None值:
import random
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"已完成 {self.done}/{self.total} 次运行。"
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.7:
raise Exception("随机异常")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("经过时间: ", time.perf_counter() - start_time, "秒。")
print(completion_list)
已完成 1/10 次运行。
已完成 2/10 次运行。
已完成 3/10 次运行。
经过时间: 2.6409040250000544 秒。
['Pong!', 'Ping!', None, None, None, None, None, 'Ping!', None, None]
使用这个方法,将返回None值而不是生成错误。
目前尚未处理的一个问题是卡住的连接问题。当我们发送请求时,由于某种原因,主机保持连接但既不失败也不返回任何结果。为了处理这种情况,我们需要设置一个超时时间,在给定的时间内如果调用没有返回值,就会返回。我们可以使用aiohttp库的timeout参数和aiohttp.ClientTimeout类来实现。如果出现超时,将引发TimeoutError,然后由tenacity的retry装饰器处理,并自动重新运行函数。
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"已完成 {self.done}/{self.total} 次运行。"
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*100, 100)
print("经过时间: ", time.perf_counter() - start_time, "秒。")
太棒了!现在我们有了一种强大的方式来运行多个并行请求,如果发生故障,它将自动重试,并在故障是系统性的情况下返回None值。因此,最终的代码将如下所示:
import asyncio
import aiohttp
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"完成的运行 {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": content}],
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json["choices"][0]['message']["content"]
async def get_completion_list(content_list, max_parallel_calls, timeout):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:
return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])
总之,我们实现了以下功能:
- 异步调用以减少等待时间。
- 记录异步调用的进度。
- 调用失败时自动触发重试。
- 如果失败是系统性的,则返回None值。
- 当调用超时且没有返回任何内容时,重新尝试调用。
如果您有任何问题,发现错误或有任何改进的想法,请在下方留言!