Cara menggunakan DATAFRAME.SORT_INDEX pada Python

Lompati ke konten utama

Browser ini sudah tidak didukung.

Mutakhirkan ke Microsoft Edge untuk memanfaatkan fitur, pembaruan keamanan, dan dukungan teknis terkini.

Pengenalan DataFrames - Python

  • Artikel
  • 08/22/2022
  • 10 menit untuk membaca

Dalam artikel ini

Artikel ini menyediakan beberapa contoh pengodean API DataFrame PySpark umum yang menggunakan Python.

DataFrame adalah struktur data berlabel dua dimensi dengan kolom dari jenis yang berpotensi berbeda. Anda dapat memikirkan DataFrame seperti spreadsheet, tabel SQL, atau kamus objek seri. Untuk informasi dan contoh selengkapnya, lihat Mulai Cepat di situs web dokumentasi Apache Spark.

Membuat DataFrames

Contoh ini menggunakan kelas Baris dari Spark SQL untuk membuat beberapa DataFrames. Konten dari beberapa DataFrames ini kemudian dicetak.

# import pyspark class Row from module sql
from pyspark.sql import *

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', '', 100000)
employee2 = Employee('xiangrui', 'meng', '', 120000)
employee3 = Employee('matei', None, '', 140000)
employee4 = Employee(None, 'wendell', '', 160000)
employee5 = Employee('michael', 'jackson', '', 80000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee5, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)

Output:

Row(id='123456', name='Computer Science')
Row(firstName='xiangrui', lastName='meng', email='', salary=120000)

Lihat Pembuatan DataFrame dalam dokumentasi PySpark.

Membuat DataFrames dari daftar baris

Contoh ini menggunakan metode createDataFrame dari SparkSession (yang diwakili oleh variabel spark yang disediakan Azure Databricks) untuk membuat DataFrame dari daftar baris dari contoh sebelumnya.

departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

df1.show(truncate=False)

departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

df2.show(truncate=False)

Output:

+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department                      |employees                                                                                            |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{123456, Computer Science}      |[{michael, armbrust, , 100000}, {xiangrui, meng, , 120000}]|
|{789012, Mechanical Engineering}|[{matei, null, , 140000}, {null, wendell, , 160000}]       |
+--------------------------------+-----------------------------------------------------------------------------------------------------+

+---------------------------+------------------------------------------------------------------------------------------------+
|department                 |employees                                                                                       |
+---------------------------+------------------------------------------------------------------------------------------------+
|{345678, Theater and Drama}|[{michael, jackson, , 80000}, {null, wendell, , 160000}]|
|{901234, Indoor Recreation}|[{xiangrui, meng, , 120000}, {matei, null, , 140000}] |
+---------------------------+------------------------------------------------------------------------------------------------+

Bekerja dengan DataFrames

Menyatukan dua DataFrames

Contoh ini menggunakan metode gabungan untuk menggabungkan baris dalam DataFrame yang ditentukan dalam contoh sebelumnya ke dalam DataFrame baru.

unionDF = df1.union(df2)
unionDF.show(truncate=False)

Output:

+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department                      |employees                                                                                            |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{123456, Computer Science}      |[{michael, armbrust, , 100000}, {xiangrui, meng, , 120000}]|
|{789012, Mechanical Engineering}|[{matei, null, , 140000}, {null, wendell, , 160000}]       |
|{345678, Theater and Drama}     |[{michael, jackson, , 80000}, {null, wendell, , 160000}]     |
|{901234, Indoor Recreation}     |[{xiangrui, meng, , 120000}, {matei, null, , 140000}]      |
+--------------------------------+-----------------------------------------------------------------------------------------------------+

Contoh ini menggunakan metode perintah rm (dbutils.fs.rm) dari utilitas sistem File (dbutils.fs) di Utilitas Databricks untuk menghapus file Parquet yang ditentukan, jika ada. Metode tulis kemudian menggunakan metode parquet dari DataFrameWriter yang dihasilkan untuk menulis DataFrame dari contoh sebelumnya ke lokasi yang ditentukan di ruang kerja Azure Databricks dalam format Parquet.

# Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)
unionDF.write.format("parquet").save("/tmp/databricks-df-example.parquet")

Membaca DataFrame dari file Parquet

Contoh ini menggunakan metode baca untuk menggunakan metode parquet dari DataFrameReader yang dihasilkan untuk membaca file Parquet di lokasi yang ditentukan ke dalam DataFrame lalu menampilkan konten DataFrame.

parquetDF = spark.read.format("parquet").load("/tmp/databricks-df-example.parquet")
parquetDF.show(truncate=False)

Output:

