フォーム読み込み中
Alibaba Cloud DataWorks はビッグデータ収集/処理/分析のワンストップソリューションで、MaxCompute、Hologres、EMR、AnalyticDB、CDP など主流のコンピューティングエンジンに対応しています。DataWorks を利用する第一歩としては、他所で蓄積したソースデータをビッグデータストレージにインポートすることですので、DataWorks を利用して OSS に保存しているソースデータを MaxCompute にインポートしてみたいと思います。
前回のブログ「Alibaba Cloud DataWorks を利用した OSS からのデータ収集について(前編)」にて、 Data Integration 機能を利用したインポート方法を検証しましたので、今回の後編は自作コードによる同一 OSS からのデータインポート方法を紹介します。
DataWorks の PyODPS3 ノードにて自作コードを作成し、OSS 上のソースデータを MaxCompute Table にインポートします。
PyODPS3 ノードは Resoure Group にて Python3 のコードを実行する機能です。PyODPS3 ノードの詳細はこちらをご参照ください。
1-1.検証用 DataWorks Workspace を作成します。 DataWorks のコンソールを開き、「Create Workspace」をクリックします。
1-2.任意の「Workspace Name」を入力し、「Commit」をクリックします。
1-3.お勧めのビッグデータエンジンが表示されます。今回のエンジンは MaxCompute にしますので、「MaxCompute」右側の「Associate Now」をクリックします。
1-4.下記内容を入力し、「Complete Association」をクリックします。
Resource Display Name:任意入力
Project Source:Create project
Payment Model:Pay by Volume
Quota Group:Pay-as-you-go Quota
Data Type:2.0 Data Type
Encrypted:No encryption
Project Name:任意入力
Scheduled Access Identity:Alibaba Cloud primary account
1-5.Workspace と MaxCompute インスタンス紐付け完了の画面が表示され、「Back」をクリックし、Workspace 一覧に戻ります。
1-6.MaxCompute 側でインポート先のテーブルを作成します。Workspace 一覧画面で、検証用 Workspace 右側の「Data Studio」をクリックします。
1-7.Data Studio コンソールで「MaxCompute ー Table ー Create Table」を順にクリックします。
1-8.任意の「Table Name」を入力し、「Create」をクリックします。
1-9.「Create Field」をクリックし、下記の 4 つフィールドを作成します。
m_name, STRING m_age, STRING m_cardno, STRING m_birthday, STRING |
1-10.「Commit to Production Environment ー OK」をクリックし、テーブルを作成します。
1-11.左メニューの「Tenant Tables」から「手順1-10 で作成したテーブル」を特定し、「Preview」タブにて中身が空であることを確認します。
2-1.検証用 OSS バケットを作成します。下記内容を入力し、「OK」をクリックします。
Bucket Name:任意入力
Region:Tokyo
2-2.下記内容の CSV ファイルを作成し、手順2-1 で作成したバケットにアップロードします。
ファイル名:test.csv
Name,Age,CardNo,Birthday Nancy,23,1,2000/1/1 Bob,22,2,2001/1/1 Mary,21,3,2002/1/1 Jack,20,4,2003/1/1 Smith,19,5,2004/1/1 |
2-3.Dataworks 製品から OSS を操作するシステムロールが存在します。自作コードに該当ロールの ARN 情報が必要ですので、RAM ロールのコンソールを開きます。
2-4.「AliyunODPSPAIDefaultRole」というロールを特定し、クリックします。
2-5.「Basic Information ー ARN」の値を記録します。
3-1.Dataworks 専用リソースグループを購入します。Resource Groups 一覧画面にて、「Create Resource Group for Scheduling ー Purchase」をクリックします。
3-2.購入ページで下記のように入力し、「Buy Now」をクリックします。
Region:Tokyo
Type:Exclusive Resource for Scheduling
Exclusive Resources for Scheduling:4 CPU 8 GB
Units:1
Duration:1 Month
Resource Group Name:任意入力
Resource Group Description:任意入力
3-3.「Pay」をクリックします。
3-4.作成した Resoure Group が Running になってから、「Change Workspace」をクリックし、検証用 WorkSpace 右側の「Bind」をクリックし、検証用 WorkSpace から該当 Resoure Group を利用できるようになります。
3-5.Workspace 一覧画面にて、検証用 Workspace 右側の「DataStudio」をクリックします。
3-6.「MaxCompute ー Create Node ー PyODPS 3」を順にクリックし、PyODPS 3 ノードを作成します。
3-7.任意の Name を入力し、「Confirm」をクリックします。
3-8.下記コードを入力し、「Save ー Run with Parameters ー 手順 3-4 でバインドした専用 Resoure Group」の順にクリックし、ノードの実行を開始します。
from odps import ODPS odps.execute_sql("load overwrite table <1> from location '<2>' stored by 'com.aliyun.odps.CsvStorageHandler' with serdeproperties ('odps.properties.rolearn'='<3>','odps.text.option.delimiter'=',')") |
説明:
No | 入力内容 | 入力例 |
<1> | 手順 1-8 で作成したテーブル名 | testtable |
<2> | 手順 2-2 で CSV ファイルがアップロードされた OSS フォルダのパス | oss://oss-ap-northeast-1.aliyuncs.com/testpath/ |
<3> | 手順 2-5 で記録した ARN の値 | acs:ram::538187xxxxx:role/aliyunodpspaidefaultrole |
3-9.しばらく待つと実行が完了します。
3-10.左メニューの「Tenant Tables」から「手順 1-10 で作成したテーブル」を特定し、Preview タブにて CSV データが反映されたことを確認します。
DataWorks の PyODPS3ノードで自作コードを利用して、OSS 上のソースデータを MaxCompute にインポートしてみました。OSS に蓄積したログ(アクセスログ、監査ログなど)を MaxCompute で分析したいニーズをよく聞きますので、この手順は役に立つと思います。使い分けとして、前編の Data Integration 機能は GUI 利用可能で、初心者でも安心感が強いですが、大量の OSS ファイルをインポートするシーンは繰り返し作業が必要になります。一方、今回の自作コードの方は引数を変えるだけで、容易に大量のインポート構文を作成できるメリットもあります。OSS から Dataworks にデータをインポートするニーズがありましたら、ぜひ本記事を参考にしてみてください。
条件に該当するページがございません