- Published on
1.1.FastAPI 快速上手
- Authors

- Name
- xiaobai
1.概念介绍
1.1.WSGI 简介
- WSGI,全名 Web Server Gateway Interface(服务器网关接口);
- 2003 年提出,用于描述同步的 Python Web 应用(如 Django、Flask)。
- 同步阻塞模型:一个请求必须完全处理完成后才能处理下一个请求。
1.2.ASGI 简介
- ASGI,全名 Asynchronous Server Gateway Interface(异步服务器网管接口);
- 是WSGI的扩展版本,旨在为Python Web服务、框架和应用之间提供一个标准的异步接口。
- 其本身可以提供同步和异步应用,并且可以并行处理。还能处理多种通用协议,包括HTTP,HTTP2和WebSocket。
- 同WSGI一样,需要有独立的服务器实现这种异步的网关接口,比如Daphne、Uvicorn、Hypercorn等;
1.3.Starlette简介
官方文档:https://www.starlette.io/
Starlette 是一个轻量级的 ASGI(Asynchronous Server Gateway Interface)框架,专为构建异步 Web 应用设计。它是 FastAPI 的底层框架,提供了路由、请求处理、中间件、WebSocket 支持等核心功能。Starlette 的设计目标是简单、高效,同时保持足够的灵活性,适用于各种 Web 开发场景。拥有以下特性:
- 异步支持:基于 Python 的 asyncio,支持异步 I/O 操作,适合高并发场景。
- 路由系统:提供直观的路径操作装饰器,用于定义 API 端点。
- 请求与响应处理:内置对 HTTP 请求和响应的支持,支持 JSON、表单数据等格式。
- 中间件:允许开发者在请求处理前后添加自定义逻辑。
- WebSocket 支持:支持实时双向通信,适用于聊天应用等场景。
FastAPI 直接继承了 Starlette 的所有功能,例如路由、请求处理和中间件。FastAPI 的 @app.get()、@app.post() 等装饰器实际上是对 Starlette 路由系统的封装。换句话说,FastAPI 在 Starlette 的基础上增加了类型检查、自动文档生成等高级特性。
1.4.FastAPI 简介
官方文档:https://fastapi.tiangolo.com/zh/;
FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 并基于标准的 Python 类型提示。拥有以下特性:
- 快速:可与 NodeJS 和 Go 并肩的极高性能(归功于 Starlette 和 Pydantic)。最快的 Python web 框架之一。
- 高效编码:提高功能开发速度约 200% 至 300%。
- 更少 bug:减少约 40% 的人为(开发者)导致错误。
- 智能:极佳的编辑器支持。处处皆可自动补全,减少调试时间。
- 简单:设计的易于使用和学习,阅读文档的时间更短。
- 简短:使代码重复最小化。通过不同的参数声明实现丰富功能。bug 更少。
- 健壮:生产可用级别的代码。还有自动生成的交互式文档。
- 标准化:基于(并完全兼容)API 的相关开放标准:OpenAPI (以前被称为 Swagger) 和 JSON Schema。
- 用于创建 API 的 OpenAPI 包含了路径操作,请求参数,请求体,安全性等的声明。 使用 JSON Schema (因为 OpenAPI 本身就是基于 JSON Schema 的)自动生成数据模型文档。 经过了缜密的研究后围绕这些标准而设计。并非狗尾续貂。 这也允许了在很多语言中自动生成客户端代码。
1.5.Uvicorn 简介
- FastAPI 是一个现代化的高性能 Web 框架,它使用 Python 的异步编程特性来提高 Web 应用程序的性能。
- 而 Uvicorn 则是一个基于 uvloop 和 httptools 实现的高性能 ASGI 服务器,可以实现异步处理 HTTP 请求。
- FastAPI 使用 Uvicorn 作为其默认的 Web 服务器,是因为 Uvicorn 是一个非常快速、可靠且易于使用的 ASGI 服务器,可以在处理大量并发连接时保持稳定和高效。
- 此外,Uvicorn 还支持 WebSocket 和 HTTP/2 等新特性,符合 FastAPI 提倡的现代 Web 开发理念。因此,使用 Uvicorn 作为 FastAPI 的 Web 服务器是一个很好的选择。
1.6.Gnuicorn 简介
1.6.1.简介
- Gunicorn是一款高性能的PythonWSGIHTTP服务器,具有轻量级资源消耗和高并发处理能力。
- 这个名字源自 “Green Unicorn”(绿色独角兽)的缩写,所以也可以结合原意辅助记忆发音~
- Gunicorn 服务器作为wsgi app的容器,能够与各种Web框架兼容(flask,django等),得益于gevent等技术,使用Gunicorn能够在基本不改变wsgi app代码的前提下,大幅度提高wsgi app的性能。
1.6.2.Master-Worker 模型
Master 进程:
- 负责管理 Worker 进程(启动、监控、重启)。
- 不处理请求,仅控制 Worker 生命周期。
- 监听信号(如 HUP 重载配置、TERM 优雅关闭)。
**Worker 进程: **
- 实际处理 HTTP 请求的进程。
- 支持多种 Worker 类型(同步、异步、线程等)。
同步 Workser(默认):
- 每个 Worker 一次处理一个请求。
- 使用操作系统进程(fork)实现并发。
- 适合 CPU 密集型任务(如机器学习推理)。
异步 Worker(gevent/eventlet):
- 基于协程(非阻塞 I/O),适合高并发 I/O 密集型场景(如数据库查询)。
- 通过猴子补丁(monkey-patching)替换标准库的阻塞调用。
Uvicorn Worker(ASGI 支持):
- 运行 FastAPI、Starlette 等 ASGI 应用。
- 结合了 Gunicorn 的进程管理和 Uvicorn 的异步性能。
2.请求响应
如下所示,我们接收一个参数,处理这个参数,然后将计算结果返回:
@app.get("/double")
async def double(num: int):
return {"result": num * 2}
在这个例子中,我们使用装饰器“@app.get”监听get请求,请求路径为“/double”,处理这个请求的是异步函数“async def double”,接收一个查询参数num,返回结果是一个字典“{ result : str }”;
3.接收参数
3.1.路由参数
- 也叫做路径参数、动态路由,也就是说参数在请求路径中;
- 参数在请求路径中的好处是,在检索或者浏览请求日志的时候,很方便地就能够根据这个关键的路径参数找到目标请求信息,方便后续追踪调试;
- 关于路由参数请看官方文档:https://fastapi.tiangolo.com/zh/tutorial/path-params/
3.2.查询参数
- 声明的参数不是路径参数时,路径操作函数会把该参数自动解释为查询参数。
- 查询字符串是键值对的集合,这些键值对位于 URL 的 ? 之后,以 & 分隔。
- 关于查询参数请看官方文档:https://fastapi.tiangolo.com/zh/tutorial/query-params/
3.3.请求体参数
- 请求体是客户端发送给 API 的数据。响应体是 API 发送给客户端的数据。
- API 基本上肯定要发送响应体,但是客户端不一定发送请求体。
- 使用 Pydantic 模型声明请求体,能充分利用它的功能和优点。
3.4.参数规范
Restful API请求规范:
- 查询:(get)/module
- 新建:(post)/module
- 更新:(put)/module
- 删除:(dete)/module
RPC API请求规范:
- 查询:(post)/module/list
- 新建:(post)/module/insert
- 更新:(post)/module/update
- 删除:(post)/module/delete
4.
4.1.确认启动配置
先确认一下“server.py”中,使用uvicorn启动FastAPI的代码如下所示,没有设置worker参数,也就是默认使用一个进程运行Web服务:
uvicorn.run("app.server:app", host="0.0.0.0", port=port)
4.2.准备测试接口
接下来我们准备讲一下在异步接口中调用同步阻塞代码,导致整个服务被阻塞的问题;如下所示,我们准备一个同步阻塞的接口“sync_delay”,一个异步阻塞的接口“async_delay”,以及一个能够快速响应的接口“test”;
@app.get("/test")
async def test():
print(f"Process {os.getpid()} handling /test")
return {"message": "Hello World"}
@app.get("/sync_delay")
async def sync_delay(delay: int = 1):
"""同步延迟delay秒"""
print(f"Process {os.getpid()} handling /sync_delay")
time.sleep(delay)
return {"hello": "world"}
@app.get("/async_delay")
async def async_delay(delay: int = 1):
"""异步延迟delay秒"""
print(f"Process {os.getpid()} handling /async_delay")
await asyncio.sleep(delay)
return {"hello": "world"}
- sync_delay:同步阻塞接口,可以通过查询参数delay来设置同步阻塞多少秒之后响应;
- async_delay:异步阻塞接口,可以通过查询参数delay来设置异步阻塞多少秒之后响应;
准备完毕之后我们启动服务,此时我们调用“test”接口一般都是立即响应;
4.3.客户端测试代码
import Axios from "axios";
import {Button, Space} from "antd";
export default function () {
/*调用test接口,打印消耗的时间*/
async function request_test() {
const startTime = Date.now();
const resp = await Axios.get('http://127.0.0.1:7002/test');
console.log('request_test result:', resp.data);
console.log(`耗时:${((Date.now() - startTime) / 1000).toFixed(2)}s`);
}
/*调用doubao链*/
async function request_doubao_endpoint() {
const resp = await Axios.post(
'http://127.0.0.1:7002/doubao/invoke',
{
input: { messages: [{ role: 'user', content: '写一个关于海洋的小作文,300字' }], },
config: {},
kwargs: {},
}
);
console.log('request_doubao_endpoint result:', resp.data);
}
/*调用同步阻塞接口*/
async function syncDelay(delay = 3) {
const resp = await Axios.get(`http://127.0.0.1:7002/sync_delay?delay=${delay}`);
console.log('syncDelay result:', resp.data);
}
/*调用异步阻塞接口*/
async function asyncDelay() {
const resp = await Axios.get('http://127.0.0.1:7002/async_delay?delay=3');
console.log('asyncDelay result:', resp.data);
}
/*同时调用“同步阻塞接口”以及“test”接口*/
async function syncDelayAndTest() {
syncDelay();
request_test();
}
/*同时调用“异步阻塞接口”以及“test”接口*/
async function asyncDelayAndTest() {
asyncDelay();
request_test();
}
/*同时调用“豆包”接口以及test接口*/
async function doubaoAndTest() {
request_doubao_endpoint();
request_test();
}
return (
<div style={{ padding: '1em' }}>
<Space>
<Button onClick={request_test}>调用test接口</Button>
<Button onClick={syncDelayAndTest}>请求接口:同步阻塞、test</Button>
<Button onClick={asyncDelayAndTest}>请求接口:异步阻塞、test</Button>
<Button onClick={doubaoAndTest}>请求接口:doubao、test</Button>
</Space>
</div>
);
}
客户端渲染内容: 
说明:
- 这里我们一共定义四个请求函数:
- request_test:请求test接口,并且将请求的耗时打印出来;如果单独调用这个接口,那么响应速度很快。如果调用这个接口前调用了同步阻塞接口,那么这个接口的调用会被同步阻塞接口阻塞,等同步阻塞接口调用完毕之后才会响应;
- syncDelay:请求同步阻塞接口,默认阻塞3s;
- asyncDelay:请求异步阻塞接口,默认阻塞3s;
- request_doubao_endpoint:请求“doubao/invoke”接口,说明其是同步还是异步;
- 当立即调用同步接口以及test接口,也就是点击按钮“请求接口:同步阻塞、test”时,这个test接口需要等待3s才会响应;
- 当立即调用异步接口以及test接口,也就是点击按钮“请求接口:异步阻塞、test”时,这个test接口会立即响应;
- 当调用豆包接口以及test接口,也就是点击按钮“请求接口:doubao、test”时,这个test接口会立即响应,因为doubao这个端点下的“invoke、stream、batch”都是异步接口;
关于第4点,可以查看“add_route”的源码,我们找到“Anaconda/envs/langserve/Lib/site-packages/langserve/server.py:497”的这行代码,如下所示:

