warren-wong-kMRMcUcO81M-unsplash.jpg

PySpark DataFrameメモ

 
0
このエントリーをはてなブックマークに追加
Kazuki Moriyama
Kazuki Moriyama (森山 和樹)

DataFrameの作成

SparkSession#cresateDataFrameを使用する
作成元のデータは

  • RDD
  • list
  • pandas.DataFrame
    のいずれか

pyspark.sql module — PySpark 2.4.5 documentation

select

リストカラムを展開して複数行にする

exploedeを使用する。

df\_e = df.select(id', F.explode(F.col('tags’)).alias(‘tag'))

PySparkで1レコードを複数行にする - iMind Developers Blog

辞書カラムから特定の要素を抜き出す

df.select(
  col(“payload.weight”).alias(“weight”),
    col(“payload.unix\_msec”).alias(“unix\_msec”), “macAddress”, “sensorType”
)

filter

複数条件は |または&でつなぐ

df.filter((col("act\_date”) >=2016-10-01) & (col(“act\_date”) <=2017-04-01))

python - Multiple condition filter on dataframe - Stack Overflow

betweenは以上と以下になる

\>>> df.select(df.name, df.age.between(2, 4)).show()
+——+—————————————+
| name|((age >= 2) AND (age <= 4))|
+——+—————————————+
|Alice|                       true|
|  Bob|                      false|
+——+—————————————+

udf

作り方

pysparkでデータハンドリングする時によく使うやつメモ - Qiita

udfのreturn typeの注意点

  • udfでは戻り値の型を指定してやる必要があるが、numpyと併用すると容易にnumpy.floatとかに化けるためエラーが起きる時がある。その時はfloatとかに変換してやればいい。
  • 戻り値をFloatとかにしてIntegerを返すと普通に動くが、適合していないとこはすべてNoneになる

変換

columnの型を変換する

changedTypedf = joindf.withColumn("label”, joindf\[“show”\].cast("double"))

how to change a Dataframe column from String type to Double type in pyspark

カラムのrename

withColumnRenamed(“old_name”, “new_name”)

from/to pandas

import numpy as np
import pandas as pd

Enable Arrow-based columnar data transfers

spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)

Generate a Pandas DataFrame

pdf = pd.DataFrame(np.random.rand(100, 3))

Create a Spark DataFrame from a Pandas DataFrame using Arrow

df = spark.createDataFrame(pdf)

Convert the Spark DataFrame back to a Pandas DataFrame using Arrow

result\_pdf = df.select(“\*).toPandas()

unionとunionByNameの違い

カラム名を参照して結合してくれるかどうか

df1 = spark.createDataFrame(\[(1, 2, 3)\], \[‘x’, ‘y’, ‘z’\])
df2 = spark.createDataFrame(\[(4, 5, 6)\], \[‘z’, ‘x’, ‘y’\])

df\_union = df1.union(df2)
df\_unionByName = df1.unionByName(df2)

print(‘df1’)
df1.show()
print(‘df2’)
df2.show()

# df1
# +—+—+—+
# |  x|  y|  z|
# +—+—+—+
# |  1|  2|  3|
# +—+—+—+
#
# df2
# +—+—+—+
# |  z|  x|  y|
# +—+—+—+
# |  4|  5|  6|
# +—+—+—+

print(‘union’)
df\_union.show()
print(‘unionByName’)
df\_unionByName.show()

# union
# +—+—+—+
# |  x|  y|  z|
# +—+—+—+
# |  1|  2|  3|
# |  4|  5|  6|
# +—+—+—+
#
# unionByName
# +—+—+—+
# |  x|  y|  z|
# +—+—+—+
# |  1|  2|  3|
# |  5|  6|  4|
# +—+—+—+

次の行との計算を行いたい場合

Windowを使って次、または前の行のデータを書く行に連結する。
前の値は lag、次の行の値は leadで取得する。

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

dfu = df.withColumn(‘user’, lit(‘tmoore’))

df\_lag = dfu.withColumn(‘prev\_day\_price’,
                        func.lag(dfu\[‘price’\])
                                 .over(Window.partitionBy(“user”)))

result = df\_lag.withColumn(‘daily\_return’, 
          (df\_lag\[‘price’\] - df\_lag\[‘prev\_day\_price’\]) / df\_lag\[‘price’\] )

>>> result.show()
++——+———+———————+——————————+
|day|price|   user|prev\_day\_price|        daily\_return|
+
info-outline

お知らせ

K.DEVは株式会社KDOTにより運営されています。記事の内容や会社でのITに関わる一般的なご相談に専門の社員がお答えしております。ぜひお気軽にご連絡ください。