Hướng dẫn convert nested json to parquet python - chuyển đổi json lồng nhau sang parquet python

Đây không chính xác là câu trả lời đúng, nhưng nó có thể giúp.

Chúng tôi có thể cố gắng chuyển đổi từ điển của bạn thành DataFrame của gấu trúc và sau đó, hãy viết tệp này thành tệp .parquet:

import pandas as pd
from fastparquet import write, ParquetFile

d = {
    "product": {
        "id": "abcdef",
        "price": 19.99,
        "specs": {
            "voltage": "110v",
            "color": "white"
        }
    },
    "user": "Daniel Severo"
}

df_test = pd.DataFrame(d)
write('file_test.parquet', df_test)

Điều này sẽ tăng và lỗi:

ValueError: Can't infer object conversion type: 0                                   abcdef
1                                    19.99
2    {'voltage': '110v', 'color': 'white'}
Name: product, dtype: object

Vì vậy, một giải pháp dễ dàng là chuyển đổi cột product thành danh sách:

df_test['product'] = df_test['product'].apply(lambda x: [x])

# this should now works
write('file_test.parquet', df_test)

# and now compare the file with the initial DataFrame
ParquetFile('file_test.parquet').to_pandas().explode('product')
    index            product                                 user
0   id               abcdef                             Daniel Severo
1   price             19.99                             Daniel Severo
2   specs   {'voltage': '110v', 'color': 'white'}       Daniel Severo

tl;dr

Bạn có thể sử dụng Coiled, nền tảng DASK dựa trên đám mây, để dễ dàng chuyển đổi dữ liệu JSON lớn thành một khung dữ liệu bảng được lưu trữ dưới dạng sàn trong một cửa hàng đối tượng đám mây. Bắt đầu bằng cách lặp lại với DASK trước đây để xây dựng và kiểm tra đường ống của bạn, sau đó chuyển cùng một quy trình công việc để cuộn với các thay đổi mã tối thiểu. Chúng tôi trình bày chuyển đổi JSON thành Parquet cho bộ dữ liệu 75GB chạy mà không tải xuống tập dữ liệu về máy cục bộ của bạn.

Tôi có thể chuyển đổi JSON thành sàn gỗ không?

Bạn có thể sử dụng Coiled, nền tảng DASK dựa trên đám mây, để dễ dàng chuyển đổi dữ liệu JSON lớn thành một khung dữ liệu bảng được lưu trữ dưới dạng sàn trong một cửa hàng đối tượng đám mây.

Parquet có hỗ trợ JSON lồng nhau không?

Các loại lồng nhau có thể được lưu trữ trong: Parquet, trong đó bạn có thể có nhiều cột phức tạp có chứa mảng và đối tượng. Các tệp JSON phân cấp, nơi bạn có thể đọc một tài liệu JSON phức tạp dưới dạng một cột duy nhất.

Hướng dẫn convert nested json to parquet python - chuyển đổi json lồng nhau sang parquet python

Làm thế nào để bạn chuyển đổi JSON lồng nhau thành DataFrame trong pyspark?

  1. Chuyển đổi sang DataFrame Thêm chuỗi JSON dưới dạng loại thu thập và chuyển nó dưới dạng đầu vào cho Spark. createdataset. Điều này chuyển đổi nó thành một khung dữ liệu. Trình đọc JSON tự động lấy lược đồ từ chuỗi JSON.
  2. Làm thế nào để bạn phân tích dữ liệu JSON lồng nhau trong Python?

Bắt đầu bằng cách nhập thư viện JSON. Chúng tôi sử dụng chức năng mở để đọc tệp JSON và sau đó là Json.Load () phương thức để phân tích chuỗi JSON vào từ điển Python có tên SuperHerosquad. Đó là nó!

Bạn có thể sử dụng Coiled, nền tảng DASK dựa trên đám mây, để dễ dàng chuyển đổi dữ liệu JSON lớn thành một khung dữ liệu bảng được lưu trữ dưới dạng sàn trong một cửa hàng đối tượng đám mây. Bắt đầu bằng cách lặp lại với DASK trước đây để xây dựng và kiểm tra đường ống của bạn, sau đó chuyển cùng một quy trình công việc để cuộn với các thay đổi mã tối thiểu. Chúng tôi trình bày chuyển đổi JSON thành Parquet cho bộ dữ liệu 75GB chạy mà không tải xuống tập dữ liệu về máy cục bộ của bạn.

* * *

