本番環境でAmazon SQSを利用したマイクロサービス構築のプラクティス

2023年12月25日掲載

キービジュアル

ソフトバンクアドベントカレンダー25日目の記事を担当する森です。

普段はTASUKIというAIの学習データ作成支援事業の、AIのアノテーションツール(以下、TASUKIツール)の開発をメインに行っています。

TASUKIツールは、YOLOの様な画像のアノテーション作業をWeb上で行うことができるツールです。その性質上ツールには、日々大量の画像がアップロードされ、アノテーション作業が行われています。

それに伴いTASUKIツール全体の処理の高速化を行う必要があり、最近はAmazon SQSを利用したアプリケーションのマイクロサービス化と、並列化の大規模移行を行ってきました。

この記事は、その移行作業で得たマイクロサービス構築のナレッジのうち、Amazon SQS周りの汎用的なプラクティスについての共有記事となります。

目次

この記事では
  • Amazon SQS上で、初めてマイクロサービスを構築する方向けの記事となります
  • Amazon SQSを利用して商用サービスを構築する際に、アプリの実装で気をつけるべきことを知れます

Amazon Simple Queue Service(SQS)とは

Amazon Simple Queue Service(Amazon SQS)はAWSが提供する完全マネージドなメッセージキューイングサービスです。SQSのようなキューイングサービスを使用することで、アプリケーション間の接続を疎結合にできるため、マイクロサービスを構築する際に効果的であると言われています。

詳細は以下のドキュメントがわかりやすいのでご確認ください。本記事では、SQSに関する詳細な説明は行いません。

詳細: Amazon Simple Queue Service とは?  - Amazon Simple Queue Service

用語整理

本記事で頻繁に利用する用語を説明します。

用語概要

キュー

Amazon SQSでメッセージを格納する領域のこと

メッセージ

キューを介してやり取りするデータ

パブリッシュ

キューにメッセージを追加する処理

プロデューサー

マイクロサービスのうちキューにパブリッシュするアプリケーション

ポーリング

キューからメッセージを取得する処理

コンシューマー

マイクロサービスのうち、キューからポーリングするアプリケーション

キューの準備

Amazon SQSの実験環境として、Amazon SQS互換のOSSであるElastic MQを利用します。AWSのアカウントの準備など必要なく、ローカルで完結するため非常に便利です。

上記のdockerを利用するだけで、簡単にキューサーバを構築することができます。

まず設定ファイルの「custom.conf」を作成します。各種設定値は後ほど説明します。

 

elasticmq/custome.conf

queues {
    sample-queue {
    defaultVisibilityTimeout = 3600 seconds
    receiveMessageWait = 5 seconds
    fifo = false
    deadLettersQueue {
      name = "sample-queue-dead-letters"
      maxReceiveCount = 5
    }
    tags {}
  }
  sample-queue-dead-letters {
    defaultVisibilityTimeout = 3600 seconds
    receiveMessageWait = 5 seconds
    fifo = false
    tags {}
  }
}

その後「compose.yaml」を作成します。

compose.yaml
version: "3.9"

services:
  sqs-mock:
    image: softwaremill/elasticmq
    container_name: sqs_mock
    ports:
      - "9324:9324"
      - "9325:9325" # 0.0.0.0:9325 で管理画面にアクセス可能
    volumes:
      - ./elasticmq/custom.conf:/opt/elasticmq.conf:ro

後は以下のコマンドを実行するだけで、ローカルにAmazon SQSと同じ環境を構築することができます。


docker compose up -d

起動後は、以下のエンドポイントでキューを利用できます。

  • http://0.0.0.0:9324

管理画面も提供されています。以下のURLをブラウザで開くことで、キューの状態を閲覧できます。

  • http://0.0.0.0:9325

繰り返しにはなりますが、Elastic MQはAmazon SQS互換のOSSです。ElasticMQの各種項目と、それに対応するAmazon SQSの項目は以下の通りです。

