事件起因
oneapi 是 github 上一款用 go 语言编写的开源的 openai 接口适配项目。主要用于将不同厂商的ai 服务转为 openai 标准的格式。(实际上,openai 的交互格式已成为 Chat 对话的标准)。
具体的 oneapi 的使用教程可以参考官方 github 仓库和文档说明,再次不再赘述。
由于有一些厂商的服务 oneapi 并未兼容,加上本人不会写 go ,也无从根据官方项目拓展,因此呢,本人写了一个基于java 体系下 webflux 的中转项目,用来适配其他的一些厂商,例如 bito,智谱清言和 kimi的网页版逆向等等。
这样呢,我服务器上就启动了两个项目,一个 oneapi,一个java 项目(mhn-proxy)。然后呢,oneapi 是支持自行添加兼容 openai 格式的渠道的,因此我在oneapi上配置了渠道,指向 mhn-proxy了。
mhn-proxy 这个项目在主流的客户端上都是可以使用的,流式和非流式都能完美输出。但是接到 oneapi 以后,请求就报错了。
具体表现为:如果是非流式输出,返回没问题。如果是流式输出,返回为空。
查找问题
由于 oneapi 的日志打印很简单,从日志看,仅仅只能确认如下有效信息:oneapi 打印出的输出日志为空,也就意味着 oneapi 没有给客户端返回数据。
但是mhn-proxy 的日志显示,mhn-proxy 是收到请求也有响应的(从该项目如果单独被客户端调用可以使用也是可以确认的)。那这究竟是啥问题呢?
单看日志无法解决问题,只能硬着头皮去看 oneapi 的源代码。着重关注 oneapi 关于流式数据的输入和输出。在 relay/adaptor/openai/main.go有如下的处理:
const (
dataPrefix = "data: "
done = "[DONE]"
dataPrefixLength = len(dataPrefix)
)
func StreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*model.ErrorWithStatusCode, string, *model.Usage) {
responseText := ""
scanner := bufio.NewScanner(resp.Body)
scanner.Split(bufio.ScanLines)
var usage *model.Usage
common.SetEventStreamHeaders(c)
doneRendered := false
for scanner.Scan() {
data := scanner.Text()
if len(data) < dataPrefixLength { // ignore blank line or wrong format
continue
}
if data[:dataPrefixLength] != dataPrefix && data[:dataPrefixLength] != done {
continue
}
if strings.HasPrefix(data[dataPrefixLength:], done) {
render.StringData(c, data)
doneRendered = true
continue
}
//...
return nil, responseText, usage
}
这一段主要是判断返回的数据是否以data:
开头,如果不是就跳过,是就截取掉,只处理后面的 json 数据。这段逻辑依据的是标准的 SSE 协议,都是这样规定的。看了一圈没啥问题,而且本身也不太可能有问题(如果有问题那其他的中转也肯定没法用了)。那问题不是出在 oneapi 上,是我自己的 mhn-proxy 有啥问题吗?
mhn-proxy 接口是基于 sring-webflux 开发的,大概代理接口长这样。
@RequestMapping(value = {"/v1/chat/completions"})
public Publisher<String> chat(ServerWebExchange exchange) {
//xxxx
return gptProxyService.chat(request);
}
返回一个Publisher<String>,其实都是Flux<String>。然后在前置的一个过滤器里,有如下判断:
if (stream) {
response.getHeaders().set("Cache-Control", "no-cache");
response.getHeaders().set("Content-Type", "text/event-stream");
response.getHeaders().set("Access-Control-Allow-Origin", "*");
}
如果请求参数中 stream=true,那么就会设置响应头为text/event-stream
,代表流式输出。看来要解决问题,只能看一下 SpringWebflux 对于流式输出是怎么处理的。
查阅资料得知,核心的处理逻辑在于ServerSentEventHttpMessageWriter
这个类。
@Override
public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaType) {
return (mediaType == null || MediaType.TEXT_EVENT_STREAM.includes(mediaType) ||
ServerSentEvent.class.isAssignableFrom(elementType.toClass()));
}
该类生效的前提是 contentType 为TEXT_EVENT_STREAM
。需要看一下具体是怎么处理的。
private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType,
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ?
elementType.getGeneric() : elementType);
return Flux.from(input).map(element -> {
ServerSentEvent<?> sse = (element instanceof ServerSentEvent ?
(ServerSentEvent<?>) element : ServerSentEvent.builder().data(element).build());
StringBuilder sb = new StringBuilder();
String id = sse.id();
String event = sse.event();
Duration retry = sse.retry();
String comment = sse.comment();
Object data = sse.data();
if (id != null) {
writeField("id", id, sb);
}
if (event != null) {
writeField("event", event, sb);
}
if (retry != null) {
writeField("retry", retry.toMillis(), sb);
}
if (comment != null) {
sb.append(':').append(StringUtils.replace(comment, "\n", "\n:")).append('\n');
}
if (data != null) {
sb.append("data:");
}
Flux<DataBuffer> result;
if (data == null) {
result = Flux.just(encodeText(sb + "\n", mediaType, factory));
}
else if (data instanceof String) {
data = StringUtils.replace((String) data, "\n", "\ndata:");
result = Flux.just(encodeText(sb + (String) data + "\n\n", mediaType, factory));
}
else {
result = encodeEvent(sb, data, dataType, mediaType, factory, hints);
}
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
}
可以看到他这里就是给实际返回的数据按照 SSE 标准规范添加了一些信息。乍一看好像也没问题啊!那问题到底出在哪里?
等等,好像有哪里不对劲,必须睁大我的 24k 钛合金眼了。
if (data != null) {
sb.append("data:");
}
const (
dataPrefix = "data: "
done = "[DONE]"
dataPrefixLength = len(dataPrefix)
)
这俩 dataPrefix 好像不太一样?差一个空格?看到这里,我突然好像找到了关键所在。onepai 在处理响应的时候,是按照data:
判断前缀的,而 spring 在处理的时候添加的是data:
。这肯定校验不过啊!!
好吧,原来问题是这。openai 的标准格式返回data:
后面是有空格的,虽然说 SSE 协议并不强制要求说有没有空格,但是 oneapi 在处理的时候是按有空格处理的,并没有兼容没有空格的情况。
解决方案
既然找到了原因所在,那我们就需要替换掉默认的ServerSentEventHttpMessageWriter
,写一个自定义的HttpMessageWriter
来作为 SSE 的编码器。
经过一番研究,发现可以采用如下的方式来替换默认的编码器。
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.registerDefaults(false);
//由于上述禁用了默认的注册,所以需要手动注册默认的编解码器
configurer.customCodecs().register(new Jackson2JsonEncoder());
configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new ByteArrayEncoder()));
configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new ByteBufferEncoder()));
configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new DataBufferEncoder()));
configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new NettyByteBufEncoder()));
configurer.customCodecs().register(new ResourceHttpMessageWriter());
configurer.customCodecs().register(new EncoderHttpMessageWriter<>(CharSequenceEncoder.textPlainOnly()));
//注入自定义的 sse writer
configurer.customCodecs().register(new CustomServerSentEventHttpMessageWriter(new Jackson2JsonEncoder()));
}
}
需要先禁用掉默认的 codec,然后注册自定义的编码器。如此一来就可以替换默认的了。
总结
最后总结一下背锅顺序吧!
oneapi 首先背大锅,它在处理响应的时候太严格了,其实只要按data:
处理不就好了,然后截取完以后再 trim 一下。
spring 背中锅,人家都是有空格的,咋就你没有空格呢!!
我背小锅,谁让我不会写 go,只能用 java 写呢,用 go 还有这问题吗!