Hướng dẫn how does parallel processing work in python? - xử lý song song hoạt động như thế nào trong python?

Xử lý song song là một chế độ hoạt động trong đó tác vụ được thực hiện đồng thời trong nhiều bộ xử lý trong cùng một máy tính. Nó có nghĩa là để giảm thời gian xử lý tổng thể. Trong hướng dẫn này, bạn sẽ hiểu quy trình để song song hóa bất kỳ logic điển hình nào bằng mô -đun đa xử lý Python.

1. Giới thiệu

Xử lý song song là một chế độ hoạt động trong đó tác vụ được thực hiện đồng thời trong nhiều bộ xử lý trong cùng một máy tính. Nó có nghĩa là để giảm thời gian xử lý tổng thể.

Tuy nhiên, thường có một chút chi phí khi giao tiếp giữa các quá trình thực sự có thể tăng thời gian tổng thể được thực hiện cho các nhiệm vụ nhỏ thay vì giảm nó.

Trong Python, mô -đun

import numpy as np
from time import time

# Prepare data
np.random.RandomState[100]
arr = np.random.randint[0, 10, size=[200000, 5]]
data = arr.tolist[]
data[:5]
8 được sử dụng để chạy các quy trình song song độc lập bằng cách sử dụng các quy trình con [thay vì các luồng].

