Menggabungkan dua tabel di python

Saya memiliki daftar Pandas dataframe yang ingin saya gabungkan menjadi satu Pandas dataframe. Saya menggunakan Python 2.7.10 dan Pandas 0.16.2

Saya membuat daftar dataframe dari:

import pandas as pd
dfs = []
sqlall = "select * from mytable"

for chunk in pd.read_sql_query(sqlall , cnxn, chunksize=10000):
    dfs.append(chunk)

Ini mengembalikan daftar bingkai data

type(dfs[0])
Out[6]: pandas.core.frame.DataFrame

type(dfs)
Out[7]: list

len(dfs)
Out[8]: 408

Berikut ini beberapa contoh data

# sample dataframes
d1 = pd.DataFrame({'one' : [1., 2., 3., 4.], 'two' : [4., 3., 2., 1.]})
d2 = pd.DataFrame({'one' : [5., 6., 7., 8.], 'two' : [9., 10., 11., 12.]})
d3 = pd.DataFrame({'one' : [15., 16., 17., 18.], 'two' : [19., 10., 11., 12.]})

# list of dataframes
mydfs = [d1, d2, d3]

Saya ingin menggabungkan d1, d2, dan d3 menjadi satu pandas dataframe. Atau, metode membaca tabel ish besar langsung ke dalam kerangka data saat menggunakan opsi chunksize akan sangat membantu.

Contoh ini menggunakan dataset yang diunduh darihttp://everypolitician.org/kepadasample-datasetbucket di Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all. Set data tersebut berisi data dalam format JSON tentang legislator Amerika Serikat dan jabatan yang mereka telah jabat di DPR AS dan Senat, dan telah dimodifikasi sedikit dan dibuat tersedia dalam sebuah bucket Amazon S3 publik untuk tujuan tutorial ini.

Anda dapat menemukan kode sumber untuk contoh ini dijoin_and_relationalize.pyfile diAWS Gluerepositori sampelpada GitHub website.

Dengan menggunakan data ini, tutorial ini menunjukkan cara untuk melakukan hal berikut:

  • Menggunakan sebuahAWS Gluecrawler untuk mengklasifikasikan objek yang disimpan dalam sebuah bucket Amazon S3 publik dan menyimpan skemanya ke dalamAWSKatalog Data Glue.

  • Memeriksa metadata tabel dan skema yang dihasilkan dari perayapan.

  • Menulis sebuah skrip extract, transform, and load (ETL) Python yang menggunakan metadata tersebut dalam Katalog Data untuk melakukan hal berikut:

    • Menggabungkan data dalam file sumber yang berbeda bersama-sama ke dalam sebuah tabel data tunggal (yakni, denormalisasi data).

    • Mem-filter tabel yang digabungkan ke dalam tabel terpisah berdasarkan jenis legislator.

    • Menuliskan data yang dihasilkan untuk memisahkan file Apache Parquet untuk analisis nanti.

Cara termudah untuk men-debug Python atau PySpark skrip adalah untuk membuat titik akhir pengembangan dan menjalankan kode Anda di sana. Kami menyarankan Anda untuk memulai dengan menyiapkan sebuah titik akhir pengembangan untuk tempat bekerja. Untuk informasi selengkapnya, lihat Melihat properti titik akhir pengembangan.

Langkah 1: Melakukan Perayapan data di bucket Amazon S3

  1. Masuk keAWS Management Consoledan bukaAWS Gluekonsol dihttps://console.aws.amazon.com/glue/.

  2. Dengan mengikuti langkah-langkah di Bekerja dengan crawler diAWS Gluekonsol, buat sebuah crawler baru yang dapat melakukan perayapan set data s3://awsglue-datasets/examples/us-legislators/all ke dalam basis data bernama legislators di Katalog Data Glue AWS. Contoh data sudah ada di bucket Amazon S3 ini.

  3. Jalankan crawler baru tersebut, dan kemudian periksa basis data legislators.

    Crawler tersebut membuat tabel metadata berikut:

    • persons_json

    • memberships_json

    • organizations_json

    • events_json

    • areas_json

    • countries_r_json

    Ini adalah koleksi tabel semi-dinormalisasi yang berisi legislator dan riwayat mereka.

Langkah 2: Tambahkan skrip boilerplate ke notebook endpoint pengembangan

Tempelkan skrip boilerplate berikut ke notebook titik akhir pengembangan untuk mengimporAWS Glueperpustakaan yang Anda butuhkan, dan mengatur satuGlueContext:


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

Langkah 3: Memeriksa skema dari data dalam Katalog Data

Selanjutnya, Anda dapat dengan mudah membuat memeriksa DynamicFrame dariAWSKatalog Data Glue, dan memeriksa skema data. Misalnya, untuk melihat skema tabel persons_json, tambahkan hal berikut di notebook Anda:


persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()

Berikut adalah output dari panggilan cetak:


Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string

Setiap orang dalam tabel adalah anggota dari beberapa badan kongres AS.

Untuk melihat skema tabel memberships_json, ketik berikut ini:


memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()

Output adalah sebagai berikut:


Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string

organizations adalah partai dan dua majelis Kongres, Senat dan Dewan Perwakilan Rakyat. Untuk melihat skema tabel organizations_json, ketik berikut ini:


orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()

