私はPySpark DataFrame(パンダではない) と呼ばれるdf
、使用するにはかなり大きいデータがありますcollect()
。したがって、以下に示すコードは効率的ではありません。以前は少量のデータで動作していましたが、現在は失敗します。
import numpy as np
myList = df.collect()
total = []
for product,nb in myList:
for p2,score in nb:
total.append(score)
mean = np.mean(total)
std = np.std(total)
mean
または同様のものを使用して、と をstd
2 つの変数として取得する方法はありますかpyspark.sql.functions
?
from pyspark.sql.functions import mean as mean_, std as std_
を使用することもできますwithColumn
が、この方法では計算が行ごとに適用され、単一の変数は返されません。
アップデート:
サンプルコンテンツdf
:
+----------+------------------+
|product_PK| products|
+----------+------------------+
| 680|[[691,1], [692,5]]|
| 685|[[691,2], [692,2]]|
| 684|[[691,1], [692,3]]|
値の平均と標準偏差を計算する必要があります。たとえば、のscore
値はスコアの 1 つです。1
[691,1]
ベストアンサー1
組み込み関数を使用して集計統計を取得できます。平均と標準偏差を取得する方法は次のとおりです。
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col
df_stats = df.select(
_mean(col('columnName')).alias('mean'),
_stddev(col('columnName')).alias('std')
).collect()
mean = df_stats[0]['mean']
std = df_stats[0]['std']
3 つの異なる標準偏差関数があることに注意してください。ドキュメントによると、私が使用した関数 ( stddev
) は次を返します。
集計関数: グループ内の式の偏りのない標本標準偏差を返します。
describe()
次のメソッドも使用できます:
df.describe().show()
詳細については、このリンクを参照してください:pyspark.sql.関数
アップデート: これはネストされたデータを処理する方法です。
を使用してexplode
値を個別の行に抽出し、上記のようにmean
と を呼び出します。stddev
MWE は次のとおりです。
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev
# mock up sample dataframe
df = sqlCtx.createDataFrame(
[(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
["product_PK", "products"]
)
# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())
# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
.withColumn('score', get_score(col('exploded')))\
.select(
_mean(col('score')).alias('mean'),
_stddev(col('score')).alias('std')
)\
.collect()
mean = df_stats[0]['mean']
std = df_stats[0]['std']
print([mean, std])
出力は次のようになります:
[2.3333333333333335, 1.505545305418162]
numpy
これらの値が正しいかどうかは、以下を使用して確認できます。
vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])
説明:"products"
列はlist
のですlist
。 を呼び出すと、explode
外側の の各要素に対して新しい行が作成されます。次に、 2 要素の の 2 番目の要素として定義した、分解された各行から値list
を取得します。最後に、この新しい列に対して集計関数を呼び出します。"score"
list