PySpark DataFrameメモ
Kazuki Moriyama (森山 和樹)
DataFrameの作成
SparkSession#cresateDataFrame
を使用する
作成元のデータは
- RDD
- list
- pandas.DataFrame
のいずれか
pyspark.sql module — PySpark 2.4.5 documentation
select
リストカラムを展開して複数行にする
exploedeを使用する。
df\_e = df.select(‘id', F.explode(F.col('tags’)).alias(‘tag'))
PySparkで1レコードを複数行にする - iMind Developers Blog
辞書カラムから特定の要素を抜き出す
df.select(
col(“payload.weight”).alias(“weight”),
col(“payload.unix\_msec”).alias(“unix\_msec”), “macAddress”, “sensorType”
)
filter
|
または&
でつなぐ
複数条件は df.filter((col("act\_date”) >= “2016-10-01”) & (col(“act\_date”) <= “2017-04-01”))
python - Multiple condition filter on dataframe - Stack Overflow
betweenは以上と以下になる
\>>> df.select(df.name, df.age.between(2, 4)).show()
+——+—————————————+
| name|((age >= 2) AND (age <= 4))|
+——+—————————————+
|Alice| true|
| Bob| false|
+——+—————————————+
udf
作り方
pysparkでデータハンドリングする時によく使うやつメモ - Qiita
udfのreturn typeの注意点
- udfでは戻り値の型を指定してやる必要があるが、numpyと併用すると容易にnumpy.floatとかに化けるためエラーが起きる時がある。その時はfloatとかに変換してやればいい。
- 戻り値をFloatとかにしてIntegerを返すと普通に動くが、適合していないとこはすべてNoneになる
変換
columnの型を変換する
changedTypedf = joindf.withColumn("label”, joindf\[“show”\].cast("double"))
how to change a Dataframe column from String type to Double type in pyspark
カラムのrename
withColumnRenamed(“old_name”, “new_name”)
from/to pandas
import numpy as np
import pandas as pd
Enable Arrow-based columnar data transfers
spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)
Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result\_pdf = df.select(“\*”).toPandas()
unionとunionByNameの違い
カラム名を参照して結合してくれるかどうか
df1 = spark.createDataFrame(\[(1, 2, 3)\], \[‘x’, ‘y’, ‘z’\])
df2 = spark.createDataFrame(\[(4, 5, 6)\], \[‘z’, ‘x’, ‘y’\])
df\_union = df1.union(df2)
df\_unionByName = df1.unionByName(df2)
print(‘df1’)
df1.show()
print(‘df2’)
df2.show()
# df1
# +—+—+—+
# | x| y| z|
# +—+—+—+
# | 1| 2| 3|
# +—+—+—+
#
# df2
# +—+—+—+
# | z| x| y|
# +—+—+—+
# | 4| 5| 6|
# +—+—+—+
print(‘union’)
df\_union.show()
print(‘unionByName’)
df\_unionByName.show()
# union
# +—+—+—+
# | x| y| z|
# +—+—+—+
# | 1| 2| 3|
# | 4| 5| 6|
# +—+—+—+
#
# unionByName
# +—+—+—+
# | x| y| z|
# +—+—+—+
# | 1| 2| 3|
# | 5| 6| 4|
# +—+—+—+
次の行との計算を行いたい場合
Windowを使って次、または前の行のデータを書く行に連結する。
前の値は lag
、次の行の値は lead
で取得する。
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit
dfu = df.withColumn(‘user’, lit(‘tmoore’))
df\_lag = dfu.withColumn(‘prev\_day\_price’,
func.lag(dfu\[‘price’\])
.over(Window.partitionBy(“user”)))
result = df\_lag.withColumn(‘daily\_return’,
(df\_lag\[‘price’\] - df\_lag\[‘prev\_day\_price’\]) / df\_lag\[‘price’\] )
>>> result.show()
+—+——+———+———————+——————————+
|day|price| user|prev\_day\_price| daily\_return|
+