Elastic MQの項目概要Amazon SQSの項目名
Approximate number of messagesコンシューマーがポーリング可能なメッセージ数利用可能なメッセージ
Approximate number of delayed messagesキューにあるが、ポーリング可能な状態になるまで待機中のメッセージ数(関連: DelaySecond)処理中のメッセージ (他のコンシューマーは利用できません)
Approximate number of not visible Messages既にポーリングされ、他のコンシューマーがポーリングできない状態のキュー遅延したメッセージ

AWS のAmazon SQSのコンソール上では「さらに表示」をクリックして表示される領域に、上記の項目が記載されています。

※「sample-queue-dead-letters」はデッドレターキューです。失敗したメッセージが全てこちらに送られます。

キューの設定値

キューの各種設定とプラクティスについて説明します。今回は特筆すべきものについてのみ解説します。

AWSの設定値

ElasticMQの設定値

概要

プラクティス

タイプ

fifo(=trueとしたときにFIFOキューとなる)

パフォーマンス優先のスタンダードキュー or 順序優先のFIFOキューを選択する

パフォーマンスかデータの厳密性か、どちらを優先するかによって選択する

可視性タイムアウト

defaultVisibilityTimeout

コンシューマーがキューのメッセージを取得したのち、他のコンシューマーがそのキューを取得できない秒数

コンシューマーの処理時間の最大以下の秒数を設定

デッドレターキュー

deadLetterQueue

コンシューマーが指定回数以上ポーリングしたメッセージが送られるキュー。

処理に失敗したメッセージの置き場。

必ず設定する

タイプ

タイプでは、Amazon SQSのキューの種類を以下の2つから選択します。

キューの種類

概要

スタンダードキュー

スループット無制限,順序保証なし

FIFOキュー

スループット30,000トランザクション/秒,順序と重複保証

性能に関しては、スタンダードキューが優位です。スタンダードキューはスループットが無制限なのに対し、FIFOキューは1秒間に3000トランザクションと決まっています。

その一方でFIFOキューには、データの以下のメリットがあります。

  • 重複排除
  • 順序保証
  • 確実に1回保証

FIFOキューは、MQTTなどの「QoS(2)=Exactly once」と似た特徴を持ちます。主に以下の要件を必要とするケースでは、FIFOを指定するのが推奨されています。

  • 厳密に順序を保証したい
  • データを確実に1回のみ送信したい
  • 高パフォーマンスが求められない

一方でFIFOは、大規模処理には向いていません。上述しましたが、秒間トランザクション数が30,000までと決まっているため、FIFOキューで大規模処理を行う際は注意が必要です。

今回は性能を重視するとして、スタンダードキューを選択しています。

可視性タイムアウト

可視性タイムアウト(DefaultVisibilityTimeout)は、コンシューマーがキューのメッセージを取得してから、他のコンシューマーがそのメッセージを取得できない期間です。

例えば可視性タイムアウトの設定を「DefaultVisibilityTimeout=0」としたキューでは、コンシューマーが2つ存在する時、その2つのコンシューマがどちらも同じメッセージを取得してしまいます。

そのため可視性タイムアウトの秒数は、コンシューマの処理時間より少し長い時間に設定する必要があります。

注意する点として、設定時間を長くしすぎないことです。DefaultVisiblityTimeoutが長すぎる場合、コンシューマが処理に失敗した時、そのメッセージがリトライされるまでの時間が延びてしまいます。

逆に短すぎると、並列化した際に処理が重複する可能性があります。

コンシューマーの処理時間を理解し、ちょうど良い時間を設定することが大事です。

参考: Amazon SQSメッセージの操作 - Amazon Simple Queue Service

記事執筆時点の可視性タイムアウトの最大は12時間(43,200秒)です。これを超える処理はなかなかないとは思いますが、上限を超える場合は以下のどちらかの対策の必要があります。

  • 処理の最中に可視性タイムアウトを延長する
  • SQSを利用しない

デッドレターキュー

デッドレターキューは、処理に失敗したメッセージを保存するキューになります。処理に失敗とは、メッセージはポーリングされたのに、キューから削除されていない状態です。