Nó cho phép bạn tận dụng nhiều bộ xử lý trên máy [cả Windows và UNIX], có nghĩa là, các quy trình có thể được chạy ở các vị trí bộ nhớ hoàn toàn riêng biệt. Đến cuối hướng dẫn này, bạn sẽ biết:

  1. Làm thế nào để cấu trúc mã và hiểu cú pháp để cho phép xử lý song song bằng cách sử dụng
    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    8?
  2. Làm thế nào để thực hiện xử lý song song đồng bộ và không đồng bộ?
  3. Làm thế nào để song song hóa một gấu trúc DataFrame?
  4. Giải quyết 3 lần sử dụng khác nhau với giao diện
    # Solution Without Paralleization
    
    def howmany_within_range[row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    5 [được viết bên dưới] để kiểm tra xem có bao nhiêu số nằm trong phạm vi và trả về số lượng.

    Nhận khóa học Python hoàn thành miễn phí

    Đối mặt với tình huống tương tự như mọi người khác?

    Xây dựng sự nghiệp khoa học dữ liệu của bạn với trình độ được công nhận trên toàn cầu, được công nghiệp phê duyệt. Có được suy nghĩ, sự tự tin và các kỹ năng làm cho nhà khoa học dữ liệu trở nên có giá trị.

    Nhận khóa học Python hoàn thành miễn phí

    Xây dựng sự nghiệp khoa học dữ liệu của bạn với trình độ được công nhận trên toàn cầu, được công nghiệp phê duyệt. Có được suy nghĩ, sự tự tin và các kỹ năng làm cho nhà khoa học dữ liệu trở nên có giá trị.

    # Solution Without Paralleization
    
    def howmany_within_range[row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    9 và
    # Parallelizing using Pool.map[]
    import multiprocessing as mp
    
    # Redefine, with only 1 mandatory argument.
    def howmany_within_range_rowonly[row, minimum=4, maximum=8]:
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    5 với
    # Parallelizing using Pool.apply[]
    
    import multiprocessing as mp
    
    # Step 1: Init multiprocessing.Pool[]
    pool = mp.Pool[mp.cpu_count[]]
    
    # Step 2: `pool.apply` the `howmany_within_range[]`
    results = [pool.apply[howmany_within_range, args=[row, 4, 8]] for row in data]
    
    # Step 3: Don't forget to close
    pool.close[]    
    
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    9 và
    # Parallelizing using Pool.map[]
    import multiprocessing as mp
    
    # Redefine, with only 1 mandatory argument.
    def howmany_within_range_rowonly[row, minimum=4, maximum=8]:
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    

    5.2. Song song bằng cách sử dụng pool.map []

    # Solution Without Paralleization
    
    def howmany_within_range[row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    7 bằng cách đặt mặc định thành các tham số
    # Parallelizing with Pool.starmap[]
    import multiprocessing as mp
    
    pool = mp.Pool[mp.cpu_count[]]
    
    results = pool.starmap[howmany_within_range, [[row, 4, 8] for row in data]]
    
    pool.close[]
    
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    8 và
    # Parallelizing with Pool.starmap[]
    import multiprocessing as mp
    
    pool = mp.Pool[mp.cpu_count[]]
    
    results = pool.starmap[howmany_within_range, [[row, 4, 8] for row in data]]
    
    pool.close[]
    
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    9 để tạo hàm
    # Parallel processing with Pool.apply_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # Step 1: Redefine, to accept `i`, the iteration number
    def howmany_within_range2[i, row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    9 ngoại trừ việc bạn cần cung cấp chức năng gọi lại cho biết các kết quả được tính toán nên được lưu trữ như thế nào.

    Tuy nhiên, một cảnh báo với

    # Parallel processing with Pool.apply_async[] without callback function
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # call apply_async[] without callback
    result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]]
    
    # result_objects is a list of pool.ApplyResult objects
    results = [r.get[][1] for r in result_objects]
    
    pool.close[]
    pool.join[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    0 là, thứ tự các số trong kết quả bị xáo trộn cho thấy các quy trình không hoàn thành theo thứ tự nó được bắt đầu.

    Một cách giải quyết cho điều này là, chúng tôi xác định lại một

    # Parallel processing with Pool.apply_async[] without callback function
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # call apply_async[] without callback
    result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]]
    
    # result_objects is a list of pool.ApplyResult objects
    results = [r.get[][1] for r in result_objects]
    
    pool.close[]
    pool.join[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    6 mới để chấp nhận và trả về số lặp [
    # Parallel processing with Pool.apply_async[] without callback function
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # call apply_async[] without callback
    result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]]
    
    # result_objects is a list of pool.ApplyResult objects
    results = [r.get[][1] for r in result_objects]
    
    pool.close[]
    pool.join[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    7] và sau đó sắp xếp các kết quả cuối cùng.

    # Parallel processing with Pool.apply_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # Step 1: Redefine, to accept `i`, the iteration number
    def howmany_within_range2[i, row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum  [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    0 mà không cung cấp chức năng
    # Parallel processing with Pool.apply_async[] without callback function
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # call apply_async[] without callback
    result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]]
    
    # result_objects is a list of pool.ApplyResult objects
    results = [r.get[][1] for r in result_objects]
    
    pool.close[]
    pool.join[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    9.

    Chỉ có vậy, nếu bạn không cung cấp một cuộc gọi lại, thì bạn sẽ nhận được một danh sách các đối tượng

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    0 chứa các giá trị đầu ra được tính toán từ mỗi quy trình.

    Từ đó, bạn cần sử dụng phương thức

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    1 để lấy kết quả cuối cùng mong muốn.

    # Parallel processing with Pool.apply_async[] without callback function
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # call apply_async[] without callback
    result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]]
    
    # result_objects is a list of pool.ApplyResult objects
    results = [r.get[][1] for r in result_objects]
    
    pool.close[]
    pool.join[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    

    6.2 song song với pool.starmap_async []

    Bạn đã thấy cách

    # Parallel processing with Pool.apply_async[] without callback function
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    # call apply_async[] without callback
    result_objects = [pool.apply_async[howmany_within_range2, args=[i, row, 4, 8]] for i, row in enumerate[data]]
    
    # result_objects is a list of pool.ApplyResult objects
    results = [r.get[][1] for r in result_objects]
    
    pool.close[]
    pool.join[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    0 hoạt động.

    Bạn có thể tưởng tượng và viết lên một phiên bản tương đương cho

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    3 và
    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    4 không?

    Việc thực hiện ở dưới dù sao.

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    

    7. Làm thế nào để song song hóa một bản dữ liệu gấu trúc?

    Cho đến nay, bạn đã thấy cách song song hóa một chức năng bằng cách làm cho nó hoạt động trong danh sách.

    Nhưng khi làm việc trong phân tích dữ liệu hoặc các dự án học máy, bạn có thể muốn song song hóa các khung dữ liệu gấu trúc, là các đối tượng được sử dụng phổ biến nhất [bên cạnh các mảng Numpy] để lưu trữ dữ liệu bảng.

    Khi nói đến song song với

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    5, bạn có thể thực hiện các chức năng song song để lấy làm tham số đầu vào:

    • Một hàng của DataFrame
    • Một cột của DataFrame
    • toàn bộ khung dữ liệu

    2 đầu tiên có thể được thực hiện bằng cách sử dụng mô -đun

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    8.

    Nhưng đối với cái cuối cùng, đó là song song trên toàn bộ khung dữ liệu, chúng tôi sẽ sử dụng gói

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    7 sử dụng
    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    8 để tuần tự hóa trong nội bộ.

    Đầu tiên, hãy tạo một DataFrame mẫu và xem cách thực hiện các giao dịch thông minh và khôn ngoan về cột.

    Một cái gì đó như sử dụng

    # Parallelizing with Pool.starmap_async[]
    
    import multiprocessing as mp
    pool = mp.Pool[mp.cpu_count[]]
    
    results = []
    
    results = pool.starmap_async[howmany_within_range2, [[i, row, 4, 8] for i, row in enumerate[data]]].get[]
    
    # With map, use `howmany_within_range_rowonly` instead
    # results = pool.map_async[howmany_within_range_rowonly, [row for row in data]].get[]
    
    pool.close[]
    print[results[:10]]
    #> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
    
    9 trên hàm do người dùng xác định nhưng song song.

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    

    Chúng tôi có một khung dữ liệu. Hãy để áp dụng chức năng

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    0 trên mỗi hàng, nhưng chạy 4 quy trình cùng một lúc.

    Để làm điều này, chúng tôi khai thác

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    1.

    Bằng cách đặt

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    2, bạn đang chuyển từng hàng của DataFrame dưới dạng một bộ đơn giản cho hàm
    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    0.

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    0

    Đó là một ví dụ về song song hàng ngày.

    Hãy để Lừa cũng làm một song song hóa cột.

    Đối với điều này, tôi sử dụng

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    4 để chuyển toàn bộ cột làm chuỗi cho hàm
    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    5.

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    1

    Bây giờ đến phần thứ ba - song song hóa một hàm chấp nhận dữ liệu gấu trúc, mảng numpy, v.v ... Pathos theo kiểu

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    8 của: POOL> MAP> Đóng> Tham gia> Xóa.

    Kiểm tra các tài liệu Pathos để biết thêm thông tin.

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    2

    Cảm ơn NotSoprocoder vì sự đóng góp này dựa trên Pathos.

    Nếu bạn đã quen thuộc với các khung dữ liệu gấu trúc nhưng muốn thực hành và làm chủ nó, hãy xem các bài tập gấu trúc này.

    8. Bài tập

    Bài 1: Sử dụng

    # Solution Without Paralleization
    
    def howmany_within_range[row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum     0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    8 và
    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    9.
    Use
    # Solution Without Paralleization
    
    def howmany_within_range[row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum     0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    8 and
    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    
    df = pd.DataFrame[np.random.randint[3, 10, size=[5, 2]]]
    print[df.head[]]
    #>    0  1
    #> 0  8  5
    #> 1  5  3
    #> 2  3  4
    #> 3  4  4
    #> 4  7  9
    
    9.

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    3

    Hiển thị giải pháp

    import numpy as np
    from time import time
    
    # Prepare data
    np.random.RandomState[100]
    arr = np.random.randint[0, 10, size=[200000, 5]]
    data = arr.tolist[]
    data[:5]
    
    4

    Bài 2: Sử dụng

    # Solution Without Paralleization
    
    def howmany_within_range[row, minimum, maximum]:
        """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
        count = 0
        for n in row:
            if minimum 

Bài Viết Liên Quan

Chủ Đề