ChatGPT 打字机效果实现

ChatGPT 刚出来时,各类的 GPT 客户端层出不穷,交互秒杀官方网页版,我本人也几乎不用官方页面。直到某一天,所有的第三方客户端都无法显示 GPT 的回复了,网上一查,说是 OpenAI 升级了页面,数据管道从原来的 EventSource 升级成了 WebSocket,这才造成几乎所有的第三方客户端全部阵亡的情况。

虽然大家很快跟进了修改,但是作为开发者,我们还是有必要了解下,为什么 OpenAI 要进行升级,WebSocket 有什么优势吗?

ChatGPT 的网页应用就是一个标准的聊天应用,信息交互比较频繁。但是,这个频繁只是相对的,首先我们与 ChatGPT 主要数据是文本,其次 ChatGPT 只会在我们发出消息后才会回复消息,不会主动向我们发送消息,所以官方一开始就选用了单向数据推送、传输数据为文本的 EventSource 作为消息管道。

但随着 OpenAI 的政策变化,网页版 ChatGPT 功能越来越强,现在已经支持文件上传,各种数据类型已经不是 EventSource 能承载得了了。可能你会想,文件上传就使用 HTTP 协议实现一个新的接口不就好了?增加一个功能就要增加接口,随着页面功能越来越多,ChatGPT 的 HTTP 连接会非常多,连接建立也是会消耗时间造成延迟,在聊天类应用上不合适;而且浏览器对于同一个网址的 HTTP 连接是有数量限制的,Chrome 的限制是 6 个,也就是同一个网址,只能同时存在 6 个 HTTP 连接,后续一律等待,直到前面的连接有断开。这对基于 HTTP 的 EventSource 简直就是灾难。基于数据类型、网络性能、浏览器限制等种种考虑,OpenAI 进行了升级,将 EventSource 替换成了 WebSocket。

虽然 ChatGPT 已经使用了 WebSocket 了,但是这不妨碍我们学习它的打字机效果的实现,下面将通过三种方式实现。

1. Stream 流式传输

后端开发者对这个应该不陌生,很常见的场景就是文件下载,有时候文件体积很大,全部读取势必会爆内存,这时候的做法就是将一个文件切分成若干块,然后一块一块地发送数据,这样就避免了爆内存的风险。

针对实现了流式传输的接口,我们获取到的数据也是一块一块的,这就跟打字机的效果很类似,都不是一次性显示全部数据,所以通过流式传输,我们也能实现打字机的效果。

nuxt/server:

export default defineEventHandler((event) => {
  setResponseHeader(event, 'Content-Type', 'text/html')
  setResponseHeader(event, 'Cache-Control', 'no-cache')
  setResponseHeader(event, 'Transfer-Encoding', 'chunked')

  let consume_interval: NodeJS.Timeout
  let produce_interval: NodeJS.Timeout
  let message: string[] = []
  const stream = new ReadableStream({
    start(controller) {
      try {
        consume_interval = setInterval(() => {
          const char = message.shift()?.trim()
          if (char) {
            controller.enqueue(char)
          }
        }, 200)
        produce_interval = setInterval(() => {
          // 生成随机字符串
          message.push(...Math.random().toString(36).substring(2, 2 + Math.round(Math.random() * 10)).split(''))
        }, 3000)
      } catch (err: any) {
        controller.enqueue(err.message)
        controller.close()
      }
    },
    cancel() {
      clearInterval(consume_interval)
      clearInterval(produce_interval)
    },
  })

  return sendStream(event, stream)
})

nuxt/client: 

const body: ReadableStream = await $fetch('/api/stream', {
  responseType: 'stream',
})
stream_reader.value = body.getReader()
const decoder = new TextDecoder('utf-8')
const read: () => Promise<any> = async () => {
  const { done, value } = await stream_reader.value!.read()
  if (done) {
    return stream_reader.value!.releaseLock()
  }

  const chunk = decoder.decode(value, { stream: true })
  olchnk)

  return read()
}
await read()

流式传输看似完美符合要求,但在聊天应用中存在一个缺陷,那就是“不够及时”。因为是分块传输,网络设备有可能在接收到一个分块后不会立即转发,而是会等待后续的分块,当分块数量达到一定要求后,会将保存的分块一次性发送出去,这就会造成虽然后端send消息了,但是前端并没有接收到相关数据。