Output adalah sebagai berikut:


Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string

Langkah 4: Mem-filter Data

Berikutnya, simpan hanya bidang yang Anda inginkan, dan ubah nama id menjadi org_id. Set data cukup kecil sehingga Anda dapat melihat semuanya.

toDF() mengkonversi DynamicFrame menjadi DataFrame Apache Spark, sehingga Anda dapat menerapkan transformasi yang sudah ada di Apache Spark SQL:


orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()

Berikut ini menunjukkan outputnya:


+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+

Ketik berikut ini untuk melihat organizations yang muncul di memberships:


memberships.select_fields(['organization_id']).toDF().distinct().show()

Berikut ini menunjukkan outputnya:


+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

Langkah 5: Menyatukan semuanya

Sekarang, gunakanAWS Glueuntuk menggabungkan tabel-tabel relasional ini dan buat satu tabel riwayat penuh legislatormembershipsdan mereka yang sesuaiorganizations.

  1. Pertama, gabungkan persons dan memberships pada id dan person_id.

  2. Selanjutnya, gabungkan hasilnya dengan orgs pada org_id dan organization_id.

  3. Kemudian, buang bidang-bidang redundan-nya, person_id dan org_id.

Anda dapat melakukan semua operasi ini dalam satu baris kode (diperpanjang):


l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()

Output adalah sebagai berikut:


Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string

Anda sekarang memiliki tabel akhir yang dapat Anda gunakan untuk analisis. Anda dapat menuliskannya dalam format yang ringkas dan efisien untuk analitik—yaitu Parquet—yang dapat Anda jalankan SQL padanyaAWS Glue, Amazon Athena, atau Amazon Redshift Spectrum.

Panggilan berikut menulis tabel di beberapa file untuk mendukung pembacaan paralel cepat ketika melakukan analisis kemudian:


glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")

Untuk menempatkan semua data riwayat ke dalam satu file, Anda harus mengubahnya menjadi sebuah bingkai data, melakukan pemartisian ulang, dan menuliskannya:


s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')

Atau, jika Anda ingin memisahkannya berdasarkan Senat dan DPR:


l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])

Langkah 6: Mengubah data untuk database relasional

AWS Gluememudahkan untuk menulis data ke basis data relasional seperti Amazon Redshift, bahkan dengan data semi-terstruktur. Ia menawarkan transformasi relationalize, yang meratakan DynamicFrames tidak peduli seberapa kompleks objek dalam bingkai.

Dengan menggunakan l_history DynamicFrame dalam contoh ini, masukkan nama dari tabel akar (hist_root) dan path kerja sementara untuk relationalize. Ini akan mengembalikan DynamicFrameCollection. Anda kemudian dapat mencantumkan nama-nama DynamicFrames dalam koleksi itu:


dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()

Berikut ini adalah hasil dari panggilan keys:


[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']

Relationalize memecahkan tabel riwayat ke dalam enam tabel baru: tabel akar yang berisi catatan untuk setiap objek dalam DynamicFrame, dan tabel tambahan untuk array. Array penanganan dalam basis data relasional sering bersifat suboptimal, terutama karena array tersebut menjadi besar. Memisahkan array ke dalam tabel yang berbeda membuat kueri lebih cepat.

Selanjutnya, lihat pemisahan dengan memeriksa contact_details:


l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

Berikut ini adalah hasil dari panggilan show:


root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+

Bidang contact_details adalah sebuah array dari struct di DynamicFrame asli. Setiap elemen dari array tersebut adalah baris terpisah dalam tabel tambahan, yang diindeks berdasarkan index. id di sini adalah sebuah kunci asing dalam tabel hist_root dengan kunci contact_details:


dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()

Berikut hasilnya:


+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+

Perhatikan dalam perintah ini bahwa toDF() dan kemudian ekspresi where digunakan untuk mem-filter baris yang ingin Anda lihat.

Jadi, dengan menggabungkan tabel hist_root dengan tabel tambahan akan memungkinkan Anda melakukan hal berikut:

  • Memuat data ke dalam basis data tanpa dukungan array.

  • Meng-kueri setiap item individu dalam array menggunakan SQL.

Simpan dan akses kredensi Amazon Redshift Anda dengan amanAWS Gluekoneksi. Untuk informasi tentang cara membuat koneksi Anda sendiri, lihat Mendefinisikan koneksi diAWS Glue Data Catalog.

Anda sekarang siap untuk menulis data Anda ke koneksi dengan bersepeda melaluiDynamicFramessatu per satu:


for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)

Pengaturan koneksi Anda akan berbeda berdasarkan jenis database relasional Anda:

  • Untuk petunjuk tentang menulis ke Amazon Redshift berkonsultasiPemindahan data ke dan dari Amazon Redshift.

  • Untuk database lain, konsultasikanJenis koneksi dan opsi untuk ETL diAWS Glue.

Kesimpulan

Secara keseluruhan,AWS GlueSangat fleksibel. Ia memungkinkan Anda mencapai, dalam beberapa baris kode, apa yang biasanya akan memerlukan waktu berhari-hari untuk ditulis. Anda dapat menemukan secara keseluruhan source-to-target Script ETL dalam file Pythonjoin_and_relationalize.pydi dalamAWS Gluemencicipidi atas GitHub.