pexels-photo-733397.jpeg

PySpark on AWS Glue

 
0
このエントリーをはてなブックマークに追加
Kazuki Moriyama
Kazuki Moriyama (森山 和樹)

dynamic frame系のoptional引数transformation_ctx

AWS Glueでは、transformation_ctxというオプション引数を使用して、ジョブのブックマークを制御することができます。ジョブのブックマークは、生データの重複処理を防ぐための仕組みです。詳細については理解できていないが、一般的にはこのオプション引数を指定することをお勧めします。

カタログからのデータの読み込み

glueContext.create_dynamic_frame.from_catalogメソッドを使用して、AWS Glueデータカタログからデータを読み込むことができます。以下はその使用例です。

pre_logs = glueContext.create_dynamic_frame.from_catalog(
    database="db_name",
    table_name="table_name",
    transformation_ctx="anything"
)

choiceの解決方法

ResolveChoice.applyメソッドを使用して、AWS Glueデータフレームのchoiceの解決を行います。以下はその使用例です。

logs = ResolveChoice.apply(
    frame=pre_logs,
    choice="MATCH_CATALOG",
    database="test",
    table_name="logs",
    transformation_ctx="logs"
)

既存のPythonライブラリを読み込む方法

外部のPythonライブラリをAWS Glueジョブで使用する方法はいくつかありますが、下記の手順で行うことができます。ただし、ルートに __init__.py ファイルを作成する必要があることに注意してください。

AWS GlueでS3上にあるPythonの外部ライブラリをインポートして利用する - YOMON8.NET

APIを用いたジョブの取り扱い

コードからのジョブの呼び出し

AWS GlueのPython APIを使用して、ジョブをコードから呼び出すことができます。以下はその使用例です。

response = client.start_job_run(
    JobName='my_test_Job',
    Arguments={
        '--day_partition_key': 'partition_0',
        '--hour_partition_key': 'partition_1',
        '--day_partition_value': day_partition_value,
        '--hour_partition_value': hour_partition_value
    }
)

コードからのパラメータの取得方法

awsglue.utils.getResolvedOptionsメソッドを使用して、AWS Glueジョブのパラメータを取得することができます。以下はその使用例です。

import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value'])
print("The day-partition key is: ", args['day_partition_key'])
print("and the day-partition value is: ", args['day_partition_value'])

APIを用いたワークフローの取り扱い

ワークフローの実行

AWS GlueのPython APIを使用して、ワークフローをコードから実行することができます。パラメータも動的に指定することが可能です。以下はその使用例です。

Glue — Boto 3 Docs 1.12.23 documentation

ジョブからのワークフロープロパティの取得

AWS Glueのジョブ内からワークフローのプロパティを取得する場合は、boto3を使用して以下のように行います。

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_client = boto3.client("glue")
args = getResolvedOptions(sys.argv, ['JOB_NAME','WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']
workflow_params = glue_client.get

_workflow_run_properties(Name=workflow_name, RunId=workflow_run_id)["RunProperties"]

target_database = workflow_params['target_database']
target_s3_location = workflow_params['target_s3_location']

ワークフローの実行プロパティの取得と設定

info-outline

お知らせ

K.DEVは株式会社KDOTにより運営されています。記事の内容や会社でのITに関わる一般的なご相談に専門の社員がお答えしております。ぜひお気軽にご連絡ください。