本文引用了后台技术汇一枚少年郎“大模型应用之:SSE流式响应”的内容,即时通讯网有修订和重新排版。
cover_opti.png (11.15 KB, 下载次数: 1)
下载附件 保存到相册
27 天前 上传
1.png (32.83 KB, 下载次数: 12)
31 天前 上传
curl -i -X POST -H 'Content-Type: application/json' -H 'Authorization: Bearer sk-************************************************' [url=https://api.openai.com/v1/chat/completions]https://api.openai.com/v1/chat/completions[/url] -d '{"model":"gpt-3.5-turbo","messages":[{"role": "user", "content": "3+5=?"}],"temperature":0.8,"stream":true}'
HTTP/2 200 date: Fri, 08 Sep 2023 03:39:50 GMT content-type: text/event-stream access-control-allow-origin: * cache-control: no-cache, must-revalidate openai-organization: metaverse-cloud-pte-ltd-orfbgw openai-processing-ms: 5 openai-version: 2020-10-01 strict-transport-security: max-age=15724800; includeSubDomains x-ratelimit-limit-requests: 3500 x-ratelimit-limit-tokens: 90000 x-ratelimit-remaining-requests: 3499 x-ratelimit-remaining-tokens: 89980 x-ratelimit-reset-requests: 17ms x-ratelimit-reset-tokens: 12ms x-request-id: 96ff4efafed25a52fbedb6e5c7a3ab09 cf-cache-status: DYNAMIC server: cloudflare cf-ray: 80342aa96ae00974-HKG alt-svc: h3=":443"; ma=86400 data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"3"},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" +"},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"5"},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" ="},"finish_reason":null}]} data: 
{"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"8"},"finish_reason":null}]} data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} data: [DONE]
x0.png (10.48 KB, 下载次数: 9)
x00.png (13.39 KB, 下载次数: 10)
4.png (30.79 KB, 下载次数: 8)
async def _async_stream_with_custom_tokenizer(self, request: Request, langchain, prompt: str = "", history_messages: List[Message] = None):
    """Async (non-blocking) streaming variant of
    _generate_event_stream_with_custom_tokenizer.

    Streams chunks from ``langchain.astream({})``, forwards each piece of
    content to the caller as a serialized GenerateResponse, and attaches
    token-usage statistics once the provider signals completion via the
    ``finish_reason`` / ``done_reason`` metadata.

    Args:
        request: incoming request, polled to detect client disconnects so
            streaming can stop early.
        langchain: a LangChain runnable exposing ``astream``.
        prompt: the user prompt, used for token accounting.
        history_messages: prior conversation context for token accounting.

    Yields:
        str: serialized GenerateResponse for each streamed chunk; the final
        chunk carries the token-usage record, interim chunks carry None.
    """
    total_stream_content = ""
    # Computed once, on the terminal chunk; hoisted out of the loop
    # (BUG FIX: the original reset `usage = None` every iteration, which
    # made the `if usage is None` guard below permanently true).
    usage = None
    async for stream_content in langchain.astream({}):
        # Stop producing if the client has gone away.
        if await request.is_disconnected():
            logger.warning(f"[generate_event_stream] "
                           f", [trace_id] = {trace_id_context.get()}"
                           f", gateway connection abort..")
            break
        if isinstance(stream_content, str):
            content = stream_content
            total_stream_content += content
        elif isinstance(stream_content, AIMessageChunk):
            content = stream_content.content
            total_stream_content += content
        else:
            logger.error(f"[generate_event_stream] "
                         f", [trace_id] = {trace_id_context.get()}"
                         f", unexpected stream_content type: {type(stream_content)}")
            break
        # print(f"[custom_tokenizer] langchain stream response: {stream_content}")
        # Extract token statistics on the terminal chunk.
        # BUG FIX: plain-str chunks have no .response_metadata attribute;
        # the original accessed it unconditionally and would raise
        # AttributeError. getattr() keeps the check safe for both shapes.
        metadata = getattr(stream_content, 'response_metadata', None)
        if (metadata is not None
                and (metadata.get('finish_reason') == 'stop'      # azure_openai
                     or metadata.get('done_reason') == 'stop')):  # hunyuan
            if usage is None:
                token_usage = TokenTracker(self.model).track_full_token_usage(
                    input_text=prompt,
                    output_text=total_stream_content,
                    context=history_messages
                )
                usage = self._get_token_usage(self.model, token_usage)
        resp = GenerateResponse(code=AiErrorCode.SUCCESS.value["code"],
                                message=AiErrorCode.SUCCESS.value["message"],
                                resp=content,
                                token_usage=usage)
        yield resp.to_string()
async def _handle_stream_response(self, resp, prompt: str = None, history_messages: List[Message] = None, model: str = None):
    """Parse an OpenAI-style SSE streaming HTTP response chunk by chunk.

    Handles two low-level framing problems:
      * a raw network chunk may split a multi-byte UTF-8 character at its
        boundary, so undecodable byte tails are buffered until the next
        chunk completes them;
      * SSE events are delimited by a blank line ("\\n\\n"), and a single
        chunk may contain zero, one, or several events.

    For every delta it yields a serialized GenerateResponse carrying that
    piece of content; on the terminal "[DONE]" event it computes token usage
    with the custom tokenizer and yields a final usage-only response.

    Args:
        resp: requests-style streaming response exposing ``iter_content``.
        prompt: original user prompt (for token accounting).
        history_messages: conversation context (for token accounting).
        model: model name used to select the tokenizer.

    Yields:
        str: serialized GenerateResponse per content delta, plus one final
        usage record after "[DONE]".
    """
    total_stream_content = ""
    usage = None
    buffer = ""
    cache_raw_data = b''
    cache_raw_data_enable = False
    # Read the body in small chunks.
    for stream_response in resp.iter_content(chunk_size=100):
        # Decode the chunk; a chunk boundary may cut a UTF-8 sequence in half.
        origin_content = ""
        try:
            if cache_raw_data_enable:
                cache_raw_data += stream_response
                # Retry decoding buffered bytes plus the new chunk.
                origin_content = cache_raw_data.decode('utf-8')
                # Successful decode: clear the carry-over buffer and leave
                # caching mode (BUG FIX: the original never reset the flag
                # here, so all later chunks kept flowing through the cache
                # path even after recovery).
                cache_raw_data = b''
                cache_raw_data_enable = False
            else:
                origin_content = stream_response.decode('utf-8')
        except UnicodeDecodeError:
            logger.error(f"extract_content, data chunk decode error, trace_id = {trace_id_context.get()}, origin data = {stream_response}")
            # Option 1 (rejected): decode with errors='replace' — emits
            # replacement characters visible to the user.
            # Option 2: buffer the bytes and retry when the next chunk arrives.
            logger.debug(f"extract_content, cache_raw_data_enable= {cache_raw_data_enable}, cache_raw_data = {cache_raw_data}")
            if not cache_raw_data_enable:
                # BUG FIX: append only when the chunk was NOT already added
                # inside the try above; the original appended unconditionally,
                # duplicating the chunk whenever a cached decode failed again.
                cache_raw_data += stream_response
            cache_raw_data_enable = True
            # Skip further processing until more bytes arrive.
            continue
        logger.debug(f"extract_content, trace_id = {trace_id_context.get()}, origin data = {origin_content}")
        buffer += origin_content
        while True:
            # SSE protocol: two consecutive newlines terminate one event.
            idx = buffer.find('\n\n')
            if idx == -1:
                break
            event_data = buffer[:idx]
            # Drop the consumed event (and its delimiter) from the buffer.
            buffer = buffer[idx + 2:]
            # Process each line inside the event.
            for line in event_data.split('\n'):
                line = line.strip()
                if not line.startswith('data:'):
                    continue
                # Strip "data: " or "data:" — some providers (e.g. deepseek)
                # omit the space after the colon, azure openai includes it.
                if line.startswith('data: '):
                    data_str = line[6:]
                else:
                    data_str = line[5:]
                if data_str == '[DONE]':
                    # Terminal event: account tokens with the custom tokenizer.
                    token_usage = TokenTracker(model_name=model).track_full_token_usage(
                        input_text=prompt,
                        output_text=total_stream_content,
                        context=history_messages
                    )
                    usage = super()._get_token_usage(model=model, usage=token_usage)
                    # Final, content-less response carrying only the usage record.
                    res = GenerateResponse(code=AiErrorCode.SUCCESS.value["code"],
                                           message=AiErrorCode.SUCCESS.value["message"],
                                           resp=None,
                                           token_usage=usage)
                    logger.debug(f"finish stream, trace_id = {trace_id_context.get()}, token data = {usage}")
                    yield res.to_string()
                else:
                    try:
                        data = json.loads(data_str)
                        # Pull the incremental text out of choices[].delta.content.
                        if 'choices' in data:
                            for choice in data['choices']:
                                delta = choice.get('delta', {})
                                content = delta.get('content')
                                if content is not None:
                                    total_stream_content += content
                                    res2 = GenerateResponse(code=AiErrorCode.SUCCESS.value["code"],
                                                            message=AiErrorCode.SUCCESS.value["message"],
                                                            resp=content,
                                                            token_usage=usage)
                                    logger.debug(f"解析一个数据包数据完成, trace_id = {trace_id_context.get()}, origin data = {content}")
                                    yield res2.to_string()
                    except json.JSONDecodeError:
                        pass  # ignore invalid JSON payloads
3.png (15.08 KB, 下载次数: 6)
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
一套纯血鸿蒙NEXT产品级IM系统。
详细介绍 / 产品截图 / 安装
精华主题数超过100个。
连续任职达2年以上的合格正式版主
为论坛做出突出贡献的开发者、版主等。
Copyright © 2014-2024 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.161139 second(s), 44 queries , Gzip On.