Tại sao chuyển đổi JSON thành sàn gỗ

  1. Dữ liệu được cạo từ Web ở định dạng JSON lồng nhau thường cần được chuyển đổi thành định dạng bảng để phân tích dữ liệu khám phá (EDA) và/hoặc học máy (ML). Định dạng tệp Parquet là một phương pháp tối ưu để lưu trữ dữ liệu bảng, cho phép các hoạt động như cắt tỉa cột và lọc đẩy dự đoán giúp tăng đáng kể hiệu suất của quy trình công việc của bạn. & NBSP;

Bài đăng này cho thấy một đường ống JSON đến Parquet cho bộ dữ liệu 75GB, sử dụng DASK và cuộn để chuyển đổi và lưu trữ dữ liệu sang cửa hàng đối tượng đám mây. Đường ống này bỏ qua sự cần thiết của bộ dữ liệu được lưu trữ cục bộ trên máy của bạn.

Sau khi hoàn thành sổ ghi chép này, bạn sẽ có thể:

Xây dựng và kiểm tra quy trình làm việc ETL của bạn trước tiên, sử dụng một tệp kiểm tra duy nhất đại diện cho 1 giờ dữ liệu hoạt động của GitHub

Quy mô cùng quy trình công việc ra đám mây bằng cách sử dụng cuộn để xử lý toàn bộ tập dữ liệu.

  1. SPOILER ALERT - Bạn sẽ chạy cùng một mã chính xác trong cả hai trường hợp, chỉ thay đổi nơi chạy các tính toán.

Bạn có thể tìm thấy một ví dụ đầy đủ mã trong sổ ghi chép này. Để chạy máy tính xách tay cục bộ, hãy xây dựng một môi trường Conda với tệp Môi trường.yml nằm trong repo Notebook.There are several different schemas overlapping in the data, which means you can't simply cast this into a pandas or Dask DataFrame. Instead, you could filter out one subset, such as PushEvents, and work with that.

JSON to Parquet: Xây dựng đường ống của bạn tại địa phương & NBSP;

Bạn có thể áp dụng chức năng quy trình (được xác định trong sổ ghi chép) để làm phẳng dữ liệu JSON lồng nhau vào định dạng bảng, với mỗi hàng hiện đại diện cho một cam kết github duy nhất.

//records.filter(lambda Bản ghi: Ghi ["type"] == "PUSHEVENT"). quy trình) .flatten () //]]>>

Sau đó chuyển dữ liệu này vào một khung dữ liệu bằng phương thức .to_dataFrame ().

// df = dẹt.to_dataframe () //]]>>>

  1. Tải dữ liệu vào một thư mục cục bộ trong định dạng tệp Parquet

Bây giờ bạn đã được thiết lập để ghi DataFrame của mình vào thư mục cục bộ dưới dạng tệp .parquet bằng phương thức dask dataFrame .to_parquet ().

//df.to_parquet (& nbsp; & nbsp; "test.parq", & nbsp; & nbsp; động cơ = "pyarrow", & nbsp; & nbsp; nén = "snappy") //]]
   "test.parq",
   engine="pyarrow",
   compression="snappy"
)//]]>

JSON TO PARQUET: Mở rộng ra với các cụm dask trên Coiled

Xây dựng công việc tuyệt vời và kiểm tra quy trình làm việc của bạn tại địa phương! Bây giờ, hãy để xây dựng một quy trình công việc sẽ thu thập dữ liệu trong cả năm, xử lý nó và lưu nó vào lưu trữ đối tượng đám mây.

  1. Quay lên cụm của bạn

Chúng tôi sẽ bắt đầu bằng cách khởi chạy một cụm dask trên đám mây có thể chạy đường ống của bạn trên toàn bộ bộ dữ liệu. Để chạy mã trong phần này, bạn sẽ cần một tài khoản cuộn miễn phí bằng cách đăng nhập vào đám mây cuộn. Bạn chỉ cần cung cấp thông tin đăng nhập GitHub của mình để tạo tài khoản.

Sau đó, bạn sẽ cần tạo một môi trường phần mềm với các thư viện chính xác để các công nhân trong cụm của bạn có thể thực hiện các tính toán của chúng tôi. & NBSP;

// nhập cuộn

# Tạo phần mềm cuộn Môi trườngColeD.Create_Software_Environment (& nbsp; & nbsp; name = "github-parquet", & nbsp; & nbsp; , "fastparquet"],) //]]>>
coiled.create_software_environment(
   name="github-parquet",
   conda=["dask", "pyarrow", "s3fs", "ujson", "requests", "lz4", "fastparquet"],
)//]]>

Bạn cũng có thể tạo môi trường phần mềm cuộn bằng các tệp Docker Images, Môi trường.yml (Conda) hoặc Yêu cầu.txt (PIP). Để biết thêm thông tin, hãy kiểm tra các tài liệu cuộn.

