OpenAI ChatGPT API + FaskAPI SSE Stream 流式周转技术 以及前端Fetch 流式请求获取案例

这篇文章当时写得比较匆忙,这里进行一下更深入的补充

SSE 技术不是什么新鲜东西,就是一个 HTTP 请求和响应,关键就是响应这个环节,原始的响应都是一次性的,普通的响应是这样的:

  1. Nginx 是一个静态服务器,所谓静态服务器,就是将一个静态文件按照大小不同情况选择不同的方式传输给浏览器,小的文件,一次性发过去,大的文件则会采用数据块的方式传给浏览器。
  2. 再次强调下没有什么 http 长链接和短链接的概念,对该问题认识不清楚的可以再看我这篇足够详细地文章Http/Websocket协议的长连接和短连接的错误认识详细解读_森叶的博客-CSDN博客
  3. 建立链接之后,你所感知的 HTTP 请求完毕就关掉了,这完全是浏览器或者服务端的一种处理方式,在内存极其稀缺的互联网早期,及时地销毁一个 Socket 对象都是非常必要的。
  4. 到达现在内存不值钱的时代,我们才敢大胆的将一个 Socket 对象长时间保持,也就是目前 Websocket 大行其道的时代,理论上谁不喜欢长时间双向通信的 Websocket呢,全双工是网络通信一开始就有的概念。
  5. HTTP 协议下的链接绝不是只互相通信了一次就结束了,是会有多次通信的,例如服务端向浏览器索要账号密码的过程,这就是二次沟通了,浏览器将账号密码发回服务端,服务端再继续为浏览器进行服务。
  6. 理论上如果没有任何关闭机制存在,只要返回的内容没有结束符\r\n\r\n存在,浏览器都不会主动关闭这次请求事件。所以这样来看,HTTP 协议下的连接具有了长链接的基础了。
  7. Nginx 在收到后端服务器发过来的响应报文时,只要没有收到结束符,就不会主动关闭这次链接,会一直将后端服务器发送的数据推给浏览器,浏览器端就能连续地接到数据过去,这就是网页被称为流的概念。

Nginx 与后端服务器之间的通信

  1. Nginx 通过 socket 连接到后端服务器,会有一个 send 函数,将前端请求信息发送给后端服务器,同时用 `onmessage` 函数接收后端服务器的返回报文,平时一旦
  2. 后端服务器收到 nginx 发送过来的请求,则会开一个线程/协程来处理请求,像 Django 这类同步框架,则是由并发服务器uWSGI拉起 Django 执行逻辑,获取数据,然后将结果交付给 Nginx,Nginx 收到后将结果发送回浏览器。
  3. 针对异步框架,nginx 的处理机制是相同的,例如 FastAPI,区别在于并发服务器使用的是 uvicorn,uvicorn 接入 FastAPI,在线程中调用 fastapi 的入口文件,兵器使用 await 进行等待,数据处理完成则返回给 Nginx,所以同步/异步框对 nginx 来说是一样的东西。
  4. 在处理 websocket协议时,Nginx 不会立即关闭和后端服务器的连接,后端服务器的独立线程也会一直开着,所以开多了,性能上就吃不消,例如同时在线 1000个人,那么就有 3000 个协程在开着,一个是来自 nginx 和浏览器之间的,一个是 nginx 和后端服务器的,另外还有后端服务器启动的1000 个常驻协程用来轮询消息队列,发送给 nginx,所以,用 FastAPI来做 websocket 很耗费性能
  5. 针对 Hyperf 来说要好些,一次处理完后,并不需要建立 while 进行循环等消息,而Hyperf 本身等价于 uvicorn,等有队列消息出现时,是由某一进程 trigger 的,同时从 Hyperf 层面拿 socket给 nginx 发消息,自始至终只有一个while 来轮 redis,统一分发给各个消费者进行消费,消费者又能直接获取 Hyperf从而拿到 socket 给 nginx 发送消息,因此性能上得到极大的提升。

评论中有人问是否直接用 StreamResponse 就行了,如果你自己打算封装一个StreamResponse,并且可以反复发送多次,差不多就实现了与EventSourceResponse同样的效果吧!

nginx如果要支持SSE,要调整一些参数,比较重要的就是要避免缓存,之前 nginx 处理的都是一次性的响应,例如网页,所以会尽量把内容缓存起来一次性发送到客户端,所以这里要关掉缓存

conf 配置文件,这里做了修正

