一、SSE概念
SSE(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件。我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式
二、SSE应用场景
在web端消息推送功能中,由于传统的HTTP协议是由客户端主动发起请求,服务端才会响应。基本的ajax轮询技术便是如此。而在SSE中,浏览发送一个请求给服务端,通过响应头中的Content-Type:text/event-stream等向客户端声名这是一个长连接,发送的是流数据,这样客户端就不会关闭连接,一直等待服务端发送数据。
如果服务器返回的数据中包含了事件标识符,浏览器会记录最后一次接收的事件的标识符。如果与服务器的连接中断,当浏览器再次进行连接时,会通过头来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继续连接
三、前端使用方法、问题
1、get方式
使用eventsource完成get请求
缺点:客户端无法通过一个get请求完成数据传递
参考文档:
EventSource - Web API 接口参考 | MDN
实现流程:
- 后端提供了两个接口,一个是:post,用以完成前端信息的传递,我这边是做大语言模型的,所以包括了模型必要参数、问题等;二、get接口,完成流式输出的接口,配置相应的具名事件、请求头等
- 前端通过调用post接口拿到本次会话id,将id携带在get请求里,完成信息传递
- 前端处理SSE流式返回
代码实现:
const eventSourceRef = useRef<any>(null)
const contact = async (messageData: any) => {
eventSourceRef.current = new EventSource(
`${API_BASE}/v1/model/stream?id=${id}`,
)
if (!eventSourceRef.current) return
// 监听 SSE 事件,因为后端定义了具名事件,所以这儿要用addEventListener监听,而不是onmessage
eventSourceRef.current.addEventListener('add', function (e: any) {
// 处理数据展示
})
eventSourceRef.current.addEventListener('finish', function (e: any) {
// 结束标识finish
eventSourceRef.current.close() // 关闭连接
})
eventSourceRef.current.addEventListener('error', function (e: any) {
if (e.status === 401) {
// 用户登录状态失效处理
}
// error报错处理
console.log('Error o***urred:', e)
// 关闭连接
eventSourceRef.current.close()
})
}
2、post方式
使用fetch-event-source完成连接,仅需一个接口,支持添加请求头
缺点:在浏览器返回的text/eventstream里看不到具体返回,无法进行预览
参考文档:
@microsoft/fetch-event-source
实现流程:
- 后端提供一个接口,支持前端传参、流式返回
- 前端通过fetch-event-source,完成传参、请求头添加等
- 处理返回数据
具体实现:
const eventSourceRef = useRef<any>(null)
const [abortController, setAbortController] = useState(new AbortController())
// 通信事件
const contact = async (messageData: any) => {
messageData = { ...messageData, do_stream: modelArg.do_stream } // 请求参数
receivedDataRef.current = ''
const token: string = getLocal('AUTHCODE') || ''
fetchEventSource(`${MAAS_API_BASE}/v1/model_api/invoke`, {
method: 'POST',
// 添加请求头
headers: {
Authorization: token,
'Content-Type': 'application/json',
},
// 传参必须保证是json
body: JSON.stringify(messageData),
// abortController.signal 提供了一个信号对象给 fetchEventSource 函数。
// 如果在任何时候你想取消正在进行的 fetch 操作,你可以调用
// abortController.abort()。这会发出关联任务的信号,你可以使用
// AbortController 的信号来检查异步操作是否已被取消。
signal: abortController.signal,
openWhenHidden: true, // 切换标签页时连接不关闭
async onopen(resp) {
// 处理登录失效
if (resp.status === 401) {
message.warning('登录过期')
return
}
},
onmessage(msg: any) {
const eventType = msg.event // 监听event的具名事件
switch (eventType) {
case 'add':
// 流式输出事件,add每次会返回具体字符,前端负责拼接展示
break
case 'finish':
setStatu('finish') // 结束标识
break
case 'error':
if (msg.status === 401) {
message.warning('登录过期')
}
console.log('Error o***urred:', e)
break
}
},
onerror(err) {
throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
},
onclose() {},
})
}
// 终止连接方法,比如在切换模型时,你可能有必要终止上一次连接来避免问答串联
const closeSSE = () => {
abortController.abort()
setAbortController(new AbortController())
}
3、一种接口同时兼容流式/非流式
同上post方法
fetchEventSource(sseUrl, {
method: 'POST',
headers,
signal: abortController.signal,
body: JSON.stringify(customInferData),
openWhenHidden: true,
/**
*在onopen阶段处理
第一步:判断resp.headers.get('content-type'),如果不包含text/event-stream,
则代表非流式
第二步:需要在onopen阶段处理非流式返回,即json返回,读取json返回并渲染,注意异常也要处理
第三步:
*/
async onopen(resp) {
const contentype = resp.headers.get('content-type') || ''
console.log('contentype =>', contentype)
console.log('resp.ok =>', resp.ok)
if (resp.ok && !contentype.includes('text/event-stream')) {
// 读取json数据
const responseData = await resp.json()
if (responseData.code !== 0) {
// 报错处理+关闭连接
} else {
//处理数据渲染+关闭连接
stopSession()
}
} else if (resp.status === 401) {
message.warning('登录过期')
// 报错处理+关闭连接
stopSession()
}
},
onmessage(msg: any) {
const eventType = msg.event
const messages: any = cloneDeep(chatState.sessionMessages)
let lastMessage: any = messages[messages.length - 1] || {}
switch (eventType) {
case 'add':
lastMessage = {
...lastMessage,
text: `${lastMessage.text}${msg.data || ' '}`,
loading: false,
}
messages.splice(messages.length - 1, 1, lastMessage)
chatAction.updateSessionMessages(messages)
break
case 'finish':
console.log('finish lastMessage =>', lastMessage)
chatAction.updateSessionStatu(SessionStatuTypes.ready)
chatAction.updateContext(msg.data)
break
case 'info':
{
const messages: any = cloneDeep(chatState.sessionMessages)
let lastMessage: any = messages[messages.length - 1] || {}
lastMessage = {
referenceDocs: JSON.parse(msg.data).reference_by_docs,
...lastMessage,
}
messages.splice(messages.length - 1, 1, lastMessage)
chatAction.updateSessionMessages(messages)
}
break
case 'error':
if (msg.status === 401) {
chatAction.updateSessionStatu(SessionStatuTypes.ready)
} else {
errorItemFn(msg?.msg || msg?.data || '抱歉,暂无法回答问题')
}
break
}
},
onerror(err: any) {
errorItemFn(err?.msg || '抱歉,暂无法回答该问题')
console.log('eventSource error: ', `${err}`)
throw err // 连接遇到http错误时,如跨域等,必须要throw才能停止,不然会一直重连
},
onclose() {
console.log('eventSource close')
},
})
// 终止会话
const stopSession = () => {
abortController.abort()
setAbortController(new AbortController())
}
四、常见问题汇总
1、无法添加请求头
应用fetch-event-source解决
2、一个方法需要同时兼容流式和非流式
应用fetch-event-source在onopen阶段处理非流式输出,如报错、接口json返回等
3、遇到跨域时候,请求一直连接
应用fetch-event-source在监听具名事件时,如error,将错误throw err,否则无法中断连接
4、fetch方法如何终止
const stopSession = () => {
abortController.abort()
setAbortController(new AbortController())
}