事件起因

oneapi 是 github 上一款用 go 语言编写的开源的 openai 接口适配项目。主要用于将不同厂商的ai 服务转为 openai 标准的格式。(实际上,openai 的交互格式已成为 Chat 对话的标准)。
具体的 oneapi 的使用教程可以参考官方 github 仓库和文档说明,再次不再赘述。

由于有一些厂商的服务 oneapi 并未兼容,加上本人不会写 go ,也无从根据官方项目拓展,因此呢,本人写了一个基于java 体系下 webflux 的中转项目,用来适配其他的一些厂商,例如 bito,智谱清言和 kimi的网页版逆向等等。

这样呢,我服务器上就启动了两个项目,一个 oneapi,一个java 项目(mhn-proxy)。然后呢,oneapi 是支持自行添加兼容 openai 格式的渠道的,因此我在oneapi上配置了渠道,指向 mhn-proxy了。

mhn-proxy 这个项目在主流的客户端上都是可以使用的,流式和非流式都能完美输出。但是接到 oneapi 以后,请求就报错了。

具体表现为:如果是非流式输出,返回没问题。如果是流式输出,返回为空。

查找问题

由于 oneapi 的日志打印很简单,从日志看,仅仅只能确认如下有效信息:oneapi 打印出的输出日志为空,也就意味着 oneapi 没有给客户端返回数据。
但是mhn-proxy 的日志显示,mhn-proxy 是收到请求也有响应的(从该项目如果单独被客户端调用可以使用也是可以确认的)。那这究竟是啥问题呢?
单看日志无法解决问题,只能硬着头皮去看 oneapi 的源代码。着重关注 oneapi 关于流式数据的输入和输出。在 relay/adaptor/openai/main.go​有如下的处理:

const (
	dataPrefix       = "data: "
	done             = "[DONE]"
	dataPrefixLength = len(dataPrefix)
)
func StreamHandler(c *gin.Context, resp *http.Response, relayMode int) (*model.ErrorWithStatusCode, string, *model.Usage) {
	responseText := ""
	scanner := bufio.NewScanner(resp.Body)
	scanner.Split(bufio.ScanLines)
	var usage *model.Usage

	common.SetEventStreamHeaders(c)

	doneRendered := false
	for scanner.Scan() {
		data := scanner.Text()
		if len(data) < dataPrefixLength { // ignore blank line or wrong format
			continue
		}
		if data[:dataPrefixLength] != dataPrefix && data[:dataPrefixLength] != done {
			continue
		}
		if strings.HasPrefix(data[dataPrefixLength:], done) {
			render.StringData(c, data)
			doneRendered = true
			continue
		}
		//...
	return nil, responseText, usage
}

这一段主要是判断返回的数据是否以data: ​开头,如果不是就跳过,是就截取掉,只处理后面的 json 数据。这段逻辑依据的是标准的 SSE 协议,都是这样规定的。看了一圈没啥问题,而且本身也不太可能有问题(如果有问题那其他的中转也肯定没法用了)。那问题不是出在 oneapi 上,是我自己的 mhn-proxy 有啥问题吗?

mhn-proxy 接口是基于 sring-webflux 开发的,大概代理接口长这样。

@RequestMapping(value = {"/v1/chat/completions"})  
public Publisher<String> chat(ServerWebExchange exchange) {  
	//xxxx
    return gptProxyService.chat(request);  
}

返回一个Publisher<String>​,其实都是Flux<String>​。然后在前置的一个过滤器里,有如下判断:

 if (stream) {  
    response.getHeaders().set("Cache-Control", "no-cache");  
    response.getHeaders().set("Content-Type", "text/event-stream");  
    response.getHeaders().set("Access-Control-Allow-Origin", "*");  
}

如果请求参数中 stream=true​,那么就会设置响应头为text/event-stream​,代表流式输出。看来要解决问题,只能看一下 SpringWebflux 对于流式输出是怎么处理的。
查阅资料得知,核心的处理逻辑在于ServerSentEventHttpMessageWriter​这个类。

@Override  
public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaType) {  
    return (mediaType == null || MediaType.TEXT_EVENT_STREAM.includes(mediaType) ||  
          ServerSentEvent.class.isAssignableFrom(elementType.toClass()));  
}

