PySpark DataFrame から平均と標準偏差を計算するにはどうすればよいでしょうか? 質問する

PySpark DataFrame から平均と標準偏差を計算するにはどうすればよいでしょうか? 質問する

私はPySpark DataFrame(パンダではない) と呼ばれるdf、使用するにはかなり大きいデータがありますcollect()。したがって、以下に示すコードは効率的ではありません。以前は少量のデータで動作していましたが、現在は失敗します。

import numpy as np

myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)

meanまたは同様のものを使用して、と をstd2 つの変数として取得する方法はありますかpyspark.sql.functions?

from pyspark.sql.functions import mean as mean_, std as std_

を使用することもできますwithColumnが、この方法では計算が行ごとに適用され、単一の変数は返されません。

アップデート:

サンプルコンテンツdf:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|

値の平均と標準偏差を計算する必要があります。たとえば、のscore値はスコアの 1 つです。1[691,1]

ベストアンサー1

組み込み関数を使用して集計統計を取得できます。平均と標準偏差を取得する方法は次のとおりです。

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

3 つの異なる標準偏差関数があることに注意してください。ドキュメントによると、私が使用した関数 ( stddev) は次を返します。

集計関数: グループ内の式の偏りのない標本標準偏差を返します。

describe()次のメソッドも使用できます:

df.describe().show()

詳細については、このリンクを参照してください:pyspark.sql.関数

アップデート: これはネストされたデータを処理する方法です。

を使用してexplode値を個別の行に抽出し、上記のようにmeanと を呼び出します。stddev

MWE は次のとおりです。

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

出力は次のようになります:

[2.3333333333333335, 1.505545305418162]

numpyこれらの値が正しいかどうかは、以下を使用して確認できます。

vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])

説明:"products"列はlistのですlist。 を呼び出すと、explode外側の の各要素に対して新しい行が作成されます。次に、 2 要素の の 2 番目の要素として定義した、分解された各行から値listを取得します。最後に、この新しい列に対して集計関数を呼び出します。"score"list

おすすめ記事