+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department                      |employees                                                                                            |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{789012, Mechanical Engineering}|[{matei, null, , 140000}, {null, wendell, , 160000}]       |
|{901234, Indoor Recreation}     |[{xiangrui, meng, , 120000}, {matei, null, , 140000}]      |
|{345678, Theater and Drama}     |[{michael, jackson, , 80000}, {null, wendell, , 160000}]     |
|{123456, Computer Science}      |[{michael, armbrust, , 100000}, {xiangrui, meng, , 120000}]|
+--------------------------------+-----------------------------------------------------------------------------------------------------+

Meledakkan kolom karyawan

Contoh ini menggunakan metode pemilihan DataFrame sebelumnya untuk memproyeksikan sekumpulan ekspresi ke dalam DataFrame baru. Dalam hal ini, fungsi ledakan mengembalikan baris baru untuk setiap employees item. Metode alias menggunakan e sebagai singkatan untuk kolom. Metode selectExpr dari DataFrame baru memproyeksikan sekumpulan ekspresi SQL ke dalam DataFrame baru.

from pyspark.sql.functions import explode

explodeDF = unionDF.select(explode("employees").alias("e"))
flattenDF = explodeDF.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

flattenDF.show(truncate=False)

Output:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust||100000|
|xiangrui |meng    ||120000|
|matei    |null    ||140000|
|null     |wendell ||160000|
|michael  |jackson |  |80000 |
|null     |wendell ||160000|
|xiangrui |meng    ||120000|
|matei    |null    ||140000|
+---------+--------+---------------------+------+

Gunakan filter() untuk mengembalikan baris yang cocok dengan predikat

Contoh ini menggunakan metode filter DataFrame sebelumnya untuk hanya menampilkan baris yang nilai bidang firstName-nya adalah xiangrui. Kemudian menggunakan metode pengurutan untuk mengurutkan hasil menurut nilai bidang lastName baris.

filterDF = flattenDF.filter(flattenDF.firstName == "xiangrui").sort(flattenDF.lastName)
filterDF.show(truncate=False)

Output:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|xiangrui |meng    ||120000|
|xiangrui |meng    ||120000|
+---------+--------+---------------------+------+

Contoh ini mirip dengan yang sebelumnya, kecuali hanya menampilkan baris dengan nilai bidang firstName-nya adalah xiangrui atau michael.

from pyspark.sql.functions import col, asc

# Use `|` instead of `or`
filterDF = flattenDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
filterDF.show(truncate=False)

Output:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust||100000|
|michael  |jackson |  |80000 |
|xiangrui |meng    ||120000|
|xiangrui |meng    ||120000|
+---------+--------+---------------------+------+

Klausul ini where() setara dengan filter()

Contoh ini setara dengan contoh sebelumnya, kecuali menggunakan metode di mana alih-alih metode filter.

whereDF = flattenDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
whereDF.show(truncate=False)

Output:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust||100000|
|michael  |jackson |  |80000 |
|xiangrui |meng    ||120000|
|xiangrui |meng    ||120000|
+---------+--------+---------------------+------+

Ganti nilai null dengan -- menggunakan fungsi DataFrame Na

Contoh ini menggunakan metode fillna dari DataFrame sebelumnya flattenDF untuk mengganti semua null nilai dengan karakter --.

nonNullDF = flattenDF.fillna("--")
nonNullDF.show(truncate=False)

Sebelumnya:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust||100000|
|xiangrui |meng    ||120000|
|matei    |null    ||140000|
|null     |wendell ||160000|
|michael  |jackson |  |80000 |
|null     |wendell ||160000|
|xiangrui |meng    ||120000|
|matei    |null    ||140000|
+---------+--------+---------------------+------+

Setelahnya:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust||100000|
|xiangrui |meng    ||120000|
|matei    |--      ||140000|
|--       |wendell ||160000|
|michael  |jackson |  |80000 |
|--       |wendell ||160000|
|xiangrui |meng    ||120000|
|matei    |--      ||140000|
+---------+--------+---------------------+------+

Ambil hanya baris dengan hilang firstName atau lastName

Contoh ini menggunakan metode filter DataFrame sebelumnya flattenDF bersama dengan metode isNull dari kelas Kolom untuk menampilkan semua baris dengan bidang firstName atau lastName memiliki nilai null.

filterNonNullDF = flattenDF.filter(col("firstName").isNull() | col("lastName").isNull()).sort("email")
dfilterNonNullDF.show(truncate=False)

Output:

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|null     |wendell ||160000|
|null     |wendell ||160000|
|matei    |null    ||140000|
|matei    |null    ||140000|
+---------+--------+---------------------+------+

Contoh agregasi yang menggunakan agg() dan countDistinct()