http {
    ...

    server {
        ...

        location /sse {
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header Host $host;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        	# SSE 连接时的超时时间
        	proxy_read_timeout 86400s;
        	
			# 取消缓冲
            proxy_buffering off;

			# 关闭代理缓存
            proxy_cache off;
        	
        	# 禁用分块传输编码
        	#chunked_transfer_encoding off
        	
        	# 反向代理到 SSE 应用的地址和端口
            proxy_pass http://backend-server;
        }

        ...
    }

    ...
}
  1. proxy_http_version 1.1 才支持 keep-alive,保持连接不中断

  2. location /sse块内,设置代理头部以确保不会对SSE响应进行缓存。Cache-ControlContent-Type头部用于告诉浏览器这是一个SSE连接。

  3. 使用proxy_buffering off;来禁用Nginx的缓冲,以确保SSE响应立即被传递给客户端。

  4. 使用proxy_pass指令将SSE请求代理到后端服务器的SSE端点。请将http://backend_server;替换为你的后端服务器地址和SSE端点路径。

  5. 设置连接超时时间,以避免不活动连接被Nginx关闭。在这里,我设置了一个较长的超时时间(3600秒),以便连接可以保持较长时间。你可以根据你的需求进行调整。

SSE 和 Websocket 的区别?

Server-Sent Events(SSE)和WebSocket是两种用于实现实时通信的不同技术,它们在某些方面有相似之处,但也存在一些关键的区别:

Server-Sent Events (SSE):

  1. 单向通信:SSE是一种单向通信机制,其中服务器向客户端发送数据。客户端接收服务器推送的数据,但不能向服务器发送数据。这使得它适合用于服务器向客户端的实时通知或事件推送。

  2. 基于HTTP:SSE建立在标准的HTTP/HTTPS协议之上,使用普通的HTTP请求和响应来实现。它不需要特殊的协议升级。

  3. 文本数据:SSE主要用于发送文本数据。服务器可以将文本事件推送到客户端,客户端通过监听事件来接收数据。每个事件通常包含一个标识符、数据字段和可选的注释字段。

  4. 浏览器支持:SSE在现代浏览器中得到广泛支持,不需要额外的javascript库或框架。客户端使用EventSourceAPI来与SSE服务端通信。

WebSocket:

  1. 双向通信:WebSocket是一种双向通信协议,允许客户端和服务器之间进行双向数据交换。客户端可以向服务器发送数据,服务器也可以主动向客户端推送数据。

  2. 独立协议:WebSocket是一种独立的协议,与HTTP不同。建立WebSocket连接需要进行协议升级,然后在一个持久连接上进行数据交换。

  3. 二进制和文本数据:WebSocket支持二进制和文本数据的传输,因此可以用于多种类型的数据交换,包括游戏、实时聊天和多媒体流等。

  4. 浏览器支持:WebSocket在现代浏览器中得到广泛支持,同时也有许多服务器端和客户端库可用于各种编程语言和环境中。

服务端推送事件消息

服务端判断前端要建立 SSE 连接的识别标识是 A***ept: text/event-stream,该标识代表前端会接收后端发过来的事件流

GET /sse-endpoint HTTP/1.1
Host: example.***
A***ept: text/event-stream

连接后因为消息至少区分两种,message 和 close,所以就把推送消息做成了一种结构:

event: message
data: Hello, this is a message!

 如果是 close 事件

event: close
data:

前端接收关闭事件

const eventSource = new EventSource('/sse-endpoint');

eventSource.onmessage = function(event) {
  // 处理事件消息
};

eventSource.onerror = function(event) {
  if (event.readyState === EventSource.CLOSED) {
    // 服务器关闭了连接
    console.log('SSE连接已关闭');
    // 可以尝试重新建立连接
  }
};

正式开始

OpenAI 官方给我了一个超简单的文档,还直接用curl的方式搞得,真是能多省就多省,大家可以使用apifox 或者 postman 将curl 转成 fetch 或者 request 等自己能看懂的代码,当然也可以自己自学一下curl的命令,如果你能访问OpenAI,可以点下面的链接,自己看看

https://platform.openai.***/docs/api-reference/chat/createhttps://platform.openai.***/docs/api-reference/chat/create

 大家如果对上面的双语翻译感兴趣,我推荐一个技术大佬的免费插件,沉浸式翻译

https://chrome.google.***/webstore/detail/immersive-translate/bpoadfkcbjbfhfodiog***hhhpibjhbnhhttps://chrome.google.***/webstore/detail/immersive-translate/bpoadfkcbjbfhfodiog***hhhpibjhbnh

 其中有个 stream 使用讲解,stream这个东西,我之前也没用过,经过学习后,发现这东西一直都存在就是一个content-type格式,只是我们原来没有注意过,我们都是用urlencode或者json格式来处理数据的,其实可以以二进制的方式,发过来,然后你再自行处理。

我发现了一个大佬,开源了一个插件,从中窥见了SSE的使用案例,大家有兴趣,可以看另外一篇SSE的学习案例,这里不对前端再做深入的讨论了