メッセージがデッドレターキューに移動する主な条件は、以下の2つです。

  • 「最大受信回数」以上同じメッセージをポーリングしている
  • 「メッセージ保持期間」の時間以上メッセージがキューに残っている

デッドレターキューは、メッセージの失敗を検知しやすくするだけでなく、失敗するメッセージの無駄なリトライを防いでくれます。

デッドレターキューを設定しない場合、失敗するメッセージがキューに残り続けてしまいます。

しかし、デッドレターキューを設定することで、「最大受信数」を超えたメッセージはキューから取り除かれます。(下記の図参照)

それにより、コンシューマーの無駄なリトライ処理を防いぎ、処理の遅延やAPIの呼び出し回数増加によるコスト増を防げます。

Amazon SQSのプラクティスのまとめ
  • キュータイプは要件に合わせて選ぶ
  • スループット優先の場合のスタンダードキュー、信頼性と順序保障を優先するのであればFIFOキューを選択
  • 可視性タイムアウト(DefaultVisibilityTimeout)は処理の特性に合わせて慎重に検討する
  • デッドレターキュー必ず設定して、コストと無駄なリトライ処理を削減

環境準備

実装するアプリケーションは以下の環境での動作を確認しています。

  • Python: 3.10.9

※TASUKIツールはPythonではなくC#で実装されています。ユーザ数が多く、ソースコードをシンプルに実装できる為、今回はPythonを利用しました。

必要ライブラリのインストール

Amazon SQSを利用するためにboto3のライブラリをインストールします。


pip install boto3==1.34.2

boto3で利用可能なSQSの各種APIの詳細は、以下の公式ドキュメントを参照してください。

参考: SQS - Boto3 1.34.2 documentation

プロデューサーの実装

キューにメッセージを送信するプロデューサーを実装します。

producer.py

import boto3
import json

ELASTIC_MQ_HOST = "http://0.0.0.0:9324"  # ElasticMQのホスト名
SAMPLE_QUEUE_NAME = "sample-queue"  # 送信先のキュー名、custom.confで定義したものを入力する
ACCOUNT_ID = "000000000000"  # ElasticMQの場合はこの値で固定
MESSAGE_SEND_COUNT = 15  # 送信するメッセージの数

# SQSクライアントの作成
session = boto3.session.Session()
sqs = session.client(
    "sqs",
    endpoint_url=ELASTIC_MQ_HOST,  # SQSのエンドポイントを指定
    region_name="us-east-1",  # ElasticMQはリージョン,資格情報ををチェックしないが、boto3に必要なので適当に指定
    aws_access_key_id="x",
    aws_secret_access_key="x",
    verify=False,
)

# Queue URLの作成
queue_url = f"{ELASTIC_MQ_HOST}/{ACCOUNT_ID}/{SAMPLE_QUEUE_NAME}"
print(f"Queue URL: {queue_url}")

# メッセージの作成
entities = []
for i in range(MESSAGE_SEND_COUNT):
    entities.append(
        {
            "Id": str(i),
            "MessageBody": "Message {}".format(i),
            "DelaySeconds": 0,  # 0秒後(=待機時間なし)でキューに入れる
        }
    )

# メッセージの送信
# 10件ずつ送信する
for i in range(0, len(entities), 10):
    response = sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=entities[i : i + 10],
    )
    print(f"Message sent result: {json.dumps(response, indent=2)}")

このプロデューサーはメッセージを15件、キューに送信する処理を行います。実行は以下のコマンドで行います。

