バッチ処理 × FaaS で100倍速

2022年12月7日掲載

キービジュアル

本記事では、バッチ処理の一部(負荷がかかる部分)をクラウドのFaaS(Function as a Service)側にオフロードすることで、処理速度を高速化させる事例を紹介します。

大手クラウドベンダ(Alibaba Cloud、 Amazon Web Service(AWS)、 Azure、Google Cloud)であればFunctionサービスは備わっています。多少の使い勝手の相違あっても、どのクラウドの利用者にも応用ができる内容となっています。

目次

  • オフラインのバッチ処理をFunctionとのハイブリッドで高速化させる実装の紹介
  • なぜそのアプローチをとったのかの設計思想の共有

バッチ処理とは

バッチは、定期的にスケジューリングされたジョブ(タスクの集まり)が、何らかの出力を生成する目的で実行される処理です。ユーザとのインタラクションは一般的にはありません。オンラインシステムやストリーム処理のような即時性は求められません。

レスポンスタイム、レイテンシよりも、スループットが主にパフォーマンス値として使われます。

仕様

改善対象のバッチ処理の仕様を記載します。

  1. ユーザのアカウントIDを格納しているファイルを開封
    アカウントID数:1000人
  2. アカウントIDごとにタスクを呼び出し
    ・時間計算量:O(n) 一重のループ処理
  3. タスク
    ・入力:アカウントID
    ・出力:処理結果のJSON
    ・タスクの実行時間:30秒
    ・空間計算量:O(n) ※ただし、1GBのインメモリを消費

 

バッチを走らせるマシンのスペックは下記とします。
・  vCPU: 2
・ メモリ:4GiB
・ ネットワーク帯域:10Gbps

今回の処理で考慮が必要なスペックだけ記載しています。それ以外の性能値、例えばIOPSは潤沢でありボトルネックにならないと判断ください。

課題

現在のスループットを見積ります。

1000人 × 30秒 => 30000秒 ≒ 500 分

この時間を許容できる場合もあるでしょうが、我々の環境では、他業務システムで結果を利用する連携の都合が生まれ、改善の必要が出てきました。

その目標値は5分でした。プログラムを100倍速くしなければなりません。

各タスクが重たい処理ですが、そこは改善できないという条件で、プログラム全体のジョブとしての高速化を狙います。

現在のコード

プログラムの詳細に入っていきます。

accounts_fileはアカウントを格納しているただのテキストファイルです。

account0
account1
account2
account3
account4
・
・
・

 

この、アカウントファイルを開き、メモリに読み込みます。アカウント数が少ないため一度に読み込みます(また、この後の改善のための都合)。

class AccountManager
  def initialize
    @accounts_file = "accounts_file"
    @accounts = Queue.new
  end

  def read_accounts
    File.open(@accounts_file, "r") do |file|
      file.each do |line|
        account = line.chomp
         @accounts.push(account)
        end
    end
  end

  def perform_heavy_task(account)
    sleep 30
  end

  def perform_heavy_tasks
    until @accounts.empty?
      account = @accounts.pop
      perform_heavy_task(account)
    end
  end

end


manager = AccountManager.new
manager.read_accounts
manager.perform_heavy_tasks

perform_heavy_tasksメソッド内で、アカウントリストごとにperform_heavy_taskを実行していきます。タスクは簡易的にsleep処理で代替しています。また、エラー対応は省略しています。コード上は省略していますが、メモリは1GB消費することを忘れてはいけません。

改善 

1.  単一サーバでの高速化

現状の環境と仕組みを大胆に変えることなく、できることを検討します。

スレッド数をコントロールできる仕組みにし、並行度をあげてみます。

class AccountManager
  def initialize
    @accounts_file = "accounts_file"
    @accounts = Queue.new

    @thread_num = 3
    @mutex = Mutex.new
  end

  def read_accounts
    -snip-
  end

  def perform_heavy_task(account)
    -snip-
  end

  def perform_heavy_tasks
    threads = Array.new(@thread_num).map do
      Thread.new do
        until @accounts.empty?
          account = @accounts.pop
          perform_heavy_task(account)
        end
      end
    end

    threads.each(&:join)
  end

end

thread_numでスレッド数を変更できます。上記では3にしています。その数字弱の倍速はできます。ではこのままスレッド数を増やしていけばいいかというとノーです。

CPUのコア数以上に、メモリがきつくなります。それぞれのタスクは、メモリの消費にあわせてデータチャンクをディスクへ書き出す処理にはなっていません。そのタスク自体を維持したたまま、ジョブとしての改善をなんとか図らなければならない制約が今回はあります。

スケールアップすることで改善するでしょうか。これも単体サーバでは大きな改善は見込めません。

 

2. 一部の処理のみオフロード

