Создание фрейма данных pyspark вручную

Я пытаюсь вручную создать фрейм данных pyspark с учетом определенных данных:

row_in=[(1566429545575348),(40.353977),(-111.701859)]
rdd=sc.parallelize(row_in)
schema = StructType([StructField("time_epocs", DecimalType(),    True),StructField("lat", DecimalType(),True),StructField("long", DecimalType(),True)])
df_in_test=spark.createDataFrame(rdd,schema)

Это дает ошибку, когда я пытаюсь отобразить фрейм данных, поэтому я не уверен, как это сделать.

Однако документация Spark кажется мне немного запутанной здесь, и я получил похожие ошибки, когда попытался следовать этим инструкциям.

Кто-нибудь знает как это сделать?


person Josh    schedule 16.09.2019    source источник
comment
ваш код должен работать, если row_in=[(1566429545575348, 40.353977,-111.701859)]   -  person pault    schedule 17.09.2019
comment
Это не сработало даже с использованием row_in = [(1566429545575348, 40.353977, -111.701859)]   -  person Josh    schedule 18.09.2019
comment
Настоящая проблема возникает из-за того, что (1) - это целое число, а не кортеж. когда у вас есть только 1 элемент, вам нужно добавить кому, чтобы создать кортеж (1,)   -  person Steven    schedule 02.11.2020
comment
См. Мой ответ для полного обсуждения различных подходов и того, какой подход наиболее подходит для различных обстоятельств.   -  person Powers    schedule 24.02.2021


Ответы (6)


Простое создание фрейма данных:

df = spark.createDataFrame(
    [
        (1, "foo"),  # create your data here, be consistent in the types.
        (2, "bar"),
    ],
    ["id", "label"]  # add your column names here
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- label: string (nullable = true)

df.show()
+---+-----+                                                                     
| id|label|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+

Согласно официальному документу:

  • когда схема представляет собой список имен столбцов, тип каждого столбца будет выводиться из данных.
  • Когда схема pyspark.sql.types.DataType или строка типа данных, она должна соответствовать реальным данным.
# Example with a datatype string
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],  
    "id int, label string",  # add column names and types here
)

# Example with pyspark.sql.types
from pyspark.sql import types as T
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],
    T.StructType(  # Define the whole schema within a StructType
        [
            T.StructField("id", T.IntegerType(), True),
            T.StructField("label", T.StringType(), True),
        ]
    ),
)


df.printSchema()
root
 |-- id: integer (nullable = true)  # id's type is forced at Int
 |-- label: string (nullable = true)
person Steven    schedule 16.09.2019

Чтобы разработать / построить ответ @Steven:

field = [
    StructField("MULTIPLIER", FloatType(), True),
    StructField("DESCRIPTION", StringType(), True),
]
schema = StructType(field)
multiplier_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

Создаст пустой фрейм данных.

Теперь мы можем просто добавить к нему строку:

l = [(2.3, "this is a sample description")]
rdd = sc.parallelize(l)
multiplier_df_temp = spark.createDataFrame(rdd, schema)
multiplier_df = wtp_multiplier_df.union(wtp_multiplier_df_temp)
person Josh    schedule 20.09.2019
comment
эта незакрытая скобка является частью синтаксиса? - person Luis Bosquez; 13.08.2020
comment
зачем вам объединять multiplier_df_temp с пустым фреймом данных? вы уже создали строку с правильной схемой. union бесполезен. - person Steven; 02.11.2020
comment
Этого подхода следует избегать, поскольку он излишне сложен. - person Powers; 24.02.2021

В этом ответе показано, как создать фрейм данных PySpark с createDataFrame, create_df и toDF.

df = spark.createDataFrame([("joe", 34), ("luisa", 22)], ["first_name", "age"])

df.show()
+----------+---+
|first_name|age|
+----------+---+
|       joe| 34|
|     luisa| 22|
+----------+---+

Вы также можете передать createDataFrame RDD и схему для создания DataFrames с большей точностью:

from pyspark.sql import Row
from pyspark.sql.types import *

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])

schema = schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), False)])

df = spark.createDataFrame(rdd, schema)

df.show()
+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+

create_df из моего проекта Quinn позволяет использовать лучшее из обоих миров - он краткий и полностью описательный:

from pyspark.sql.types import *
from quinn.extensions import *

df = spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],
    [("name", StringType(), True), ("blah", StringType(), True)]
)

df.show()
+----+----+
|name|blah|
+----+----+
|jose|   a|
|  li|   b|
| sam|   c|
+----+----+

toDF не дает никаких преимуществ перед другими подходами:

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])
df = rdd.toDF()
df.show()
+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+
person Powers    schedule 24.02.2021

Расширение ответа @ Стивена:

data = [(i, 'foo') for i in range(1000)] # random data

columns = ['id', 'txt']    # add your columns label here

df = spark.createDataFrame(data, columns)

Примечание. Когда schema - это список имен столбцов, тип каждого столбца будет выводиться из данных.

Если вы хотите конкретно определить схему, сделайте следующее:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType(), True), StructField("txt", StringType(), True)])
df1 = spark.createDataFrame(data, schema)

Выходы:

>>> df1
DataFrame[id: int, txt: string]
>>> df
DataFrame[id: bigint, txt: string]
person Ani Menon    schedule 26.07.2020

С форматированием

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        (1, "foo"),
        (2, "bar"),
    ],
    StructType(
        [
            StructField("id", IntegerType(), False),
            StructField("txt", StringType(), False),
        ]
    ),
)
print(df.dtypes)
df.show()
person Koroslak    schedule 09.03.2021

для новичков полный пример импорта данных из файла:

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    ShortType,
    StringType,
    StructType,
    StructField,
    TimestampType,
)

import os

here = os.path.abspath(os.path.dirname(__file__))


spark = SparkSession.builder.getOrCreate()
schema = StructType(
    [
        StructField("id", ShortType(), nullable=False),
        StructField("string", StringType(), nullable=False),
        StructField("datetime", TimestampType(), nullable=False),
    ]
)

# read file or construct rows manually
df = spark.read.csv(os.path.join(here, "data.csv"), schema=schema, header=True)
person ehacinom    schedule 19.08.2020