我们点进去这个“api_hander.invoke”:

可以看到,“invoke”端点最终调用的是“ainvoke”方法,也就是走的异步的逻辑;另外两个“batch”以及“stream”同样的道理;
4.4.用多进程来缓解同步阻塞
这里我们设置uvicorn的启动参数,worker为2,如下所示,意思是启动两个进程来处理请求,uvicorn有一套自己的规则来实现进程之间的请求负载均衡:
uvicorn.run("app.server:app", host="0.0.0.0", port=port, workers=2)
此时我们再执行“请求接口:同步阻塞、test”时,就会发现此时“test”接口已经能够立即响应了;但是如果我们更改一下这个函数的代码,如下所示:
/*同时调用“同步阻塞接口”以及“test”接口*/
async function syncDelayAndTest() {
syncDelay(2);
syncDelay(3);
request_test();
}
我们先发起两个同步阻塞请求,再发送“test”请求,你会发现,此时test又被阻塞了。因为现在worker进程只有两个,分别被两个同步阻塞进程被占用了,于是test请求只能排队等待;
4.5.负载均衡失效问题
- 上面的代码,如果你设置两个“syncDelay”的参数一样,比如都是2或者都是3,那么你会发现,此时“test”接口又能够立即响应了;
- 原因在于Uvicorn 默认使用 操作系统级别的负载均衡(通过 SO_REUSEPORT 套接字选项),由操作系统决定将请求分配给哪个工作进程。但以下情况可能导致请求集中在同一个进程:
- 浏览器或 HTTP 客户端(如 Axios)默认会复用 TCP 连接(HTTP Keep-Alive)。
- 如果两个 syncDelay() 调用是快速连续发起的(例如在同一个事件循环中几乎同时触发),它们可能会复用同一个 TCP 连接,导致操作系统将请求路由到同一个工作进程。
下面我们验证这个同样参数的请求被路由到同一个进程的问题,客户端代码中,当两个syncDelay的参数一样时,比如都是3,FastAPI打印的日志如下所示:
Process 19884 handling /sync_delay
Process 18740 handling /test
INFO: 127.0.0.1:9869 - "GET /test HTTP/1.1" 200 OK
INFO: 127.0.0.1:9868 - "GET /sync_delay?delay=3 HTTP/1.1" 200 OK
Process 19884 handling /sync_delay
INFO: 127.0.0.1:9868 - "GET /sync_delay?delay=3 HTTP/1.1" 200 OK
可以看到,两次处理“/sync_delay”都是由同一个进程来处理的,这导致进程之间的负载均衡失效了;当两个syncDelay的参数不同时,FastAPI打印的日志如下所示:
Process 19884 handling /sync_delay
Process 18740 handling /sync_delay
INFO: 127.0.0.1:10066 - "GET /sync_delay?delay=2 HTTP/1.1" 200 OK
INFO: 127.0.0.1:10067 - "GET /sync_delay?delay=3 HTTP/1.1" 200 OK
Process 18740 handling /test
INFO: 127.0.0.1:10068 - "GET /test HTTP/1.1" 200 OK
可以看到,这时候两次“/sync_delay”被分给了不同的进程来处理了;这个是uvicorn本身的负载均衡策略,如果有需求更进一步优化,应该考虑使用nginx的负载均衡来实现,简单来说一般就是使用docker在不同的服务器主机上启动多个web服务,然后使用nginx负载均衡分发到这些服务,示例配置如下所示:
http {
upstream backend {
server backend1.example.com;
server backend2.example.com;
server backend3.example.com;
}
server {
listen 80;
location / {
proxy_pass http://backend;
}
}
}
5.自定义实现异步端点
5.1.自定义流式接口
@app.get("/my_stream")
async def async_delay(start: int, end: int):
async def generate_numbers():
current = start
while current <= end:
yield json.dumps({"number": current}) + '\n'
await asyncio.sleep(0.2)
current += 1
return StreamingResponse(generate_numbers(), media_type="application/x-ndjson")
5.2.自定义异步端点
如下所示,我们可以自定义实现一个将链部署为接口服务的工具函数“add_async_route”,调用这个工具函数可以为Runnable对象自动添加端点“ainvoke”、“abatch”、“astream”,并且自动生成文档:
import json
import time
from typing import List
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, APIRouter
from langchain_core.messages import AIMessage
from langchain_core.runnables import Runnable
def format_ai_message(ai_message: AIMessage):
return {
"choices": [{
"finish_reason": "stop",
"index": 0,
"message": {
"content": ai_message.content,
"role": "assistant"
}
}],
"created": int(time.time()),
"id": ai_message.id,
"usage": ai_message.response_metadata.get('token_usage')
}
def add_async_routes(app: FastAPI, path: str, runnable: Runnable):
_router = APIRouter(prefix=path, tags=[path])
@_router.post("ainvoke")
async def ainvoke(input: runnable.input_schema):
ai_message: AIMessage = await runnable.ainvoke(input.model_dump())
return format_ai_message(ai_message)
@_router.post("astream")
async def astream(input: runnable.input_schema): # 这里 input_schema 需确保已正确定义
async def generator_function():
# 初始化一个结构,用于构建最终返回的包含 choices 等的数据
result_template = {
"choices": [{"delta": {}, "index": 0}],
"created": time.time(),
"id": "",
"usage": None
}
async for chunk in runnable.astream(input.model_dump()):
# 更新 delta 里的内容
result_template["choices"][0]["delta"]["content"] = chunk.content
result_template["choices"][0]["delta"]["role"] = 'assistant'
if chunk.response_metadata.get('finish_reason') is not None:
result_template["choices"][0]["delta"]['finish_reason'] = chunk.response_metadata.get('finish_reason')
result_template['id'] = chunk.id
result_template['created'] = int(time.time())
yield f"data: {json.dumps(result_template, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generator_function(), media_type="text/event-stream")
@_router.post("abatch")
async def abatch(inputs: list[runnable.input_schema]):
ai_message_list: List[AIMessage] = await runnable.abatch([input.model_dump() for input in inputs])
return [format_ai_message(ai_message) for ai_message in ai_message_list]
app.include_router(_router)
使用示例如下所示:
add_async_routes(
app=app,
runnable=RunnableLambda(lambda x: x['messages']) | create_llm().with_types(input_type=ModelInputSchema),
path="/async_doubao")
add_async_routes(
app=app,
runnable=RunnableLambda(lambda x: x['messages']) | create_llm("huoshan-deepseek-r1").with_types(input_type=ModelInputSchema),
path="/async_doubao_deepseek_r1")
