リアルタイム通信を彩るサーバ送信イベント(SSE)の魅力と可能性

2024年4月15日掲載

governance

ChatGPT、Gemini、Copilot、Claudeなどの、高度な言語モデルを活用した対話型AIアシスタントが大きな注目を集めています。これらのアプリケーションのUIで特徴的なのは、ユーザの質問に対してAIモデルが徐々に回答をストリーミングで生成していく点ではないでしょうか。

このようなUIを実現するには、従来のWebアプリケーションの動作とは異なるアプローチが必要です。一般的な1HTTPリクエスト、1レスポンスのやりとりだけでは実現できません。

そこで登場するのがServer Sent Events(SSE)です。SSEを利用すると、サーバ側から継続的なデータをクライアントへ送信することが可能になります。

 

本記事では、人気のあるAIサービスと同様の体験を提供する、下記のようなWebアプリケーションの実装方法をご紹介します。

それでは、SSEがどのようにウェブ開発の新たな可能性を切り拓いているのかを理解し、その魅力に迫ってみましょう。

目次

SSEとは

SSEは、ウェブアプリケーションにおいて、サーバからクライアントへのリアルタイムなデータ伝送を可能にする手法です。一度接続を確立すると、サーバは同一コネクション内で、任意のタイミングでデータをクライアントに送信できます。クライアント側では、ブラウザのページの再読み込みなしに情報を受け取れるようになります。一方向な連続的な通信が必要な場面で優れた性能を発揮します。

チャットライクなアプリを作る場合は、SSEとWebSocketどちらを選択すべきでしょうか?これは要件によります。クライアントからも随時メッセージを送ったり、またはチャットグループへ同報通信させるような場合はWebSocketがよいでしょう。

今回は、クライアントからの1つのリクエストに対して、サーバ側がその応答として継続的にデータを送信するだけなので、SSEを選定しました。

構成

デモで見ていただいたアプリケーションの構成について簡単にまとめます。
  • クライアント
    - Webのブラウザのフォームを通じて質問をし、回答を受け取ります
    - JavaScriptのフレームワークとしてStimulusを使います
  • サーバ
    - 受け取った質問をサーバ側で受け取り、外部のAIモデル(今回はAWSのBedRockのClaudeを利用)のAPIを通じで回答を得ます
    - サーバサイドのフレームワークはRuby on Railsを使います

 

通信フローをもう一度確認します。

  1. クライアントがSSEを使うHTTPリクエストを送信
  2. サーバとの間でSSEコネクションが作られ、維持
  3. AIモデルが質問を処理し中間出力を生成
  4. 中間出力が生成されるたびに、サーバがSSEを使ってその出力をクライアントにストリーミング
  5. クライアント側で結果を画面に出力
  6. すべての回答が生成されたら、サーバがSSEコネクションクローズ

プログラム

ユーザの動作の起点から、プログラムのシーケンスに沿ってコードを見ていきましょう。ポイントを絞って解説していきます。

