Databricks で実現するリアルタイム売上集計
Delta Live Tables (DLT) の実践活用ガイド

2026年6月19日 掲載

governance

迅速かつ的確な意思決定には、リアルタイムなデータ分析が不可欠です。とくに売上のような主要ビジネス指標は、即時の可視化と集計が求められます。

本記事では、Databricks の強力な機能である Delta Live Tables(DLT) を活用し、AWS S3 上の CSVファイルから商品ごとの総売上をリアルタイムに集計する方法をご紹介します。

目次

Databricksとは?

Databricks は、Apache Spark を基盤とする統合データ分析プラットフォームです。主要なクラウドサービス(AWS、Azure、Google Cloud など)上で動作し、データ処理、分析、機械学習までを一貫して実行できる環境を提供します。

AWS などのクラウドサービスは、仮想マシン、ストレージ、ネットワークなどのインフラ基盤を支えます。
Databricks は、こうしたクラウドインフラ上に構築され、データの処理・分析・活用に特化したアプリケーション層のプラットフォームとして機能します。

大きな特長として、これまで個別に構築・管理されていたバッチ処理、ストリーミング処理、BI 分析、AI / 機械学習のワークロードを、Databricks では単一のプラットフォーム上でシームレスに統合できる点が挙げられます。これにより、データ活用のあらゆるフェーズを効率的に進めることが可能になります。

Delta Live Tables(DLT)とは?

Delta Live Tables(DLT)は、Databricks の機能の一つで、データ処理のパイプラインを簡単に作れるサービスです。

データパイプラインとは、例えば「色々な場所にあるデータを集めてきて、必要な形に加工して、使いやすいように保存する」といった一連のデータ処理の流れのことです。

DLT を使うと、この複雑なデータパイプラインも簡単に構築できます。しかも、データ同士のつながり(依存関係)を DLT が自動で理解し、最適な順番で処理を実行してくれます。

本記事の目的

「販売データをリアルタイムにクレンジング・集計し、商品ごとの売上ランキングを作成する」データパイプラインを構築します。

 

(対象データ)

  • 商品マスタ(PRODUCT):商品ID・価格を格納
  • 販売トランザクション(TRANSACTION):商品ID・数量・日付を記録

ER図で記載すると、この関係にあります。

AWS S3上のファイルの配置場所(パス)は下記とします。

s3://xxx/sales/
├── products/             # 商品データ(product01.csv など)
└── transactions/        # 販売トランザクションデータ(transaction01.csv など)

 

商品データ例

product_id

price

P001

100

P002

250

P003

150

 

トランザクションデータ例

transaction_id

product_id

quantity

sale_date

T000001

P001

5

2025-06-01

T000002

P002

3

2025-06-01

T000003

P002

2

2025-06-02

T000004

P003

7

2025-06-02

T000005

P001

3

2025-06-03

 

パイプライン設計

Databricks 上での、完成後のパイプライングラフを先にお見せします。

  • products_raw / transactions_raw:S3からの生データ取り込み
  • invalid_transactions:不正なデータの検出と切り出し
  • valid_transactions:正常なトランザクションデータの抽出
  • aggregated_sales:売上金額および販売数量の集計結果

データの品質が、ETL処理を通して、ブロンズ(Bronze)、シルバー(Silver)、ゴールド(Gold)となるメダリオンアーキテクチャを意識してみました。
 

実装手順

Databricks には多くの便利な機能がありますが、開発の進め方はローカル環境での作業に近い感覚で行えます。クラウドのリソースを従えた、自分専用の開発環境と考えるとイメージしやすいでしょう。

以後、解説のためにプログラムを分けて説明していきますが、その通りにセルを分割する必要はありません。上図の通り、1セル内に貼り付けても問題ありません。

 

スキーマ定義

CSVファイルの形式に合わせたスキーマを定義します。

# 商品マスタのスキーマ
products_schema = StructType([
    StructField("product_id", StringType(), True),  # 商品ID
    StructField("price", IntegerType(), True)       # 単価
])

# 販売トランザクションのスキーマ
transactions_schema = StructType([
    StructField("transaction_id", StringType(), True),  # トランザクションID
    StructField("product_id", StringType(), True),      # 商品ID
    StructField("quantity", IntegerType(), True),       # 売れた数量
    StructField("sale_date", StringType(), True)        # 販売日
])

 

Auto Loader で S3 からの取り込み

Auto Loader は、クラウドストレージ上の新着ファイルを自動的に検知し、ストリーミング的に取り込むことができる Databricks の機能です。大量データの継続的な取り込みにも適しています。

@dlt.table(
    name="products_raw"
)
def products_raw():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.allowOverwrites", "true")
        .schema(products_schema)
        .load("s3://***/sales/products/")
    )