タスクをスケールアウトできればよさそうですが、分散システム構成への移行は過剰です。単一サーバで実行させるプログラムの気軽さを維持したまま、タスクを分散させられないでしょか。

クラウドのFunctionサービスを活用し、perform_heavy_taskのみ、そこで稼働させる方針がよさそうです。

どのクラウドにもサーバレスのFaaSはそろっています。

  • Alibaba:Function Compute
  • AWS:AWS Lambda
  • AZure:Azure Function
  • Google Cloud :Cloud Functions

ローカルに元々あったperform_heavy_taskの処理だけをFunctionへ移譲します。
本記事では、AWSのLambdaを、言語やSDKに依存しないようにcurlコマンドで呼び出します。

  def perform_heavy_task(account)
    service = "lambda"
    region = "ap-northeast-1"
    profile = "myProfile"
    url = "https://xxx.lambda-url.ap-northeast-1.on.aws/"

    response = `curl -s -X POST \
      -H 'Content-Type: application/json' \
      --aws-sigv4 "aws:amz:ap-northeast-1:lambda" \
      --user "#{@aws_access_key_id}:#{@aws_secret_access_key}" \
      --max-time 600 \
      -d '{"account": "#{account}"}' \
      #{url}`

     response = JSON.parse(response)
  end

 

呼び出し先であるAWSのLambdaのコードです。
https://xxx.lambda-url.ap-northeast-1.on.aws/

require 'json'

def lambda_handler(event:, context:)
  sleep 30
  { statusCode: 200, body: JSON.generate('Hello from Lambda!') }
end

クラウド側ではその他、利用するメモリの指定と、同時実行数がクォータ制限に該当しないかを確認するだけです。
※同時実行数のクォータの開放も可能です。

あとは、クォータ以下の任意の数だけFunctionをコールし稼働させることができます。

ローカル環境のCPUに負荷はかかりませんし、IO待ちになりRubyのGVLも開放されます。

スレッド数をコントロールできるようにしていました。thread_numの値を一気に100にして並列化してみます。オフロードしたことでスレッドとしての理想的な動きをしてくれると思います。100倍速が達成できます。

ワークフローのオーケストレーションサービスは使いません。サンプルのプログラムは簡易的にしていますが、実際はより大きなバッチであることもあるためです。クラス設計から外れた関数単位の処理に分解し、視認性を悪くするよりは、手元のプログラムの大枠は維持し、最小限のタスクだけを外部化するという方針です。

設計の振り返り

ステップを振り返ります。まずは単一コンピュータで動くモノリシックな設計で動かし、その中で対処できることを試しました。

次のステップとして、Functionを使いスケールアウトさせました。要所は、最初の設計思想は維持しているところです。外部へのPOSTで一部の振る舞いをオフロードさせていますが、構成は密結合のままです。全アクションを細分化してマイクロサービス化するような方針転換をしていないということです。

今回はバッチのプログラムでしたが、フレームワークを利用したアプリケーションの開発などでも似たようなオフロード設計は流用できるでしょう。過剰に細分、独立、分散化したプログラムのオーケストレーションの複雑さを避けられます。

費用

このバッチ処理を毎日1回実行した場合の月額をAWSで算出してみます。
https://calculator.aws/#/addService/Lambda

  • アーキテクチャ: x86
  • リクエスト数:1000回 / 日
  • 各リクエストの実行時間: 30000ms
  • メモリ:1GB
  • ストレージ:512MB

無料利用枠なしで15.22 USDでした(無料利用枠使う場合は半額程度)。

まとめ

クラウドは企業やサービスインフラとして活用されていることが多いでしょうが、今回のように一部のサービスのみを利用して、既存のプログラムの改善の助けにも利用できます。

ソフトバンクでは、IaaSからSaaSまでの幅広い支援が可能です。クラウドへご関心を抱かれましたらぜひ相談ください。

関連サービス

Amazon Web Services (AWS)

ソフトバンクはAWS アドバンストティアサービスパートナーです。「はじめてのAWS導入」から大規模なサービス基盤や基幹システムの構築まで、お客さまのご要望にあわせて最適なAWS環境の導入を支援します。

Microsoft Azure

Microsoft Azureは、Microsoftが提供するパブリッククラウドプラットフォームです。コンピューティングからデータ保存、アプリケーションなどのリソースを、必要な時に必要な量だけ従量課金で利用することができます。

Google Cloud

Google サービスを支える、信頼性に富んだクラウドサービスです。お客さまのニーズにあわせて利用可能なコンピューティングサービスに始まり、データから価値を導き出す情報分析や、最先端の機械学習技術が搭載されています。

Alibaba Cloud

Alibaba Cloudは中国国内でのクラウド利用はもちろん、日本-中国間のネットワークの不安定さの解消、中国サイバーセキュリティ法への対策など、中国進出に際する課題を解消できるパブリッククラウドサービスです。

おすすめの記事

条件に該当するページがございません