$ python producer.py
Queue URL: http://0.0.0.0:9324/000000000000/sample-queue
# 最初の10件の送信
Message sent result: {
  "Successful": [
    {
      "Id": "0",
      "MessageId": "bac62486-cd75-400c-aa61-c0a667efe29b",
      "MD5OfMessageBody": "7818984006dd3cceda6a5501c43966ff"
    },
    {
      "Id": "1",
      "MessageId": "7b6ae185-b303-4d8b-b34a-569ba407de3f",
      "MD5OfMessageBody": "68390233272823b7adf13a1db79b2cd7"
    },
    {
      "Id": "2",
      "MessageId": "d9ea0a31-f1d4-4ae6-b776-27ac9344bda4",
      "MD5OfMessageBody": "88ef8f31ed540f1c4c03d5fdb06a7935"
    },
     ~~~省略~~~~
    {
      "Id": "9",
      "MessageId": "1e04bcd6-e5f7-4ca4-a459-7c3f61d2ff58",
      "MD5OfMessageBody": "61285229f8b8d5963f182e986b71b413"
    }
  ],
  "ResponseMetadata": {
    "RequestId": "\n              00000000-0000-0000-0000-000000000000\n            ",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "server": "akka-http/10.2.10",
      "date": "Wed, 20 Dec 2023 14:20:42 GMT",
      "content-type": "text/xml; charset=UTF-8",
      "content-length": "2956"
    },
    "RetryAttempts": 0
  }
}
# 次の5件の送信
Message sent result: {
  "Successful": [
    {
      "Id": "10",
      "MessageId": "72dfcc3d-2364-44a7-8b60-8aca67c6a551",
      "MD5OfMessageBody": "bb80cf0fc6dc8b0d2d3257fb2ce82aea"
    },
    ~~~省略~~~~
    {
      "Id": "14",
      "MessageId": "ac03167b-ed6f-42d0-8dba-4c2460d5dd57",
      "MD5OfMessageBody": "2ce065deeefcb3f5920c2bb0600bdd64"
    }
  ],
  "ResponseMetadata": {
    "RequestId": "\n              00000000-0000-0000-0000-000000000000\n            ",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "server": "akka-http/10.2.10",
      "date": "Wed, 20 Dec 2023 14:20:42 GMT",
      "content-type": "text/xml; charset=UTF-8",
      "content-length": "1661"
    },
    "RetryAttempts": 0
  }
}

正常にメッセージをパブリッシュできました。「http://0.0.0.0:9325/」にアクセスして確認すると、「sample-queue」の「Approximate number of messages」の項目に15件のデータが挿入されていることが確認できます。

プロデューサー実装の際のプラクティスは、ソースコード中のコメント①②の2つです。

  • ①DelaySecondの設定
  • ②メッセージのバッチ送信

①DelaySecondの設定の設定

DelaySecondは、プロデューサーがメッセージをパブリッシュした後、そのメッセージをコンシューマーがポーリングできる様になるまでの待機期間です。

今回はこの値に0を設定しています。0を設定することで、キューにパブリッシュされたデータに対して、コンシューマは即座にポーリング可能になります。それにより処理の高速化が期待できます。

しかし、高速化のため常に0を設定すれば良いわけではありません。メッセージの処理が別のコンポーネントに依存している場合、DelaySecondの調整の必要があります。

その例として、ファイルシステムへのI/Oがあります。ファイルシステムへの書き込み処理は、ファイルシステムの状況に応じて想定外の遅延が発生する時があります。

それにより例えば以下のケースが想定されます。

  1. プロデューサーがファイルシステムに書き込み
  2. プロデューサーが「DelaySecond=0」でキューにメッセージをパブリッシュ
  3. コンシューマーがメッセージを取得
  4. コンシューマーが1のファイルを参照して処理しようとしたが、ファイルの書き込みが完了しない為、File Not Foundが発生。

そういった時は、メトリクスからファイルシステムの書き込みの最大時間を確認し、ポーリングの待機に現実的な時間をDelaySecondで指定します。それにより想定外のエラーの発生を抑制できます。

②メッセージのバッチ送信

SQSにメッセージをパブリッシュする場合、1件ずつパブリッシュする方法と、複数件まとめてパブリッシュする方法の2つがあります。コストの観点から、1件ずつの処理より、複数件まとめて処理をすることが推奨されています。

参考: Amazon SQSコストを削減する - Amazon Simple Queue Service