Contoh ini menggunakan metode pemilihan, groupBy, dan agg dari DataFrame nonNullDF sebelumnya untuk memilih hanya bidang firstName dan lastName baris, mengelompokkan hasil menurut nilai bidang firstName, lalu menampilkan jumlah nilai bidang lastName yang berbeda untuk masing-masing nama depan tersebut. Untuk setiap nama depan, hanya satu nama belakang berbeda yang ditemukan, kecuali michael yang memiliki michael armbrust dan michael jackson.

from pyspark.sql.functions import countDistinct

countDistinctDF = nonNullDF.select("firstName", "lastName") \
  .groupBy("firstName") \
  .agg(countDistinct("lastName").alias("distinct_last_names"))

countDistinctDF.show()

Output:

+---------+-------------------+
|firstName|distinct_last_names|
+---------+-------------------+
|     null|                  1|
| xiangrui|                  1|
|    matei|                  0|
|  michael|                  2|
+---------+-------------------+

Membandingkan paket fisik kueri DataFrame dan SQL

Contoh ini menggunakan metode penjelasan dari DataFrame contoh sebelumnya untuk mencetak hasil rencana fisik untuk tujuan penelusuran kesalahan.

countDistinctDF.explain()

Contoh ini menggunakan metode createOrReplaceTempView dari DataFrame contoh sebelumnya untuk membuat tampilan sementara lokal dengan DataFrame ini. Tampilan sementara ini ada sampai sesi Spark terkait keluar dari cakupan. Contoh ini kemudian menggunakan metode sql sesi Spark untuk menjalankan kueri pada tampilan sementara ini. Rencana fisik untuk kueri ini kemudian ditampilkan. Hasil panggilan explain ini harus sama dengan panggilan explain sebelumnya.

# Register the DataFrame as a temporary view so that we can query it by using SQL.
nonNullDF.createOrReplaceTempView("databricks_df_example")

# Perform the same query as the preceding DataFrame and then display its physical plan.
countDistinctDF_sql = spark.sql('''
  SELECT firstName, count(distinct lastName) AS distinct_last_names
  FROM databricks_df_example
  GROUP BY firstName
''')

countDistinctDF_sql.explain()

Menjumlahkan semua gaji

Contoh ini menggunakan metode agg dari DataFrame nonNullDF sebelumnya untuk menampilkan jumlah semua gaji baris.

salarySumDF = nonNullDF.agg({"salary" : "sum"})
salarySumDF.show()

Output:

+-----------+
|sum(salary)|
+-----------+
|    1020000|
+-----------+

Contoh ini menampilkan jenis data yang mendasari salary bidang untuk DataFrame sebelumnya, yang merupakan bigint.

match = 'salary'

for key, value in nonNullDF.dtypes:
  if key == match:
    print(f"Data type of '{match}' is '{value}'.")

Output:

Data type of 'salary' is 'bigint'.

Mencetak statistik ringkasan untuk gaji

Contoh ini menggunakan metode deskripsi dari DataFrame sebelumnya nonNullDF untuk menampilkan statistik dasar bidang salary.

nonNullDF.describe("salary").show()

Output:

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                 8|
|   mean|          127500.0|
| stddev|28157.719063467175|
|    min|             80000|
|    max|            160000|
+-------+------------------+

Contoh menggunakan panda dan integrasi Matplotlib

Contoh ini menggunakan pustaka pandas dan Matplotlib untuk menampilkan informasi DataFrame nonNullDF sebelumnya secara grafis. Contoh ini menggunakan metode toPandas dari DataFrame untuk menghasilkan konten DataFrame sebagai pandas DataFrame, dan menggunakan metode clf dan plotmatplotlib.pyplot di Matplotlib untuk menghapus permukaan plotting dan kemudian membuat plot aktual.

import pandas as pd
import matplotlib.pyplot as plt
plt.clf()
pdDF = nonNullDF.toPandas()
pdDF.plot(x='firstName', y='salary', kind='bar', rot=45)
display()

Output:

Cara menggunakan DATAFRAME.SORT_INDEX pada Python

Pembersihan: hapus file Parquet

Contoh ini menggunakan metode perintah rm (dbutils.fs.rm) dari utilitas sistem File (dbutils.fs) di Utilitas Databricks untuk menghapus file Parquet yang awalnya ditulis pada awal artikel ini.

dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)

Faq DataFrame

FAQ ini membahas kasus penggunaan umum dan contoh penggunaan menggunakan API yang tersedia. Untuk deskripsi API yang lebih rinci, lihat dokumentasi PySpark.

Bagaimana cara mendapatkan performa yang lebih baik dengan DATAFrame UDFs?

