フォーム読み込み中
迅速かつ的確な意思決定には、リアルタイムなデータ分析が不可欠です。とくに売上のような主要ビジネス指標は、即時の可視化と集計が求められます。
本記事では、Databricks の強力な機能である Delta Live Tables(DLT) を活用し、AWS S3 上の CSVファイルから商品ごとの総売上をリアルタイムに集計する方法をご紹介します。
Databricks は、Apache Spark を基盤とする統合データ分析プラットフォームです。主要なクラウドサービス(AWS、Azure、Google Cloud など)上で動作し、データ処理、分析、機械学習までを一貫して実行できる環境を提供します。
AWS などのクラウドサービスは、仮想マシン、ストレージ、ネットワークなどのインフラ基盤を支えます。
Databricks は、こうしたクラウドインフラ上に構築され、データの処理・分析・活用に特化したアプリケーション層のプラットフォームとして機能します。
大きな特長として、これまで個別に構築・管理されていたバッチ処理、ストリーミング処理、BI 分析、AI / 機械学習のワークロードを、Databricks では単一のプラットフォーム上でシームレスに統合できる点が挙げられます。これにより、データ活用のあらゆるフェーズを効率的に進めることが可能になります。
Delta Live Tables(DLT)は、Databricks の機能の一つで、データ処理のパイプラインを簡単に作れるサービスです。
データパイプラインとは、例えば「色々な場所にあるデータを集めてきて、必要な形に加工して、使いやすいように保存する」といった一連のデータ処理の流れのことです。
DLT を使うと、この複雑なデータパイプラインも簡単に構築できます。しかも、データ同士のつながり(依存関係)を DLT が自動で理解し、最適な順番で処理を実行してくれます。
「販売データをリアルタイムにクレンジング・集計し、商品ごとの売上ランキングを作成する」データパイプラインを構築します。
(対象データ)
ER図で記載すると、この関係にあります。
s3://xxx/sales/
├── products/ # 商品データ(product01.csv など)
└── transactions/ # 販売トランザクションデータ(transaction01.csv など)
商品データ例
product_id | price |
|---|---|
P001 | 100 |
P002 | 250 |
P003 | 150 |
トランザクションデータ例
transaction_id | product_id | quantity | sale_date |
|---|---|---|---|
T000001 | P001 | 5 | 2025-06-01 |
T000002 | P002 | 3 | 2025-06-01 |
T000003 | P002 | 2 | 2025-06-02 |
T000004 | P003 | 7 | 2025-06-02 |
T000005 | P001 | 3 | 2025-06-03 |
Databricks 上での、完成後のパイプライングラフを先にお見せします。
データの品質が、ETL処理を通して、ブロンズ(Bronze)、シルバー(Silver)、ゴールド(Gold)となるメダリオンアーキテクチャを意識してみました。
Databricks には多くの便利な機能がありますが、開発の進め方はローカル環境での作業に近い感覚で行えます。クラウドのリソースを従えた、自分専用の開発環境と考えるとイメージしやすいでしょう。
以後、解説のためにプログラムを分けて説明していきますが、その通りにセルを分割する必要はありません。上図の通り、1セル内に貼り付けても問題ありません。
CSVファイルの形式に合わせたスキーマを定義します。
# 商品マスタのスキーマ
products_schema = StructType([
StructField("product_id", StringType(), True), # 商品ID
StructField("price", IntegerType(), True) # 単価
])
# 販売トランザクションのスキーマ
transactions_schema = StructType([
StructField("transaction_id", StringType(), True), # トランザクションID
StructField("product_id", StringType(), True), # 商品ID
StructField("quantity", IntegerType(), True), # 売れた数量
StructField("sale_date", StringType(), True) # 販売日
])
Auto Loader は、クラウドストレージ上の新着ファイルを自動的に検知し、ストリーミング的に取り込むことができる Databricks の機能です。大量データの継続的な取り込みにも適しています。
@dlt.table(
name="products_raw"
)
def products_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("cloudFiles.allowOverwrites", "true")
.schema(products_schema)
.load("s3://***/sales/products/")
)
@dlt.table(
name="transactions_raw",
table_properties={"pipelines.reset.allowed": "false"}
)
def transactions_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.schema(transactions_schema)
.load("s3://***/sales/transactions/")
)
データの品質を担保するために、不正なデータ(NULL値や0以下の値など)を検出し、別テーブルに保存します。
# 不整合データを検出する関数
def detect_invalid_data(df):
invalid_df = df.filter(
col("product_id").isNull() | # 商品IDがNULLの場合
col("quantity").isNull() | # 数量がNULLの場合
(col("quantity") <= 0) | # 数量が0以下の場合(括弧追加)
col("price").isNull() | # 単価がNULLの場合
(col("price") <= 0) # 単価が0以下の場合(括弧追加)
)
return invalid_df
# 不整合データを別テーブルに切り出し
@dlt.table(
name="invalid_transactions"
)
def invalid_transactions():
products_df = dlt.read("products_raw")
transactions_df = dlt.read("transactions_raw")
# 結合後に不整合データを検出
merged_df = (
transactions_df.join(products_df, on="product_id", how="inner")
)
invalid_df = detect_invalid_data(merged_df)
return invalid_df
先ほどのフィルタリング条件を逆転させ、正常なデータのみを抽出します。
@dlt.table(name="valid_transactions")
def valid_transactions():
merged_df = dlt.read("transactions_raw").join(dlt.read("products_raw"), "product_id")
return merged_df.filter(
col("product_id").isNotNull() &
col("quantity").isNotNull() & (col("quantity") > 0) &
col("price").isNotNull() & (col("price") > 0)
)
最後に、正常なデータを基に、商品ごとの売上金額と販売数を集計します。
@dlt.table(name="aggregated_sales")
def aggregated_sales():
return (
dlt.read("valid_transactions")
.groupBy("product_id")
.agg(
expr("sum(quantity) as total_quantity"),
expr("sum(quantity * price) as total_revenue")
)
.orderBy(col("total_revenue").desc())
)
以上を Notebook に貼り付けて適当な名前で保存しておきます。
ワークフロー > パイプラインの設定画面で、作成した Notebook を指定するだけで DLT の実行準備が完了します。
パイプラインはデータ処理の本体を定義するものです。
これを定期実行したい場合は、Databricks のワークフロー機能を使ってスケジューリングします。
オプションとしては、まずは下記辺りを気にしておけばいいでしょう。
(パイプラインのトリガーモード)
あとは、クラスタのワーカー数などのパラメータで調整できます。処理速度と課金のバランスを見て選択してください。
集計結果は SQL を使って見てみましょう。これも新規に Notebook を開き、SQL を発行するだけです。
select * from catalog00.schema00.aggregated_sales
※ catalog00.schema00.aggregated_sales の catalog や schema は、ご自身の Databricks 環境に応じて読み替えてください。catalog はデータベースの集合を示し、schema はその中のスキーマ(テーブル構造をまとめる単位)を指します。
product_id | total_quantity | total_revenue |
|---|---|---|
P001 | 150 | 15,000 |
P002 | 120 | 30,000 |
P003 | 80 | 12,000 |
-snip-
Databricks の DLTと Auto Loader を組み合わせることで、AWS S3 上に保存された CSV データをリアルタイムで取り込み、信頼性の高い売上集計を効率的に実現できました。
データのクレンジングから集計・可視化までをシンプルなコードで一貫して構築できる点は、Databricks の大きな魅力です。
ダッシュボードを通じたリアルタイム BI との連携にも対応できます。また生成AIとの統合もスムーズです。例えば、SQL から、Mosaic AI Model Serving Endpoint に接続して、シームレスなAI分析を組み合わせることもできます。
データ品質の確保と柔軟なパイプライン設計を両立しながら、データドリブンな意思決定を加速させる基盤として、Databricks をぜひ活用してみてください。
ソフトバンクはAWS アドバンストティアサービスパートナーです。「はじめてのAWS導入」から大規模なサービス基盤や基幹システムの構築まで、お客さまのご要望にあわせて最適なAWS環境の導入を支援します。
Microsoft Azureは、Microsoftが提供するパブリッククラウドプラットフォームです。コンピューティングからデータ保存、アプリケーションなどのリソースを、必要な時に必要な量だけ従量課金で利用することができます。
Google サービスを支える、信頼性に富んだクラウドサービスです。お客さまのニーズにあわせて利用可能なコンピューティングサービスに始まり、データから価値を導き出す情報分析や、最先端の機械学習技術が搭載されています。
条件に該当するページがございません