SQSの課金体系はAPIのリクエスト数です。例えば10件のデータを1件ずつ送信するのと、10件まとめて送信するのでは、後者の方がAPIのリクエスト数を1/10に減らせます。そのため特別な要件がない場合は、基本的に複数件まとめて処理するAPIを利用しましょう。

記事執筆時点での送信の最大は10件までです。

参考: メッセージに関連するクォータ - Amazon Simple Queue Service

プロデューサーのプラクティスのまとめ
  • DelaySecondは処理の特性に合わせて適切な時間を設定する
  • メッセージは10件ずつ送信し、料金を節約する

コンシューマーの実装

キューからメッセージをポーリングするコンシューマーの実装です。

consumer.py
import random
import time
import boto3
import json

ELASTIC_MQ_HOST = "http://0.0.0.0:9324"  # ElasticMQのホスト名
SAMPLE_QUEUE_NAME = "sample-queue"  # 送信先のキュー名、custom.confで定義したものを入力する
ACCOUNT_ID = "000000000000"  # ElasticMQの場合はこの値で固定
MESSAGE_RECEIVE_COUNT = 10  # 受信するメッセージの数

# SQSクライアントの作成
session = boto3.session.Session()
sqs = session.client(
    "sqs",
    endpoint_url=ELASTIC_MQ_HOST,  # SQSのエンドポイントを指定
    region_name="us-east-1",  # ElasticMQはリージョン,資格情報ををチェックしないが、boto3に必要なので適当に指定
    aws_access_key_id="x",
    aws_secret_access_key="x",
    verify=False,
)

# Queue URLの作成
queue_url = f"{ELASTIC_MQ_HOST}/{ACCOUNT_ID}/{SAMPLE_QUEUE_NAME}"
print(f"Queue URL: {queue_url}")

# コンシューマー処理
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=MESSAGE_RECEIVE_COUNT,  # ①バッチ処理: 10件ずつメッセージを取得
    WaitTimeSeconds=20,  # ②ロングポーリング: 20秒間メッセージを取得できなかったら、空のレスポンスを返す
)

if "Messages" in response:
    remaining_messages = response["Messages"].copy()
    print(f"受信したメッセージの数: {len(remaining_messages)}")
    for message in response["Messages"]:
        try:
            time.sleep(1)
            # ランダムでエラーを発生させる
            if random.randint(0, 9) == 0:
                raise Exception("エラーが発生しました。")

            print(f"受信したメッセージの結果: {json.dumps(message, indent=2)}")

            # ③メッセージを処理した後、再処理しないようにキューから削除
            receipt_handle = message["ReceiptHandle"]
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)

            # messagesから削除
            remaining_messages.remove(message)
        except Exception:
            print("エラーが発生しました。メッセージをキューに返却します。")
            # ④エラーが発生した場合にメッセージをキューに返却
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
                VisibilityTimeout=0,  # 0秒後に再処理する
            )
            # messagesから削除
            remaining_messages.remove(message)
        except KeyboardInterrupt:
            print(
                f"キーボード割り込みが発生しました。 処理中のメッセージをキューに返却します: {[message['MessageId'] for message in remaining_messages]}"
            )

            # ⑤強制終了(KeybordeInterrupt)が発生した場合に未処理のメッセージをキューに返却
            entities = [
                {
                    "Id": message["MessageId"],
                    "ReceiptHandle": message["ReceiptHandle"],
                    "VisibilityTimeout": 0,  # 0秒後に再処理することで即座に他のコンシューマーが処理できるようにする
                }
                for message in remaining_messages
            ]
            response = sqs.change_message_visibility_batch(
                QueueUrl=queue_url, Entries=entities
            )
            print(f"{len(entities)}件のメッセージをキューに返却しました。")
            exit(1)

 

consumer.pyを実行し、キューからメッセージを取得します。