Jika fungsionalitas ada dalam fungsi bawaan, menggunakan ini akan berkinerja lebih baik. Contoh penggunaan berikut. Lihat juga referensi API Fungsi PySpark. Kami menggunakan fungsi bawaan dan API withColumn() untuk menambahkan kolom baru. Anda juga bisa menggunakan withColumnRenamed() untuk mengganti kolom yang ada setelah transformasi.

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build an example DataFrame dataset to work with.
dbutils.fs.rm("/tmp/dataframe_sample.csv", True)
dbutils.fs.put("/tmp/dataframe_sample.csv", """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
""", True)

df = spark.read.format("csv").options(header='true', delimiter = '|').load("/tmp/dataframe_sample.csv")
df.printSchema()
# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))

# Parse out the date only
df = df.withColumn('date_only', F.regexp_replace(df.end_date,' (\d+)[:](\d+)[:](\d+).*$', ''))

# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])

# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))
df.createOrReplaceTempView("sample_df")
display(sql("select * from sample_df"))

Saya ingin mengonversi DataFrame kembali ke untai JSON untuk dikirim kembali ke Kafka.

Ada fungsi yang mendasarinya toJSON() yang mengembalikan RDD untai (karakter) JSON menggunakan nama kolom dan skema untuk menghasilkan catatan JSON.

rdd_json = df.toJSON()
rdd_json.take(2)

UDF saya mengambil parameter termasuk kolom untuk beroperasi. Bagaimana cara melewati parameter ini?

Ada fungsi yang tersedia yang disebut lit() yang membuat kolom konstanta.

from pyspark.sql import functions as F

add_n = udf(lambda x, y: x + y, IntegerType())

# We register a UDF that adds a column to the DataFrame, and we cast the id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
display(df)
# any constants used by UDF will automatically pass through to workers
N = 90
last_n_days = udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)

Saya memiliki tabel di metastore Apache Hive dan saya ingin mengakses tabel sebagai DataFrame. Apa cara terbaik untuk mendefinisikan ini?

Ada beberapa cara untuk menentukan DataFrame dari tabel terdaftar. Memanggil table(tableName) atau memilih dan memfilter kolom tertentu menggunakan kueri SQL:

# Both return DataFrame types
df_1 = table("sample_df")
df_2 = spark.sql("select * from sample_df")

Saya ingin menghapus semua tabel yang di-cache pada kluster saat ini.

Ada API yang tersedia untuk melakukan ini di tingkat global atau per tabel.

spark.catalog.clearCache()
spark.catalog.cacheTable("sample_df")
spark.catalog.uncacheTable("sample_df")

Saya ingin menghitung agregat pada kolom. Apa cara terbaik untuk melakukan ini?

Metode agg(*exprs) yang mengambil daftar nama kolom dan ekspresi untuk jenis agregasi yang ingin Anda hitung. See pyspark.sql.DataFrame.agg. Anda dapat menggunakan fungsi bawaan dalam ekspresi untuk setiap kolom.

# Provide the min, count, and avg and groupBy the location column. Diplay the results
agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
display(agg_df)

Saya ingin menulis DataFrames ke Parquet, tetapi ingin mempartisi pada kolom tertentu.

Anda dapat menggunakan API berikut ini untuk menyelesaikan tugas ini. Pastikan kode tidak membuat kolom yang mempartisi dalam jumlah besar dengan himpunan data jika tidak, overhead metadata dapat menyebabkan perlambatan yang signifikan. Jika ada tabel SQL kembali oleh direktori ini, Anda harus memanggil refresh table <table-name> untuk memperbarui metadata sebelum kueri.

df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").format("parquet").save("/tmp/sample_table")
display(dbutils.fs.ls("/tmp/sample_table"))

Bagaimana cara menangani kasus dengan benar di mana saya ingin memfilter data NULL?

Anda dapat menggunakan filter() dan memberikan sintaksis serupa seperti yang Anda lakukan dengan kueri SQL.

null_item_schema = StructType([StructField("col1", StringType(), True),
                               StructField("col2", IntegerType(), True)])
null_df = spark.createDataFrame([("test", 1), (None, 2)], null_item_schema)
display(null_df.filter("col1 IS NOT NULL"))

Bagaimana cara menyimpulkan skema menggunakan CSVatau spark-avro pustaka?

Ada inferSchema bendera pilihan. Menyediakan header memastikan penamaan kolom yang sesuai.

adult_df = spark.read.\
    format("com.spark.csv").\
    option("header", "false").\
    option("inferSchema", "true").load("dbfs:/databricks-datasets/adult/adult.data")
adult_df.printSchema()

Anda memiliki himpunan data untai (karakter) yang dibatasi yang ingin Anda konversi ke tipe datanya. Bagaimana Anda akan mencapai ini?

Gunakan API RDD untuk memfilter baris yang salah bentuk dan memetakan nilai ke jenis yang sesuai. Kami menentukan fungsi yang memfilter item menggunakan ekspresi reguler.