Spark データフレームのネストされたフィールドの名前を変更する 質問する

Spark データフレームのネストされたフィールドの名前を変更する 質問する

dfSpark でデータフレームを使用する:

 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

フィールドの名前を変更するarray_field.aにはどうすればいいですかarray_field.a_renamed?

[アップデート]:

.withColumnRenamed()ネストされたフィールドでは機能しないため、このハッキーで安全でない方法を試しました。

# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'

ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

# Then set dataframe's schema with altered schema
df._schema = schema

プライベート属性を設定するのは良い方法ではないことはわかっていますが、dfのスキーマを設定する他の方法がわかりません

私は正しい方向に進んでいると思うが、df.printSchema()まだ古い名前が表示されていますarray_field.adf.schema == schemaTrue

ベストアンサー1

パイソン

ネストされた単一のフィールドを変更することはできません。構造全体を再作成する必要があります。この特定のケースでは、最も簡単な解決策は を使用することですcast

まず、一連のインポート:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

サンプルデータ:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

スキーマがあなたの場合と同じであることを確認しましょう:

df.printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

たとえば、新しいスキーマを文字列として定義できます。

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

またはDataType:

struct_schema = ArrayType(StructType([
    StructField("a_renamed", StringType()),
    StructField("b", LongType()),
    StructField("c", LongType())
]))

 df.select(col("array_field").cast(struct_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

スカラ

同じテクニックを Scala でも使用できます。

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

または

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
    StructField("a_renamed", StringType),
    StructField("b", LongType),
    StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

改善の可能性:

dict表現力豊かなデータ操作やJSON処理ライブラリを使用すると、データ型をJSON文字列にダンプしてそこから取得するのが簡単になります(Python /toolz):

from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter

# Update name to "a_updated" if name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

updated_schema = pipe(
   #  Get schema of the field as a dict
   df.schema["array_field"].jsonValue(),
   # Update fields with rename
   update_in(
       keys=["type", "elementType", "fields"],
       func=lambda x: pipe(x, map(rename_field), list)),
   # Load schema from dict
   StructField.fromJson,
   # Get data type
   attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()

おすすめ記事