$ python consumer.py
Queue URL: http://0.0.0.0:9324/000000000000/sample-queue
受信したメッセージの数: 10
~~~受信したメッセージを表示する処理 ~~~
受信したメッセージの結果: {
  "MessageId": "b0f45517-d594-41de-8733-6325d072d052",
  "ReceiptHandle": "b0f45517-d594-41de-8733-6325d072d052#2cc4d02a-f3a6-4b8d-a736-529539497592",
  "MD5OfBody": "7818984006dd3cceda6a5501c43966ff",
  "Body": "Message 0"
}
~~~~省略~~~~~
受信したメッセージの結果: {
  "MessageId": "e0380aee-db29-42ba-96c3-ff6e95534fbc",
  "ReceiptHandle": "e0380aee-db29-42ba-96c3-ff6e95534fbc#dcaece33-d9c9-407f-abf8-aa7cc3a80ae3",
  "MD5OfBody": "335f063c95cf6bcca8694a36fcd51103",
  "Body": "Message 3"
}
エラーが発生しました。メッセージをキューに返却します。
受信したメッセージの結果: {
  "MessageId": "5e9aef66-93f5-4d09-919d-e25696a43efc",
  "ReceiptHandle": "5e9aef66-93f5-4d09-919d-e25696a43efc#11c3deba-5c1f-4490-8830-a0f641ab8369",
  "MD5OfBody": "73d41674b56a4dcf7b700a0a45a0a6b1",
  "Body": "Message 8"
}
受信したメッセージの結果: {
  "MessageId": "20013486-1f2f-4193-a3f2-423ddb0ad5e5",
  "ReceiptHandle": "20013486-1f2f-4193-a3f2-423ddb0ad5e5#c10d2354-21a9-4fa5-88fd-bca174c7abc0",
  "MD5OfBody": "68ee3fbec6195f397d7f696599dd278a",
  "Body": "Message 4"
}
~~~~省略~~~~~
": "Message 5"
}
受信したメッセージの結果: {
  "MessageId": "dcdb56ea-28ff-4bbd-8c7f-eeddd8c9ad88",
  "ReceiptHandle": "dcdb56ea-28ff-4bbd-8c7f-eeddd8c9ad88#7cffc667-c89c-4e05-b272-488b3c58820a",
  "MD5OfBody": "05d2a129ebdb00cfa6e92aaf9f090547",
  "Body": "Message 6"
}

指定した通り10件のメッセージを取得して、コンソールに出力しています。また10%の確率でエラーを発生させており、エラーが発生したメッセージはキューに返却しています。

再度「http://0.0.0.0:9325/」を確認すると、15件のメッセージが6件まで減っていることが確認できます。これは10件取得したものに対して、エラーで返却されたメッセージが1件あるため、15 - 10 + 1 = 6件となっています。

また後述する④のプラクティスにより、エラーが起きても、他のキューがリトライできない状態のメッセージの数(Approximate number of not visible Messages)が0件となっています。

これにより、失敗したメッセージも即座にリトライすることが可能です。

コンシューマ実装の際のプラクティスは、ソースコード中のコメント①②③④⑤の5つです。

  • ①バッチ処理
  • ②ロングポーリング
  • ③メッセージを処理した後、再処理しないようにキューから削除
  • ④エラーが発生した場合にメッセージをキューに返却
  • ⑤強制終了(KeybordeInterrupt)が発生した場合に未処理のメッセージをキューに返却

①バッチ処理

コンシューマーと同じく、ポーリングを10件まとめてリクエストしています。これによりAPIのリクエスト回数を減らせます。

⑤で後述しますが、メッセージの可視性タイムアウトを更新する処理でも「change_message_visibility_batch」でバッチ処理をおこなうことで、APIの呼び出し回数を抑えています。

ポーリング件数の最大値も、記事執筆時点では最大10件です。

②ロングポーリング

ロングポーリングは、コンシューマーがメッセージを取得するとき、メッセージを1件以上取得できるまで、WaitTimeSecondsで指定した時間だけ待つ設定のことです。

例えば「WaitTimeSeconds = 20」とすると、ポーリング処理をリクエストしたときにキューのメッセージが0件でも、20秒間キューにメッセージが送られるのを待つことができます。

