PySpark on AWS Glue
dynamic frame系のoptional引数transformation_ctx
AWS Glueでは、transformation_ctx
というオプション引数を使用して、ジョブのブックマークを制御することができます。ジョブのブックマークは、生データの重複処理を防ぐための仕組みです。詳細については理解できていないが、一般的にはこのオプション引数を指定することをお勧めします。
カタログからのデータの読み込み
glueContext.create_dynamic_frame.from_catalog
メソッドを使用して、AWS Glueデータカタログからデータを読み込むことができます。以下はその使用例です。
pre_logs = glueContext.create_dynamic_frame.from_catalog(
database="db_name",
table_name="table_name",
transformation_ctx="anything"
)
choiceの解決方法
ResolveChoice.apply
メソッドを使用して、AWS Glueデータフレームのchoiceの解決を行います。以下はその使用例です。
logs = ResolveChoice.apply(
frame=pre_logs,
choice="MATCH_CATALOG",
database="test",
table_name="logs",
transformation_ctx="logs"
)
既存のPythonライブラリを読み込む方法
外部のPythonライブラリをAWS Glueジョブで使用する方法はいくつかありますが、下記の手順で行うことができます。ただし、ルートに __init__.py
ファイルを作成する必要があることに注意してください。
AWS GlueでS3上にあるPythonの外部ライブラリをインポートして利用する - YOMON8.NET
APIを用いたジョブの取り扱い
コードからのジョブの呼び出し
AWS GlueのPython APIを使用して、ジョブをコードから呼び出すことができます。以下はその使用例です。
response = client.start_job_run(
JobName='my_test_Job',
Arguments={
'--day_partition_key': 'partition_0',
'--hour_partition_key': 'partition_1',
'--day_partition_value': day_partition_value,
'--hour_partition_value': hour_partition_value
}
)
コードからのパラメータの取得方法
awsglue.utils.getResolvedOptions
メソッドを使用して、AWS Glueジョブのパラメータを取得することができます。以下はその使用例です。
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value'])
print("The day-partition key is: ", args['day_partition_key'])
print("and the day-partition value is: ", args['day_partition_value'])
APIを用いたワークフローの取り扱い
ワークフローの実行
AWS GlueのPython APIを使用して、ワークフローをコードから実行することができます。パラメータも動的に指定することが可能です。以下はその使用例です。
Glue — Boto 3 Docs 1.12.23 documentation
ジョブからのワークフロープロパティの取得
AWS Glueのジョブ内からワークフローのプロパティを取得する場合は、boto3
を使用して以下のように行います。
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
glue_client = boto3.client("glue")
args = getResolvedOptions(sys.argv, ['JOB_NAME','WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']
workflow_params = glue_client.get
_workflow_run_properties(Name=workflow_name, RunId=workflow_run_id)["RunProperties"]
target_database = workflow_params['target_database']
target_s3_location = workflow_params['target_s3_location']