Cara menggunakan use data in python

Lompati ke konten utama

Browser ini sudah tidak didukung.

Mutakhirkan ke Microsoft Edge untuk memanfaatkan fitur, pembaruan keamanan, dan dukungan teknis terkini.

Mulai Cepat: Buat pabrik data dan alur menggunakan Python

  • Artikel
  • 09/27/2022
  • 9 menit untuk membaca

Dalam artikel ini

  • Versi 1
  • Versi saat ini

BERLAKU UNTUK:

Cara menggunakan use data in python
Azure Data Factory
Cara menggunakan use data in python
Azure Synapse Analytics

Dalam mulai cepat ini, Anda membuat pabrik data dengan menggunakan Python. Alur dalam pabrik data ini menyalin data dari satu folder ke folder lain dalam penyimpanan Azure Blob.

Azure Data Factory adalah layanan integrasi data berbasis cloud yang memungkinkan Anda membuat alur kerja berbasis data untuk mengatur dan mengotomatiskan pemindahan data dan transformasi data. Dengan menggunakan Azure Data Factory, Anda dapat membuat dan menjadwalkan alur kerja berbasis data, yang disebut alur.

Alur dapat menyerap data dari penyimpanan data yang berbeda. Alur memproses atau mengubah data dengan menggunakan layanan komputasi seperti Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, dan Azure Machine Learning. Alur menerbitkan data output ke penyimpanan data seperti aplikasi Azure Synapse Analytics untuk kecerdasan bisnis (BI).

Prasyarat

  • Akun Azure dengan langganan aktif. Membuat satu akun gratis.

  • Python 3.6+.

  • Akun Azure Storage.

  • Penjelajah Azure Storage (opsional).

  • Aplikasi di Azure Active Directory. Buat aplikasi dengan mengikuti langkah-langkah di tautan ini, menggunakan Opsi Autentikasi 2 (rahasia aplikasi), dan tetapkan aplikasi ke peran Kontributor dengan mengikuti petunjuk di artikel yang sama. Catat nilai berikut seperti yang ditampilkan dalam artikel untuk digunakan di langkah selanjutnya: ID aplikasi (klien), nilai rahasia klien, dan ID tenant.

Membuat dan mengunggah file input

  1. Luncurkan Notepad. Salin teks berikut dan simpan sebagai file input.txt di disk Anda.

    John|Doe
    Jane|Doe
    
  2. Gunakan alat seperti Azure Storage Explorer untuk membuat kontainer adfv2tutorial, dan folder input dalam kontainer. Lalu, unggah file input.txt ke folder input.

Menginstal paket Python

  1. Buka terminal atau perintah dengan hak istimewa admin. 

  2. Pertama, instal paket Python untuk sumber daya pengelolaan Azure:

    pip install azure-mgmt-resource
    
  3. Untuk menginstal paket Python untuk Data Factory, jalankan perintah berikut:

    pip install azure-mgmt-datafactory
    

    Python SDK untuk Data Factory mendukung Python 2.7 dan 3.6+.

  4. Untuk menginstal paket Python untuk autentikasi Azure Identity, jalankan perintah berikut:

    pip install azure-identity
    

Membuat klien pabrik data

  1. Buat file bernama datafactory.py. Tambahkan pernyataan berikut untuk menambahkan referensi ke namespace.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Tambahkan fungsi berikut yang mencetak informasi.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Tambahkan kode berikut ke metode Utama yang membuat instans kelas DataFactoryManagementClient. Anda menggunakan objek ini untuk membuat pabrik data, layanan tertaut, himpunan data, dan alur. Anda juga menggunakan obyek ini untuk memantau detail eksekusi alur. Atur variabel subscription_id ke ID langganan Azure Anda. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Soverign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

Buat pabrik data

Tambahkan kode berikut ke metode Utama yang membuat pabrik data. Jika grup sumber daya Anda sudah ada, komentari pernyataan create_or_update pertama.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Membuat layanan tertaut

Tambahkan kode berikut ke metode Utama yang membuat layanan tertaut Azure Storage.

Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Dalam mulai cepat ini, Anda hanya perlu membuat satu layanan tertaut Azure Storage sebagai sumber salinan dan penyimpanan sink, bernama "AzureStorageLinkedService" dalam sampel. Ganti <storageaccountname> dan <storageaccountkey> dengan nama dan kunci akun Azure Storage Anda.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Membuat himpunan data

Di bagian ini, Anda membuat dua himpunan data: satu untuk sumber dan yang lain untuk sink.

Membuat himpunan data untuk Azure Blob sumber

Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Membuat himpunan data untuk Azure Blob sink

Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Membuat alur

Tambahkan kode berikut ke metode Utama yang membuat alur dengan aktivitas salin.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Membuat eksekusi alur

Tambahkan kode berikut ke metode Utama yang memicu eksekusi alur.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Memantau eksekusi alur

Untuk memantau eksekusi alur, tambahkan kode berikut ke metode Utama:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

Sekarang, tambahkan pernyataan berikut untuk memanggil metode Utama saat program dijalankan:

# Start the main method
main()

Skrip lengkap

Berikut adalah kode Python lengkap:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Menjalankan kode

Bangun dan mulai aplikasi, lalu verifikasi eksekusi alur.

Konsol mencetak kemajuan pembuatan pabrik data, layanan tertaut, himpunan data, alur, dan eksekusi alur. Tunggu hingga Anda melihat detail eksekusi aktivitas salin dengan data berukuran dibaca/ditulis. Kemudian, gunakan alat seperti Penjelajah Azure Storage untuk memeriksa blob disalin ke "outputBlobPath" dari "inputBlobPath" seperti yang Anda tentukan dalam variabel.

Berikut sampel outputnya:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Membersihkan sumber daya

Untuk menghapus pabrik data, tambahkan kode berikut ke program:

adf_client.factories.delete(rg_name, df_name)

Langkah berikutnya

Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Ikuti tutorial untuk mempelajari tentang penggunaan Data Factory dalam skenario lainnya.

Mengapa Python digunakan untuk data science?

Mudah Dipelajari Selain itu python juga mudah dipelajari terutama bagi para pemula data science. Dengan menggunakan python program yang dibuat akan lebih ringkas, jelas dan mudah dipahami dibandingkan dengan bahasa pemrograman lainnya.

Apa itu Data Wrangling Python?

Data wrangling adalah proses transformasi data mentah ke dalam format yang lebih rapi. Pertumbuhan jumlah data yang cepat dari sumber data yang berbeda inilah yang dimaksud dengan data mentah. Data mentah ini berisikan beragam tipe data.

Mengapa Python banyak digunakan?

Karena memiliki library yang luas dan desain berorientasi objek yang bersih, penggunaan bahasa pemrograman Python dapat menunjang produktivitas programer dibanding saat menggunakan bahasa pemrograman lain seperti Java dan C++.

Function apa yang digunakan untuk melihat jumlah baris dan kolom dari suatu data frame?

info() info() digunakan untuk menampilkan informasi detail tentang dataframe, seperti jumlah baris data, nama-nama kolom berserta jumlah data dan tipe datanya, dan sebagainya.