该类生效的前提是 contentType 为TEXT_EVENT_STREAM​。需要看一下具体是怎么处理的。

private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType,  
       MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {  
  
    ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ?  
          elementType.getGeneric() : elementType);  
  
    return Flux.from(input).map(element -> {  
  
       ServerSentEvent<?> sse = (element instanceof ServerSentEvent ?  
             (ServerSentEvent<?>) element : ServerSentEvent.builder().data(element).build());  
  
       StringBuilder sb = new StringBuilder();  
       String id = sse.id();  
       String event = sse.event();  
       Duration retry = sse.retry();  
       String comment = sse.comment();  
       Object data = sse.data();  
       if (id != null) {  
          writeField("id", id, sb);  
       }  
       if (event != null) {  
          writeField("event", event, sb);  
       }  
       if (retry != null) {  
          writeField("retry", retry.toMillis(), sb);  
       }  
       if (comment != null) {  
          sb.append(':').append(StringUtils.replace(comment, "\n", "\n:")).append('\n');  
       }  
       if (data != null) {  
          sb.append("data:");  
       }  
  
       Flux<DataBuffer> result;  
       if (data == null) {  
          result = Flux.just(encodeText(sb + "\n", mediaType, factory));  
       }  
       else if (data instanceof String) {  
          data = StringUtils.replace((String) data, "\n", "\ndata:");  
          result = Flux.just(encodeText(sb + (String) data + "\n\n", mediaType, factory));  
       }  
       else {  
          result = encodeEvent(sb, data, dataType, mediaType, factory, hints);  
       }  
  
       return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);  
    });  
}

可以看到他这里就是给实际返回的数据按照 SSE 标准规范添加了一些信息。乍一看好像也没问题啊!那问题到底出在哪里?
等等,好像有哪里不对劲,必须睁大我的 24k 钛合金眼了。

 if (data != null) {  
	  sb.append("data:");  
   }  
const (
	dataPrefix       = "data: "
	done             = "[DONE]"
	dataPrefixLength = len(dataPrefix)
)

这俩 dataPrefix​ 好像不太一样?差一个空格?看到这里,我突然好像找到了关键所在。onepai 在处理响应的时候,是按照data: ​判断前缀的,而 spring 在处理的时候添加的是data:​。这肯定校验不过啊!!
好吧,原来问题是这。openai 的标准格式返回data:​后面是有空格的,虽然说 SSE 协议并不强制要求说有没有空格,但是 oneapi 在处理的时候是按有空格处理的,并没有兼容没有空格的情况。

解决方案

既然找到了原因所在,那我们就需要替换掉默认的ServerSentEventHttpMessageWriter​,写一个自定义的HttpMessageWriter​来作为 SSE 的编码器。
经过一番研究,发现可以采用如下的方式来替换默认的编码器。

@Configuration  
public class WebFluxConfig implements WebFluxConfigurer {  
  
    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.registerDefaults(false);

        //由于上述禁用了默认的注册,所以需要手动注册默认的编解码器
        configurer.customCodecs().register(new Jackson2JsonEncoder());
        configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new ByteArrayEncoder()));
        configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new ByteBufferEncoder()));
        configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new DataBufferEncoder()));
        configurer.customCodecs().register(new EncoderHttpMessageWriter<>(new NettyByteBufEncoder()));
        configurer.customCodecs().register(new ResourceHttpMessageWriter());

        configurer.customCodecs().register(new EncoderHttpMessageWriter<>(CharSequenceEncoder.textPlainOnly()));

        //注入自定义的 sse writer
        configurer.customCodecs().register(new CustomServerSentEventHttpMessageWriter(new Jackson2JsonEncoder()));
    } 
}

需要先禁用掉默认的 codec,然后注册自定义的编码器。如此一来就可以替换默认的了。

总结

最后总结一下背锅顺序吧!
oneapi 首先背大锅,它在处理响应的时候太严格了,其实只要按data:​处理不就好了,然后截取完以后再 trim 一下。
spring 背中锅,人家都是有空格的,咋就你没有空格呢!!
我背小锅,谁让我不会写 go,只能用 java 写呢,用 go 还有这问题吗!