目次
AWS Glue色々
Kazuki Moriyama (森山 和樹)
GUI 操作
テーブルの分類の変更方法
テーブルページで編集したいテーブルを選択したあと、 テーブルの編集ボタンを押す。
モーダルが出てくるので、 テーブルのプロパティ 項目の classification を json に変更する.
ジョブの登録方法
- ジョブで走らせたいスクリプトを作成する
- GlueApp というオブジェクトの中に走らせたいコードをすべて配置する
- s3 に走らせたいスクリプトファイルを配置する
- Glue 内のジョブページで新規作成する
- ほとんどデフォルト
- Role は GlueJobRole に設定
- Glue version は Spark 2.4, Scala2(Glue Version 1.0)
- このジョブ実行はユーザ提供の既存のスクリプトを選択
- クラス名を GlueApp に設定
- スクリプトの S3 パスをさっきのスクリプトのパスに設定
- 保存を連打して完了
- 完了したらスクリプト編集画面が出てくる
オンデマンドトリガーの実行方法
Glue のトリガータブで作成したオンデマンドトリガーを選択し、アクションからトリガーの実行を選択する。 トリガーによって実行されたかどうかはジョブタブでお目当てのジョブを選択すると下にログが表示される。
ジョブパラメータの注入方法
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME','VAL1','VAL2','VAL3','DEST_FOLDER'])
job.init(args['JOB_NAME'], args)
v_list=[{"VAL1":args['VAL1'],"VAL2":args['VAL2'],"VAL3":args['VAL3']}]
df=sc.parallelize(v_list).toDF()
df.repartition(1).write.mode('overwrite').format('csv').options(header=True, delimiter = ';').save("s3://"+ args['DEST_FOLDER'] +"/")
job.commit()
amazon web services - AWS Glue Job Input Parameters - Stack Overflow
コードを書くときの注意点
glue の 1 カラムに複数の型が入っていたときの対応
1 カラムに複数型が入ると choice 型になってネストした辞書っぽくなる。
DynamicFrame#resolveChoice メソッドを使えば解決できる。
aws-samples/aws-glue-samples
以下は glue のテーブルのスキーマにカラムスキーマをあわせる例。
import com.amazonaws.services.glue.ChoiceOption
val resolvedDf = logs.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")),
database = Some("test"),
tableName = Some("logs")).toDF()
AWS Glue で新しく Scala がサポートされました | cloudpack.media
polynote から Glue スクリプトに移植するときの注意点
- poly で必要な import spark.* 系の import は Glue では全部いらない
- polynote では spark.SparkContext() で SparkContext を作成しているが、Glue では new SparkContext() で作成するように変更する
- polynote ではスクリプトベタ書きだが、Glue では GlueApp オブジェクト内に作成する
object GlueApp { def main(args: Array[String]) = { // Glue ではここにコードを書いていく } }
- poly では spark.createDataFrame でデータフレームを作成しているが、Glue では spark が使えないので sqlContext.createDataFrame で作成する
- polynote では case class を Dataset に詰め替えるときに import spark.implicits._ でエンコーダを用意できるが、Glue では自前で implicit encoder を用意してあげなければならない
// Glue では implicit Encoder を用意する
case class Test(mean_cost: Double, median_cost: Double, mean_time: Double, median_time: Double, hour: String)
implicit val testEnc: Encoder[Test] = Encoders.product[Test]()
例外対応
toDF 無いよ的なこと言われた時
sqlContrext を使用するときは implicits も同時に import しないと toDF などが使えない value toDF is not a member of org.apache.spark.rdd.RDD(Long, org.apache.spark.ml.linalg.Vector) - Stack Overflow
val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
DynamicFrame
- Spark や pandas の DataFrame と似た挙動をする
- Spark の Row や pandas の Series に当たるものとして内部的に DynamicRecord というものがある
- AWS の他のサービスと接続性がある
- 自身を S 3に吐き出したり
- toDF は同列に複数型がある場合は Spark の DataFrame に変換できない
情報源
- 公式
- AWS Glue の DynamicFrame の動きを見てみる | Developers.IO
- AWS Glue
- AWS Glue が Scala をサポートしました | Amazon Web Services ブログ
開発エンドポイントと glue job の差異
- 開発エンドポイントだと sparkContext がもともと与えられているが、glue job だと自分で作成しなければいけないのでそこで差異が生じる。具体的には foreach などの中で sc を使った処理を行うと Task not Serializable が出る
PySpark を使用するときの tips
dynamic frame 系の optional 引数 transformation_ctx
- glue は生データの重複処理を防ぐための仕組みとして job bookmark というものを持っている
- transformation_ctx 引数は job bookmark を制御するためのもので、詳しくはよくわからんがとりあえず入れとくのをすすめる What is transformation_ctx used for in aws glue?
カタログからのデータの読み込み
pre_logs = glueContext.create_dynamic_frame.from_catalog(
database = "db_name",
table_name = "table_name",
transformation_ctx = "anything"
)
choice の解決方法
logs = ResolveChoice.apply(
frame=pre_logs, choice="MATCH_CATALOG", database="test", table_name="logs", transformation_ctx="logs"
)
既存の python ライブラリを読み込む方法
下のやつで行ける。
注意点はルートに init.py を作成すること。
AWS Glue で S3 上にある Python の外部ライブラリをインポートして利用する - YOMON8.NET