Bây giờ, hãy để Lôi quay lên cụm cuộn của bạn, chỉ định tên cụm, môi trường phần mềm mà nó chạy và số lượng công nhân DASK.

//# quay một clustercluster cuộn = coiled.cluster (& nbsp; & nbsp; name = "github-parquet", & nbsp; & nbsp; phần mềm = " //]]>>
cluster = coiled.Cluster(
   name="github-parquet",
   software="coiled-examples/github-parquet",
   n_workers=10,
)//]]>

Cuối cùng, Point Dask để chạy trên các tính toán trên cụm cuộn của bạn.

//# Kết nối dask với ClusterFrom của bạn.
from dask.distributed import Client
client = Client(cluster)
client//]]>

  1. Chạy đường ống của bạn trên bộ dữ liệu lưu trữ GitHub

Khoảnh khắc tất cả chúng ta đã chờ đợi! Cụm của bạn đang hoạt động và chạy, điều đó có nghĩa là tất cả các bạn đã được thiết lập để chạy JSON thành PARQUET PIPELINE mà bạn đã xây dựng ở trên trên toàn bộ bộ dữ liệu. & NBSP;

Điều này đòi hỏi 2 thay đổi tinh tế đối với mã của bạn:

  1. Tải xuống tất cả các tệp lưu trữ github thay vì chỉ 1 tệp kiểm tra
  2. point df.to_parquet () để ghi vào thùng S3 của bạn thay vì tại địa phương

Lưu ý rằng mã bên dưới sử dụng danh sách tên tệp chứa tất cả các tệp cho năm 2015 và chức năng quy trình được đề cập ở trên. Tham khảo sổ ghi chép để biết định nghĩa của hai đối tượng này.

// %% Thời gian# Đọc trong JSON DataRecords = DB.Read_Text (FileNames) .Map (UJSON.LOADS)
# read in json data
records = db.read_text(filenames).map(ujson.loads)

# Lọc ra PushEventsPush = records.filter (Lambda Record: Record ["type"] == "PushEvent")
push = records.filter(lambda record: record["type"] == "PushEvent")

# xử lý thành định dạng bảng, mỗi hàng là một cam kết duy nhất = push.map (quá trình)
processed = push.map(process)

# Flatten và đúc đến dataFlameF = đã xử lý.flatten (). TO_DATAFRAME ()
df = processed.flatten().to_dataframe()

# Ghi cho parquetdf.to_parquet (& nbsp; & nbsp; 's3: //coiled-datasets/etl/test.parq', & nbsp;
df.to_parquet(
   's3://coiled-datasets/etl/test.parq',
   engine='pyarrow',
   compression='snappy'
)

Thời gian CPU: Người dùng 15.1 S, SYS: 1,74 S, Tổng cộng: 16.8 Thời gian Swall: 19 phút 17S //]]>>
Wall time: 19min 17s//]]>

Tuyệt vời, hoạt động. Nhưng hãy để xem nếu chúng ta có thể tăng tốc một chút ...

Hãy mở rộng quy mô của chúng tôi để tăng hiệu suất. Chúng tôi sẽ sử dụng lệnh cluster.scale () để tăng gấp đôi số lượng công nhân trong cụm của chúng tôi. Chúng tôi cũng sẽ bao gồm một cuộc gọi đến client.wait_for_workers () sẽ chặn hoạt động cho đến khi tất cả các công nhân trực tuyến. Bằng cách này, chúng tôi có thể chắc chắn rằng chúng tôi ném tất cả các cơ bắp chúng tôi có trong tính toán của chúng tôi.

//# Double N_WorkersCluster.Scale (20)
cluster.scale(20)

# Điều này chặn hoạt động cho đến khi số lượng công nhân được chỉ định đã tham gia clusterclient.wait_for_workers (20) //]]>>>
client.wait_for_workers(20)//]]>

Bây giờ, chúng ta hãy chạy lại đường ống ETL tương tự trên cụm chia tỷ lệ của chúng tôi.

// %% Thời gian# RE-RUN ETL Pipelinerecords = db.Read_Text (fileNames) .map (ujson.loads) push = records.filter (bản ghi Lambda: record ["type"] == "pushEvent") Map (Process) df = đã xử lý.flatten (). to_dataframe () df.to_parquet (& nbsp; & nbsp; 's3: //coiled-datasets/etl/test.parq', & nbsp; ; & nbsp; nén = 'linh hoạt'))
# re-run etl pipeline
records = db.read_text(filenames).map(ujson.loads)
push = records.filter(lambda record: record["type"] == "PushEvent")
processed = push.map(process)
df = processed.flatten().to_dataframe()
df.to_parquet(
   's3://coiled-datasets/etl/test.parq',
   engine='pyarrow',
   compression='snappy'
)