ロングポーリングを設定しない場合(ショートポーリング)と比較して、APIの呼び出し回数を圧倒的に減らすことができます。

公式ドキュメントでは、ほとんどの場合でReceiveMessageの待機時間を、設定の上限値20秒に設定することを推奨しています。

参考: Amazon SQSメッセージの操作 - Amazon Simple Queue Service

③メッセージを処理した後、再処理しないようにキューから削除

これはプラクティス、というよりもSQSを初めて利用する時に忘れがちなのであげています。

まずキューのメッセージは、コンシューマーがポーリングした段階ではまだキューに残っています。前述した可視性タイムアウトで指定した期間、他のコンシューマーがポーリングできないだけで、その時間を過ぎたら、再度ポーリングすることができます。

そのためポーリング後、メッセージに紐づく処理が完了した場合は、コンシューマーが責任を持ってメッセージを削除する必要があります。

逆に「削除されない = 処理が失敗している」を意味するため、削除されなかったメッセージはキューに戻され、可視性タイムアウトで指定した期間が過ぎた後、再度リトライ可能な状態になります。

このようにリトライ処理を作り込まなくても、自動でリトライ可能な状態を維持してくれるのはキューの利点でもあります。

④エラーが発生した場合にメッセージを即座キューに返却

③で、エラーが発生時「可視性タイムアウトで指定した期間が過ぎた後」リトライ可能と説明しました。そのため、処理にかかる時間が非常に長く可視性タイムアウトに長い時間を指定する必要がある場合、メッセージのリトライに遅延が発生してしまいます(下記図左)。

それを防ぐために、エラーが発生したらExceptionをキャッチし、「change_message_visibility」で「VisibilityTimeout=0」に設定しています(上記図右)。これにより、キューの「DefaultVisibilityTimeout」の待機時間が0になり、コンシューマが即座にポーリングできる状態となります。

consumer.py

                ~~省略~~~
                # ランダムでエラーを発生させる
                if random.randint(0, 9) == 0:
                    raise Exception("エラーが発生しました。")
                ~~省略~~~
            except Exception:
                print("エラーが発生しました。メッセージをキューに返却します")
                # ④エラーが発生した場合にメッセージをキューに返却
                sqs.change_message_visibility(
                    QueueUrl=queue_url,
                    ReceiptHandle=message["ReceiptHandle"],
                    VisibilityTimeout=0,  # 0秒後に再処理する
                )
                # messagesから削除
                remaining_messages.remove(message)

この設定を入れることで、処理時間が長いタスクをSQSで実装し、処理が途中で失敗した場合も、即座にリトライすることができます。そして、エラーの影響反映を最小に抑えることができます。

⑤強制終了(keyboardinterrupt)が発生した場合に未処理のメッセージをキューに返却

④と似ていますが、④は「コンシューマーの処理でエラーが発生したケース」に対して、こちらは「コンシューマーのアプリケーションそのものが終了したケース」です。

想定外のエラーやCPU使用率の上昇で、コンシューマアプリケーションが強制終了してしまうことはよくあります。またKubernetesやECSのような、コンシューマがスケールする環境で稼働している場合、オートスケール機能により、処理中のコンシューマーが縮退されてしまうこともあるでしょう。そういったコンシューマそのものの終了時に、コンシューマーがポーリング済みのメッセージを何件か保持していた場合、そのメッセージは破棄され、可視性タイムアウトで指定した時間だけ処理されなくなってしまいます。

その場合、アプリの停止直前に、未処理の全てのメッセージのVisibilityTimeoutを0に戻す必要があります。

それを実現するため以下の2つです。

  • 「remaining_messages = response["Messages"]」で未処理のメッセージを管理
  • 正常終了とエラー終了時に「remaining_messages.remove(message)」で未処理のメッセージを削除

「remaining_messages」で未処理のメッセージを管理することで、コンシューマの強制終了(今回は「keyboardinterrupt」を強制終了として定義)の際に、未処理のメッセージを全てキューに返却することが可能となります。

④と同じく、処理時間が長いタスクも大幅な遅延なくリトライすることが可能となります。