まずは、質問フォームのview部分です。
フォームのボタンを押すと、clickイベントが発動し、Javascriptで実装するgenerateメソッドを呼ぶことだけが分かればよいです(sse#generateのsseは、stimulusの任意につけられるコントローラ名です)。

app/views/chats/_form.html.erb

<div class="form">
  <label for="message">メッセージを入力</label>
  <textarea id="message" name="message" data-sse-target="question" rows="1" placeholder="Message" class="scrollbar"></textarea>

  <button type="submit" aria-label="送信" data-sse-target="button" data-action="click->sse#generate">

-snip-

  </button>
  <div class="text-small">Enter で改行 / Ctrl + Enter または Command + Enter で送信</div>
</div>

 

続いて、Javascriptのコントローラで定義したイベントが反応します。ある程度商用でも通用するように、フロントエンド側での処理を入れていますが、本質的な箇所のコードは多くありません。
generateメソッド内でサーバへ通信させていることが分かればよいです。

app/javascript/controllers/sse_controller.rb

import {
  Controller
} from "@hotwired/stimulus"

export default class extends Controller {
  connect() {
  }
  static targets = ["question", "button"]
  static values = {
    room: String,
    chat: String
  }

  async generate() {
    const content = this.questionTarget.value
    let message_id = null

    this.#disableSubmitButton()
    this.#writeQuestionMessage()

    const response = await fetch(`/rooms/${this.roomValue}/chats/${this.chatValue}/messages`, {
      method: "POST",
      cache: "no-cache",
      keepalive: true,
      headers: {
        "Content-Type": "application/json",
        "Accept": "text/event-stream",
      },
      body: JSON.stringify({
        message: {
          content: content
        }
      }),
    })

    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
    this.#writeAnswerMessage(reader)
  }

-snip-

  async #writeAnswerMessage(reader) {
    const templateMessage = document.getElementById('template-message-you');
    let documentFragment = templateMessage.content.cloneNode(true);
    document.getElementById('messages').appendChild(documentFragment);

    this.#disableActionLinks()

    let lastMessageText = document.querySelector('.messages .message:last-child .text');

    while (true) {
      let {
        value,
        done
      } = await reader.read();
      if (!done) {
        lastMessageText.innerText = lastMessageText.innerText + value
        this.#scrollToBottom()
      } else {
        break
      }
    }

-snip-

実は本コードでは使っていないのですが、サーバ送信イベントAPIとしてEventSourceというWEB用のインターフェイスが公開されています。どのブラウザでも利用でき、コードも簡易になります。
利用をためらった理由は、メソッドがGETに限定されていたためです。POSTできない。あとは、接続開始、終了時の動作にカスタマイズ性を持たせたかったというのもあります。

そのためfetch APIを使い、ヘッダをSSE用に直接書き換えています。

"Accept": "text/event-stream"

Acceptは、HTTPリクエストヘッダの一部であり、クライアントがサーバに対して送信する要求の一部です。サーバに対してクライアントがイベントストリーム形式のデータを受け付けることを示します。

response.bodyはFetch APIを通して得られたレスポンスのボディを参照しています。レスポンスボディはbinary (バイナリ)データのストリームとして扱われます。

 

他の処理も、見慣れなさそうなところだけを少し補足します。
pipeThrough(new TextDecoderStream())は、そのストリームをデコードするためのパイプを設定します。

TextDecoderStreamは受信したバイナリデータをテキストに変換します。

getReader()は、そのデコードされたストリームを読み取るためのリーダーを取得します。このリーダーからストリーム内のデータを読み取ることができます。

 

※HTTPヘッダを書き換えるような処理をしたため、Javascriptのコードが少し多くなりましたが、Railsであれば、Turboが、そしてTurbo FramesがJavascriptを意識せずフロント画面をコントロールする仕組みを提供しています。一般的なアプリケーションであれば控えめなJavascriptで完結します。

 

続いてはサーバ側のコントローラです。

app/controllers/messages_controller.rb

class MessagesController < ApplicationController
  include ActionController::Live

  def create  
    -snip-

    query = (message_params)

    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Last-Modified'] = Time.now.httpdate

    Ai.new.chat(query) do |fragment|
      response.stream.write(fragment)
    end

    -snip-

  rescue ActionController::Live::ClientDisconnected
    response.stream.close
  ensure
    response.stream.close
  end

ここに注目してください。
response.headers['Content-Type'] = 'text/event-stream'
HTTPレスポンスヘッダの一部であるContent-Typeフィールドに、text/event-streamという値を設定しています。このヘッダは、クライアントに対してレスポンスの本文がどのような形式であるかを伝えます。

サーバ側でメッセージを保存管理する場合は、もう少し追加のコードが必要ですが(一般のサーバ構築のお作法です)、SSE部分はこれだけです。

 

最後はClaudeのAPIを扱うクラスです。
返却されるjsonの処理が理解しにくいかもしれませんが、返却値を公式ドキュメントに沿って加工しているだけです。

app/models/concerns/ai.rb

class Ai
  def initialize
    access_key_id = Rails.application.credentials.dig(:aws, :access_key_id)
    secret_access_key = Rails.application.credentials.dig(:aws, :secret_access_key)
    credentials = Aws::Credentials.new(access_key_id, secret_access_key)
    @client = Aws::BedrockRuntime::Client.new(region: "us-east-1", credentials: credentials)
  end

  def chat(query)
    body = {
      "max_tokens": 1000,
      "anthropic_version": "bedrock-2023-05-31",
      "messages": [
        {
          "role": "user",
          "content": [
            {
              "type": "text",
              "text": query
            }
          ]
        }
      ]
    }

    params = {
     "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
     "content_type": "application/json",
     "accept": "application/json",
      body: body.to_json
    }


    @client.invoke_model_with_response_stream(params) do |stream|
      stream.on_error_event do |event|
        raise event 
      end
      stream.on_event do |event|
        hash_event = JSON.parse(event.bytes)
        case hash_event['type']
        when 'content_block_delta'
          fragment = hash_event['delta']['text']
          yield(fragment)
        when 'message_stop'
          return
        end
      end
    end
  end
end

まとめ

返答されるレスポンスを徐々に組み立てていくアプリケーションのUIを、実際にコードを書いて再現してみました。

本格的な商用サービスにするためには考慮する事項がまだまだたくさんありますが、UIの動きを真似てみる、だけであれば簡単に実現できることが分かりました。

リアルタイムに徐々に更新をかけていく複雑な処理ですが、HTTPの進化と、アプリケーション開発を支えるフレームワークのおかげで開発の敷居は決して高くないと思います。ぜひぜひ挑戦してみてください。

おすすめの記事

条件に該当するページがございません