Thời gian CPU: Người dùng 11.4 S, SYS: 1.1 S, Tổng cộng: 12,5 Thời gian SWALL: 9min 53S //]]>>
Wall time: 9min 53s//]]>

Chúng tôi đã cắt giảm thời gian chạy làm đôi, công việc tuyệt vời!

Chuyển đổi JSON lớn thành Tóm tắt sàn gỗ

Trong sổ ghi chép này, chúng tôi đã chỉ ra cách chuyển đổi JSON thành sàn gỗ bằng cách chuyển đổi dữ liệu JSON thô thành một khung dữ liệu phẳng và lưu trữ nó theo định dạng tệp Parquet hiệu quả trên cửa hàng đối tượng đám mây. Chúng tôi đã thực hiện quy trình làm việc này trước trên một tệp thử nghiệm duy nhất cục bộ. Sau đó, chúng tôi đã mở rộng quy trình công việc tương tự để chạy trên đám mây bằng cách sử dụng các cụm DASK trên cuộn để xử lý toàn bộ bộ dữ liệu 75GB.

Takeaways chính:

  • Coile cho phép bạn mở rộng quy trình công việc ETL chung sang các bộ dữ liệu lớn hơn bộ nhớ.
  • Chỉ có tỷ lệ lên đám mây nếu và khi bạn cần. Điện toán đám mây đi kèm với tập hợp các thách thức và chi phí của riêng mình. Vì vậy, hãy chiến lược về việc quyết định xem và khi nào cần nhập cuộn và quay một cụm.
  • Mở rộng quy mô của bạn để tăng hiệu suất. Chúng tôi cắt giảm thời gian chạy của hàm ETL làm đôi bằng cách nhân rộng cụm từ 10 đến 20 công nhân.

Nếu bạn có bất kỳ câu hỏi hoặc đề xuất nào cho tài liệu trong tương lai, vui lòng gửi cho chúng tôi một dòng tại hoặc trong kênh Slack cộng đồng cuộn của chúng tôi. Chúng tôi muốn nghe từ bạn!

Thời gian đặt với Richard

Hãy thử cuộn miễn phí

Cảm ơn vì đã đọc. Nếu bạn quan tâm đến việc thử Coiled, nơi cung cấp các cụm DASK được lưu trữ, phần mềm do Docker-ít quản lý và triển khai một lần nhấp, bạn có thể làm như vậy miễn phí ngay hôm nay khi bạn nhấp vào bên dưới.

Tôi có thể chuyển đổi JSON thành sàn gỗ không?

Bạn có thể sử dụng Coiled, nền tảng DASK dựa trên đám mây, để dễ dàng chuyển đổi dữ liệu JSON lớn thành một khung dữ liệu bảng được lưu trữ dưới dạng sàn trong một cửa hàng đối tượng đám mây..

Parquet có hỗ trợ JSON lồng nhau không?

Các loại lồng nhau có thể được lưu trữ trong: Parquet, trong đó bạn có thể có nhiều cột phức tạp có chứa mảng và đối tượng.Các tệp JSON phân cấp, nơi bạn có thể đọc một tài liệu JSON phức tạp dưới dạng một cột duy nhất. Parquet, where you can have multiple complex columns that contain arrays and objects. Hierarchical JSON files, where you can read a complex JSON document as a single column.

Làm thế nào để bạn chuyển đổi JSON lồng nhau thành DataFrame trong pyspark?

Chuyển đổi sang DataFrame Thêm chuỗi JSON dưới dạng loại thu thập và chuyển nó dưới dạng đầu vào cho Spark.createdataset.Điều này chuyển đổi nó thành một khung dữ liệu.Trình đọc JSON tự động lấy lược đồ từ chuỗi JSON.Add the JSON string as a collection type and pass it as an input to spark. createDataset . This converts it to a DataFrame. The JSON reader infers the schema automatically from the JSON string.

Làm thế nào để bạn phân tích dữ liệu JSON lồng nhau trong Python?

Bắt đầu bằng cách nhập thư viện JSON.Chúng tôi sử dụng chức năng mở để đọc tệp JSON và sau đó là Json.Load () phương thức để phân tích chuỗi JSON vào từ điển Python có tên SuperHerosquad.Đó là nó!use the function open to read the JSON file and then the method json. load() to parse the JSON string into a Python dictionary called superHeroSquad. That's it!