@dlt.table(
    name="transactions_raw",
    table_properties={"pipelines.reset.allowed": "false"}
)
def transactions_raw():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(transactions_schema)
        .load("s3://***/sales/transactions/")
    )

 

不正データの検出と切り出し

データの品質を担保するために、不正なデータ(NULL値や0以下の値など)を検出し、別テーブルに保存します。

# 不整合データを検出する関数
def detect_invalid_data(df):
    invalid_df = df.filter(
        col("product_id").isNull() |       # 商品IDがNULLの場合
        col("quantity").isNull() |         # 数量がNULLの場合
        (col("quantity") <= 0) |           # 数量が0以下の場合(括弧追加)
        col("price").isNull() |            # 単価がNULLの場合
        (col("price") <= 0)                # 単価が0以下の場合(括弧追加)
    )
    return invalid_df

# 不整合データを別テーブルに切り出し
@dlt.table(
    name="invalid_transactions"
)
def invalid_transactions():
    products_df = dlt.read("products_raw")
    transactions_df = dlt.read("transactions_raw")

    # 結合後に不整合データを検出
    merged_df = (
        transactions_df.join(products_df, on="product_id", how="inner")
    )
    invalid_df = detect_invalid_data(merged_df)

    return invalid_df

 

正常データの抽出

先ほどのフィルタリング条件を逆転させ、正常なデータのみを抽出します。

@dlt.table(name="valid_transactions")
def valid_transactions():
    merged_df = dlt.read("transactions_raw").join(dlt.read("products_raw"), "product_id")
    return merged_df.filter(
        col("product_id").isNotNull() &
        col("quantity").isNotNull() & (col("quantity") > 0) &
        col("price").isNotNull()    & (col("price") > 0)
    )

 

売上集計

最後に、正常なデータを基に、商品ごとの売上金額と販売数を集計します。

@dlt.table(name="aggregated_sales")
def aggregated_sales():
    return (
        dlt.read("valid_transactions")
        .groupBy("product_id")
        .agg(
            expr("sum(quantity) as total_quantity"),
            expr("sum(quantity * price) as total_revenue")
        )
        .orderBy(col("total_revenue").desc())
    )

以上を Notebook に貼り付けて適当な名前で保存しておきます。

 

ワークフローの登録

ワークフロー > パイプラインの設定画面で、作成した Notebook を指定するだけで DLT の実行準備が完了します。

パイプラインはデータ処理の本体を定義するものです。
これを定期実行したい場合は、Databricks のワークフロー機能を使ってスケジューリングします。

オプションとしては、まずは下記辺りを気にしておけばいいでしょう。

(パイプラインのトリガーモード)

  • Triggered :手動で起動する場合
  • Continuous :ストリーム処理を継続的に実行する場合(S3への新規CSV追加時にも自動反映)

 

あとは、クラスタのワーカー数などのパラメータで調整できます。処理速度と課金のバランスを見て選択してください。

集計結果の確認

集計結果は SQL を使って見てみましょう。これも新規に Notebook を開き、SQL を発行するだけです。

select * from catalog00.schema00.aggregated_sales

※ catalog00.schema00.aggregated_sales の catalog や schema は、ご自身の Databricks 環境に応じて読み替えてください。catalog はデータベースの集合を示し、schema はその中のスキーマ(テーブル構造をまとめる単位)を指します。

product_id

total_quantity

total_revenue

P001

150

15,000

P002

120

30,000

P003

80

12,000

-snip-

まとめ

Databricks の DLTと Auto Loader を組み合わせることで、AWS S3 上に保存された CSV データをリアルタイムで取り込み、信頼性の高い売上集計を効率的に実現できました。

データのクレンジングから集計・可視化までをシンプルなコードで一貫して構築できる点は、Databricks の大きな魅力です。

ダッシュボードを通じたリアルタイム BI との連携にも対応できます。また生成AIとの統合もスムーズです。例えば、SQL から、Mosaic AI Model Serving Endpoint に接続して、シームレスなAI分析を組み合わせることもできます。

データ品質の確保と柔軟なパイプライン設計を両立しながら、データドリブンな意思決定を加速させる基盤として、Databricks をぜひ活用してみてください。

関連サービス

ソフトバンクはAWS アドバンストティアサービスパートナーです。「はじめてのAWS導入」から大規模なサービス基盤や基幹システムの構築まで、お客さまのご要望にあわせて最適なAWS環境の導入を支援します。

Microsoft Azureは、Microsoftが提供するパブリッククラウドプラットフォームです。コンピューティングからデータ保存、アプリケーションなどのリソースを、必要な時に必要な量だけ従量課金で利用することができます。

Google サービスを支える、信頼性に富んだクラウドサービスです。お客さまのニーズにあわせて利用可能なコンピューティングサービスに始まり、データから価値を導き出す情報分析や、最先端の機械学習技術が搭載されています。

おすすめの記事

条件に該当するページがございません