こんにちは。技術開発部の赤井橋です。
弊社では現在adstirログ基盤のリプレイスを計画しており、その一貫としてAWS Glueでのデータ変換(json → parquet)、及び変換データのAthenaでの検索を試しました。
Glueとは
https://aws.amazon.com/jp/glue/
2017-08-15から利用出来るようになった抽出、変換、ロード (ETL) を行う完全マネージド型のAWSサービスです。 使い所としてはファイルのカラムナフォーマットへの変換、及びパーティションが有効なディレクトリへの配置、データカタログ(テーブル定義のメタデータ)の更新・・など、 ビッグデータを使いやすく成型する場面が多いかと思います。
Glueが提供する機能
大きく分けて2つ存在します。(2018-07-31時点)
1. データカタログの更新
クローラという機能を用いてデータカタログの自動更新を行うことが出来ます。 クローラは指定されたデータソース(特定のS3ディレクトリなど)内をスキャンし、該当ディレクトリにあるファイルフォーマットに合わせたテーブル定義(パーティショニング含む)を自動で行ってくれます。 定義されたデータカタログはAthena、EMR、Redshift Spectrumでも使用(2018-07-31時点)でき、実行スケジュールの登録も可能です。
2. ETL
ジョブという機能を用いてデータ抽出、変換を行うことが出来ます。 ジョブ追加時のセットアップガイダンスに従って進めていくと最終的にPythonのスクリプトが自動生成されます。シンプルな処理であればスクリプトの修正は不要ですが、手の込んだ処理の場合修正する必要があります。 ジョブを定期実行するトリガーという機能もあります。
使用にあたり
上記2つの機能はどちらかだけ使用してもよく、どちらとも使用したい場合も使用順序に制約はありません。 そのため、クローラでテーブル定義を更新し、更新されたテーブル定義を元にジョブでのデータ変換を行う、という流れ以外にもジョブで変換されたファイルに対してクローラでテーブル定義を更新する、という用途でも使用出来ます。
ジョブのスクリプト
セットアップガイダンスを終えるとPythonスクリプトが生成されますが、例えば特定の引数を受け取って処理を行いたい場合は別途実装が必要です。 スクリプトではGlueで独自定義されたDynamicFrameというデータ構造を操作することで独自の変換処理を行えます。
例)AWS CLIからyear引数を指定してGlueを実行し、受け取ったyear引数をS3のデータソースのパスとして設定したい場合
AWS CLI
1 |
aws glue start-job-run --job-name TestJob --arguments '{"--year":"2018"}' |
スクリプト
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
from awsglue.utils import getResolvedOptions # 引数からパスを生成 args = getResolvedOptions(sys.argv, ['year']) basepath = "s3://mybucket/Glue/year=" s3path = basepath + args['year'] sc = SparkContext() glueContext = GlueContext(sc) # 作成したパスのデータを指定して、DynamicFrameを作成 datasource0 = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3path]}, format="csv", format_options={}, transformation_ctx="datasource0") ... #以後、DataFrameに対する処理を記述 |
変換対象のフォーマットが複雑な場合も別途実装が必要です。
例)tsvフォーマットの1列にjson文字列があり、json文字列の部分をstructの配列として変換したい場合
元データ(tsv)
1 2 |
"2018-07-31 00:00:00" "type1" "{column3:[{sub_column1: xxxx, sub_column2: yyyy}, {sub_column1: xxxx, sub_column2: yyyy}]}" "2018-07-31 00:00:00" "type2" "{column3:[{sub_column1: xxxx, sub_column2: yyyy}, {sub_column1: xxxx, sub_column2: yyyy}]}" |
スクリプト
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# マッピング applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "json", "string")], transformation_ctx = "applymapping1") default_df = applymapping1.toDF() # スキーマ定義 fields = [ StructField("column1",StringType()), StructField("column2",StringType()), StructField("column3",ArrayType( StructType([ StructField("sub_column1", StringType()), StructField("sub_column2", IntegerType()), ]))), ] schema = StructType(fields) # データ成型 def format(col0, col1, json): json_data = json.loads(json) ret = {} ret["column1"] = col0 ret["column2"] = col1 json_list = [] for sub_column in json_data["column3"]: sample_dict = {} sample_dict["sub_column1"] = str(sub_column["sub_column1"]) sample_dict["sub_column2"] = int(sub_column["sub_column2"]) # append json_list.append(sample_dict) ret["column3"] = json_list return ret # データ成型してDynamicFrameにする rdd = default_df.select("col0", "col1", "json").rdd.map(lambda x: format(x.col0, x.col1, x.json)) sqlContext = SQLContext(sc) parsedDF = sqlContext.createDataFrame(rdd, schema) dyf = DynamicFrame.fromDF(parsedDF, glueContext, "sample_format") |
GlueとAthena、使ってみて不自由だった点
Glueでデータソースとして読み込めるのは文字コードがUTF-8のみ
UTF-8以外の文字列が変換対象のデータソースに含まれているとGlueでの変換処理が失敗します。
[AWS Black Belt Onine Seminar] AWS Glue
根本的な解決ではないですが、AWS Lambdaを用い対象となるログファイルをUTF-8に変換する、という前処理を行うことで対処しました。
DPUを上げても劇的な処理速度向上は見込めない
GlueではDPU(データ処理ユニット)の数を指定出来るのですが、2倍にすれば2倍処理が早くなる、という挙動ではありませんでした。 7.7G分のファイルでそれぞれ検証したところ、
1 2 3 |
DPU 10 → 14 mins DPU 50 → 9 mins DPU 100 → 9 mins |
という結果でした。
Athenaでのstruct型の使い勝手が悪い
struct型のスキーマ定義に値を追加すると、スキーマと同様の構造で格納されているパーティションでは問題なく検索出来ますが、 値が存在しない(スキーマ変更前の構造の)ファイルが格納されているパーティションでは「HIVE_CANNOT_OPEN_SPLIT」エラーが発生し検索できませんでした。
1 2 3 4 5 6 7 8 9 10 11 |
この定義から CREATE EXTERNAL TABLE `test_tbl`( `id` int, `column` array<struct<aaa:int, bbb:int, ccc:int>>, 〜〜 この定義に変更すると、定義変更前のファイルを読み込もうとした際にHIVE_CANNOT_OPEN_SPLITエラーが発生 CREATE EXTERNAL TABLE `test_tbl`( `id` int, `column` array<struct<aaa:int, bbb:int, ccc:int, new_column:int>>, 〜〜 |
こちら根本的な解決法はないようで、暫定対応としてstruct型をstring型に変更し検索の際にstring文字列をjsonと扱うように対処しました。
まとめ
検証の結果、コストや処理時間なども考慮するとまだGlue + Athenaを導入する判断に至っていません。ただ、比較的新しく提供されたサービスのため今後も機能が追加され使い勝手も改善されていくはずです。 ビッグデータがますます全盛となる時代、このようなサービスの重要性は増していくのではないでしょうか。
今回は以上です。