2. EventSource

为了应对上述问题,HTTP 协议专门增加了一种 Content-Type:'text/event-stream'内容类型以告知相关应用“这个数据chunk不需要缓存,请立即发送”。

在 EventSource 中,传输的分块叫消息,同时也增加了字段:id、event、data,用于对消息进行区分。下面是通过 EventSource 实现的打字机效果。

nuxt/server:

export default defineEventHandler((event) => {
  setResponseHeader(event, 'Content-Type', 'text/event-stream')
  setResponseHeader(event, 'Cache-Control', 'no-cache')

  let consume_interval: NodeJS.Timeout
  let produce_interval: NodeJS.Timeout
  let message: string[] = []
  const stream = new ReadableStream({
    start(controller) {
      try {
        consume_interval = setInterval(() => {
          const char = message.shift()?.trim()
          if (char) {
            controller.enqueue(`event: char\n`)
            controller.enqueue(`data: ${char}\n\n`)
          }
        }, 200)
        produce_interval = setInterval(() => {
          // 生成随机字符串
          message.push(...Math.random().toString(36).substring(2, 2 + Math.round(Math.random() * 10)).split(''))
        }, 3000)
      } catch (err: any) {
        controller.enqueue(err.message)
        controller.close()
      }
    },
    cancel() {
      clearInterval(consume_interval)
      clearInterval(produce_interval)
    },
  })

  return sendStream(event, stream)
})

nuxt/client:

const eventSource = new EventSource('/api/sse')
eventSource.addEventListener('char', (event) => {
  sse_result.value += event.data
})

通过上面的后端代码我们可以发现,整个流程跟流式传输非常的相似,区别就是content-type与chunk格式了,这说明 EventSource 就是基于流式传输的一个特殊的数据类型。

3. WebSocket

上面基于 HTTP 的实现方式虽然好,但是会占用浏览器对 HTTP 连接的限额、传输内容都是文本数据且只能是单向数据传输。于是支持双向会话的全双工协议 WebSocket 诞生了,也是 ChatGPT 网页目前采用的方案。

虽然 WebSocket 有上述优点,但不代表升级到 WebSocket 是没有代价的。虽然它是基于 HTTP 协议进行升级的,但是服务却是无法共用的,你需要开发相对应的服务代码;网络设备也需要做相应的调整以支持 WebSocket;很多时候我们根本不需要它的全双工双向通信能力,比如 ChatGPT 场景,OpenAI 更多的是基于未来的考虑才升级的。

当然,这些并不妨碍我们使用 WebSocket 实现打字机效果。

nuxt/server:

export default defineWebSocketHandler({
  open(peer) {
    console.log("[ws] open", peer);

    const message: string[] = []
    // @ts-ignore
    peer.consume_interval = setInterval(() => {
      const char = message.shift()?.trim()
      if (char) {
        peer.send(char)
      }
    }, 200)
    // @ts-ignore
    peer.produce_interval = setInterval(() => {
      // 生成随机字符串
      message.push(...Math.random().toString(36).substring(2, 2 + Math.round(Math.random() * 10)).split(''))
    }, 3000)
  },

  message(peer, message) {
    console.log("[ws] message", peer, message);
    if (message.text().includes("ping")) {
      peer.send("pong");
    }
  },

  close(peer, event) {
    console.log("[ws] close", peer, event);
    // @ts-ignore
    clearInterval(peer.consume_interval)
    // @ts-ignore
    clearInterval(peer.produce_interval)
  },

  error(peer, error) {
    console.log("[ws] error", peer, error);
    // @ts-ignore
    clearInterval(peer.consume_interval)
    // @ts-ignore
    clearInterval(peer.produce_interval)
  },
})

nuxt/client:

sse_result.value = ''
const ws = new WebSocket('/api/ws')
ws.addEventListener('message', (event) => {
  ws_result.value += event.data
})

4. 总结

上面就是目前实现打字机效果的优选方案,很多时候需要根据场景选择不同的方案,不能无脑 WebSocket,一些简单应用场景可能流式传输更合适,比如日志打印什么的。

所有相关代码从 这里 获取。