chatgpt API SSE(服务器推送技术)和 Fetch 请求 A***ept: text/event-stream 标头案例_森叶的博客-CSDN博客在需要接收服务器实时推送的数据时,我们可以使用 `fetch()` 方法和 `EventSource` API 进行处理。使用 `fetch()` 方法并在请求头中添加 `A***ept: text/event-stream` 可以告诉服务器我们想要接收 Server-Sent Events (SSE) 格式的数据流。`fetch()` 对流处理有良好的支持,我们可以使用 `body` 属性来读取 SSE 消息,同时也可以利用 `fetch()` 的其他功能如超时控制、请求重试等。缺点是需要手动解析数据、https://blog.csdn.***/wangsenling/article/details/130490769python 端官方提供了openai 库,这个也是开源的,大家可以找到看看

GitHub - openai/openai-python: The OpenAI Python library provides convenient a***ess to the OpenAI API from applications written in the Python language.The OpenAI Python library provides convenient a***ess to the OpenAI API from applications written in the Python language. - GitHub - openai/openai-python: The OpenAI Python library provides convenient a***ess to the OpenAI API from applications written in the Python language.https://github.***/openai/openai-python/我没怎么看,但看起来没有给stream案例,只是给了request的案例,如果只是request的那其实就挺简单了,就没啥讲的了

SSE 的用途

  1.  通过 SSE,建立链接后,可以将推送消息放进 redis hash 中,每个连接对象定时扫描自己的 hash 看是否有信息需要推送给前端;
  2. 利用异步请求 aiohttp 与 chatgpt 建立连接,通过while+生成器+Future的方式实现异步数据的处理。

不用官网的openai库,根据开发文档,直接发送request请求也可以,这里给的是一位大佬的请求方式,用的是httpx,大家可以自行学习下,知乎有一篇比较文

浅度测评:requests、aiohttp、httpx 我应该用哪一个? - 知乎在 Python 众多的 HTTP 客户端中,最有名的莫过于 requests、aiohttp和httpx。在不借助其他第三方库的情况下,requests只能发送同步请求;aiohttp只能发送异步请求;httpx既能发送同步请求,又能发送异步请求。所…https://zhuanlan.zhihu.***/p/103711201

核心参数截图

 请求主体代码截图

 AI Claude 给的讲解

 

 这是一个生成器函数,通过yield函数,yield 很多地方都讲得很晦涩难懂,《你不知道的javascript》中非常简洁地说,这就是一个return,只是对于生成器来说,return次数要进行多次,所以搞了一个yield用来区分同步函数的return,而且return的意义还有停止下面的代码,而返回数据的意思,两者还是有点差异,但是yield就是return,多次返回的return

核心库EventSourceResponse

from sse_starlette import EventSourceResponse

 

AI给出的EventSourceResponse解读,自己也可以把EventSourceResponse源码丢给Claude,让其看过,给你解读,都是好方法

下面给下EventSourceResponse的FastAPI简单案例代码,让大家玩起来

import uvicorn
import asyncio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse

times = 0
app = FastAPI()

origins = [
    "*"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/sse/data")
async def root(request: Request):
    event_generator = status_event_generator(request)
    return EventSourceResponse(event_generator)


status_stream_delay = 1  # second
status_stream_retry_timeout = 30000  # milisecond


# 其实就是绑定函数事件 一直在跑循环
async def status_event_generator(request):
    global times
    while True:
        if not await request.is_disconnected() == True:
            yield {
                "event": "message",
                "retry": status_stream_retry_timeout,
                "data": "data:" + "times" + str(times) + "\n\n"
            }
        print("alive")
        times += 1
        await asyncio.sleep(status_stream_delay)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')

大家对照着上面的讲解,就能把代码搞出来

fetch('http://localhost:8000/sse/data', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
    },
    body: JSON.stringify({
        text: "hello"
    })
}).then(async response=>{
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
    while (true) {
        let {value, done} = await reader.read();
        if (done)
            break;
        if (value.indexOf("data:") < 0 || value.indexOf('event: ping') >= 0)
            continue;
        // console.log('Received~~:', value);
        let values = value.split("\r\n")
        for (let i = 0; i < values.length; i++) {
            let _v = values[i].replace("data:", "")
            // console.log(_v)
            if (_v.trim() === '')
                continue
            console.log(_v)
        }
    }
}
).catch(error=>{
    console.error(error);
}
);

因为自己的业务代码有很多鉴权和数据库操作,就不便放出来了,大家根据自己的所需,可以在这个简单的代码基础上,只要自己写生成器函数即可

转载请说明出处内容投诉
CSS教程_站长资源网 » OpenAI ChatGPT API + FaskAPI SSE Stream 流式周转技术 以及前端Fetch 流式请求获取案例

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买