実験してみます。始めにキューにメッセージを15件登録します。

$ python producer.py
~~出力は省略~~~
$ python consumer.py
Queue URL: http://0.0.0.0:9324/000000000000/sample-queue
受信したメッセージの数: 10
受信したメッセージの結果: {
  "MessageId": "7ff627ab-a6da-4016-a289-94b878587f2f",
  "ReceiptHandle": "7ff627ab-a6da-4016-a289-94b878587f2f#ec3fc716-6647-4000-8001-03785a077ef2",
  "MD5OfBody": "7818984006dd3cceda6a5501c43966ff",
  "Body": "Message 0"
}
受信したメッセージの結果: {
  "MessageId": "47d23fb3-13f5-46cc-9e17-1b26e2f6b53f",
  "ReceiptHandle": "47d23fb3-13f5-46cc-9e17-1b26e2f6b53f#c29c0b0f-4b8e-4d75-a479-7633ebff9592",
  "MD5OfBody": "68390233272823b7adf13a1db79b2cd7",
  "Body": "Message 1"
}
~~~ 2件取得したところで強制終了する。~~~
^Cキーボード割り込みが発生しました。 処理中のメッセージをキューに返却します: ['10d4a3da-71af-46b4-b7c9-1c4cd237c46f', 'd5e6989f-d3f1-4c55-b0b2-0cab40104c41', '4e489ec0-d929-467d-9859-e959db9ab890', '997771a3-288e-4107-bc8c-08900659df04', 'db250997-9ddf-47f7-858a-ca8c61ba4f3a', '5ed6e8fc-033d-4345-afb7-adad16c09de4', '39fb3a79-abaa-4bcb-a3cf-8050347d34a3', '7bf8501a-8f95-4d27-9176-d086e8a330f9']
8件のメッセージをキューに返却しました。

2件取得したところで強制終了してみます。8件がキューに戻されました。ダッシュボードを見ても「Approximate number of messages」に13件(2件のみ処理された状態)となっています。

この処理を入れることで、コンシューマーが強制終了しても、即時リトライ可能な状態を実現することができました。

※consumer.pyはサンプルです。商用アプリケーションの場合はKeyboardInterruptではなく、利用しているフレームワークのイベントハンドラを利用しましょう

コンシューマのプラクティスのまとめ
  • メッセージを処理したらキューから削除
  • ポーリングも10件単位で行い料金節約
  • ロングポーリングはほぼ必ず設定する
  • 即時性を求められるアプリケーションは、エラーや強制終了が発生したらキューにメッセージを戻す処理を組み込む

まとめ

この記事では、SQSを利用してマイクロサービスを開発する上でのプラクティスを共有しました。

SQSは調べれば調べるほど奥が深く、実はもっと説明したい機能がいっぱいあります。Simple Queue Serviceと言いながらシンプルじゃないじゃん!と思いつつ、マイクロサービス構築において、困ってることを即座に解決してくれる機能の引き出しには感動しています。

私たちもより良いアーキテクチャを模索してる段階なので、「私たちはこう使ってマイクロサービス構築してるよ!」とかあればぜひともご連絡下さい!

このSQSを利用したプラクティスは、TASUKIツールに実装済みです。私たちTASUKIエンジニアチームは、お客様に最高のデータを提供すべく日々切磋琢磨し、新技術にも臆さず挑戦しています。

TASUKIでは、学習データアノテーションの代行サービスに加え、RAG向けのデータ構造化サービスを提供しています。

TASUKIチームの技術やChatGPTやRAGの精度向上にご興味がある方は、ぜひご相談ください。

関連サービス

TASUKI Annotation

AI開発を加速する最高品質のアノテーション代行サービス。

自動アノテーション技術で迅速・安価に高品質な凝視データを作成。データ収集・撮影にも対応。

TASUKI Annotation生成AI用データ構造化サービス

検索拡張生成(RAG)の検索精度向上のためデータ構造化を支援いたします。

おすすめの記事

条件に該当するページがございません