Hướng dẫn transfer statements in python - chuyển câu lệnh trong python

Tôi không phải là người mới đối với Python nhưng vẫn đang tìm kiếm cách chia sẻ dữ liệu định dạng chuỗi phù hợp nhất giữa các quy trình.

Nội dung chính

  • Chuyển tin nhắn cho Procduces¶
  • Tín hiệu giữa các quá trình
  • Kiểm soát quyền truy cập vào các tài nguyên
  • Đồng bộ hóa các hoạt động
  • Kiểm soát truy cập đồng thời vào tài nguyên Or
  • Xử lý bể bơi
  • Làm cách nào để chuyển dữ liệu từ chương trình Python này sang chương trình khác?
  • Làm thế nào để bạn trao đổi dữ liệu giữa các quy trình?
  • Làm thế nào để bạn thực hiện đa xử lý trong Python?
  • Trình quản lý đa xử lý Python là gì?

Hiện tại tôi có 4 quy trình (1 cha mẹ + 3 con trong đa xử lý), cha mẹ đang tìm nạp dữ liệu từ máy chủ thăm dò ý kiến ​​dài và sau khi một vài kiểm tra gửi dữ liệu đến máy chủ MySQL.

Sau đó, trẻ em (1 cho mỗi loại tác vụ) đang xử lý dữ liệu theo cách tôi cần.

Mọi thứ đều hoạt động tốt, dữ liệu của tôi được lưu trữ an toàn và luôn luôn truy cập, đặc biệt nếu tôi cần gỡ lỗi hoặc thực hiện các tính năng mới, nhưng sau tất cả, nó hoạt động chậm hơn với sự phát triển của dự án (đối với ứng dụng web - không tốt). Như bạn có thể giả định, loại dữ liệu được lưu trữ trong DB là danh sách hoặc các đối tượng của nó. Tôi biết rằng có một số vấn đề trong Python với việc chuyển dữ liệu giữa các quy trình ('sẽ tốt hơn để nói, hạn chế phần mềm) ...

Tôi đã suy nghĩ về việc lưu trữ dữ liệu tạm thời trong các tệp JSON hoặc TXT đơn giản, nhưng đồng thời sẽ không cho phép tôi làm điều đó. Ngoài ra, tôi có thể thử sử dụng ổ cắm, nhưng có đáng để khởi động máy chủ gọi lại cho mục đích như vậy không?

Điều không đồng bộ cũng không hiệu quả với tôi. Vậy các lựa chọn của tôi trong trường hợp này là gì? Tôi không cần phải mất bất kỳ phần dữ liệu nào, giống như tôi cần giữ cho nó hoạt động nhanh. Bất kỳ biện pháp nào khác đều được chào đón :)

P.S Hầu hết các chủ đề liên quan đều lỗi thời hoặc không trả lời câu hỏi của tôi, bởi vì tôi chắc chắn rằng cách tôi làm việc với dữ liệu không tốt nhất cho đến nay.

Như với các chủ đề, một mẫu sử dụng phổ biến cho nhiều quy trình là chia một công việc giữa một số công nhân để chạy song song. Sử dụng hiệu quả nhiều quy trình thường đòi hỏi một số giao tiếp giữa chúng, để công việc có thể được chia và kết quả có thể được tổng hợp.

Chuyển tin nhắn cho Procduces¶

Tín hiệu giữa các quá trìnhmultiprocessing is to use a Queue to pass messages back and forth. Any pickle-able object can pass through a Queue.

import multiprocessing

class MyFancyClass(object):
    
    def __init__(self, name):
        self.name = name
    
    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s!' % (proc_name, self.name)


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    
    queue.put(MyFancyClass('Fancy Dan'))
    
    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
    

Kiểm soát quyền truy cập vào các tài nguyên

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!

Đồng bộ hóa các hoạt độngJoinableQueue and passing results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join() method to wait for all of the tasks to finish before processin the results.

import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1

Kiểm soát truy cập đồng thời vào tài nguyên Or

$ python -u multiprocessing_producer_consumer.py

Creating 16 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-9: 8 * 8
Consumer-10: 9 * 9
Consumer-11: Exiting
Consumer-12: Exiting
Consumer-13: Exiting
Consumer-14: Exiting
Consumer-15: Exiting
Consumer-16: Exiting
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-9: Exiting
Consumer-7: Exiting
Consumer-8: Exiting
Consumer-10: Exiting
Result: 0 * 0 = 0
Result: 3 * 3 = 9
Result: 8 * 8 = 64
Result: 5 * 5 = 25
Result: 4 * 4 = 16
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 9 * 9 = 81

Tín hiệu giữa các quá trình

Kiểm soát quyền truy cập vào các tài nguyênEvent class provides a simple way to communicate state information between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value.

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print 'wait_for_event: starting'
    e.wait()
    print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print 'wait_for_event_timeout: starting'
    e.wait(t)
    print 'wait_for_event_timeout: e.is_set()->', e.is_set()


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', 
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block', 
                                 target=wait_for_event_timeout, 
                                 args=(e, 2))
    w2.start()

    print 'main: waiting before calling Event.set()'
    time.sleep(3)
    e.set()
    print 'main: event is set'

Đồng bộ hóa các hoạt độngwait() times out it returns without an error. The caller is responsible for checking the state of the event using is_set().

$ python -u multiprocessing_event.py

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True

Kiểm soát quyền truy cập vào các tài nguyên

Trong các tình huống khi một tài nguyên cần được chia sẻ giữa nhiều quy trình, khóa có thể được sử dụng để tránh các truy cập mâu thuẫn.Lock can be used to avoid conflicting accesses.Lock can be used to avoid conflicting accesses.

import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')
        
def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

lock = multiprocessing.Lock()
w = multiprocessing.Process(target=worker_with, args=(lock, sys.stdout))
nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stdout))

w.start()
nw.start()

w.join()
nw.join()

Trong ví dụ này, các tin nhắn được in vào bảng điều khiển có thể bị lộn xộn với nhau nếu hai quy trình không đồng bộ hóa việc truy cập luồng đầu ra của chúng bằng khóa.

$ python multiprocessing_lock.py

Lock acquired via with
Lock acquired directly

Đồng bộ hóa các hoạt động

Các đối tượng điều kiện có thể được sử dụng để đồng bộ hóa các phần của quy trình công việc để một số chạy song song nhưng các đối tượng khác chạy tuần tự, ngay cả khi chúng ở trong các quy trình riêng biệt. objects can be used to synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes. objects can be used to synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes.

import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work, then notify stage_2 to continue"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        print '%s done and ready for stage 2' % name
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        cond.wait()
        print '%s running' % name

if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,))
    s2_clients = [
        multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,))
        for i in range(1, 3)
        ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

Trong ví dụ này, hai quá trình chạy giai đoạn thứ hai của một công việc song song, nhưng chỉ sau khi giai đoạn đầu tiên được thực hiện.

$ python multiprocessing_condition.py

Starting s1
s1 done and ready for stage 2
Starting stage_2[1]
stage_2[1] running
Starting stage_2[2]
stage_2[2] running

Kiểm soát truy cập đồng thời vào tài nguyên Or

Đôi khi rất hữu ích khi cho phép nhiều hơn một công nhân truy cập vào một tài nguyên tại một thời điểm, trong khi vẫn giới hạn tổng số. Ví dụ: nhóm kết nối có thể hỗ trợ số lượng kết nối đồng thời cố định hoặc ứng dụng mạng có thể hỗ trợ số lượng tải xuống đồng thời cố định. Một semaphore là một cách để quản lý các kết nối đó.Semaphore is one way to manage those connections.Semaphore is one way to manage those connections.

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
0

Trong ví dụ này, lớp ActivePool chỉ đơn giản là một cách thuận tiện để theo dõi các quy trình nào đang chạy tại một thời điểm nhất định. Một nhóm tài nguyên thực sự có thể sẽ phân bổ một kết nối hoặc một số giá trị khác cho quy trình mới hoạt động và đòi lại giá trị khi nhiệm vụ được thực hiện. Ở đây, nhóm chỉ được sử dụng để giữ tên của các quy trình hoạt động để cho thấy rằng chỉ có ba người đang chạy đồng thời.ActivePool class simply serves as a convenient way to track which processes are running at a given moment. A real resource pool would probably allocate a connection or some other value to the newly active process, and reclaim the value when the task is done. Here, the pool is just used to hold the names of the active processes to show that only three are running concurrently.ActivePool class simply serves as a convenient way to track which processes are running at a given moment. A real resource pool would probably allocate a connection or some other value to the newly active process, and reclaim the value when the task is done. Here, the pool is just used to hold the names of the active processes to show that only three are running concurrently.

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
1

Xử lý bể bơi

Lớp học nhóm có thể được sử dụng để quản lý một số lượng công nhân cố định cho các trường hợp đơn giản trong đó công việc cần thực hiện có thể được phá vỡ và phân phối giữa các công nhân một cách độc lập. Các giá trị trả về từ các công việc được thu thập và trả lại làm danh sách. Các đối số nhóm bao gồm số lượng quy trình và một hàm để chạy khi bắt đầu quy trình nhiệm vụ (được gọi một lần cho mỗi đứa trẻ).Pool class can be used to manage a fixed number of workers for simple cases where the work to be done can be broken up and distributed between workers independently. The return values from the jobs are collected and returned as a list. The pool arguments include the number of processes and a function to run when starting the task process (invoked once per child).Pool class can be used to manage a fixed number of workers for simple cases where the work to be done can be broken up and distributed between workers independently. The return values from the jobs are collected and returned as a list. The pool arguments include the number of processes and a function to run when starting the task process (invoked once per child).

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
2

Kết quả của phương thức Bản đồ () có chức năng tương đương với bản đồ tích hợp (), ngoại trừ các tác vụ riêng lẻ chạy song song. Vì nhóm đang xử lý các đầu vào của nó song song, nên đóng () và tham gia () có thể được sử dụng để đồng bộ hóa quy trình chính với các quy trình nhiệm vụ để đảm bảo dọn dẹp thích hợp.map() method is functionally equivalent to the built-in map(), except that individual tasks run in parallel. Since the pool is processing its inputs in parallel, close() and join() can be used to synchronize the main process with the task processes to ensure proper cleanup.map() method is functionally equivalent to the built-in map(), except that individual tasks run in parallel. Since the pool is processing its inputs in parallel, close() and join() can be used to synchronize the main process with the task processes to ensure proper cleanup.

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
3

Theo mặc định, nhóm tạo ra một số lượng cố định các quy trình của công nhân và chuyển các công việc cho họ cho đến khi không còn việc làm nữa. Đặt tham số MaxTasksperChild cho nhóm khởi động lại quy trình công nhân sau khi hoàn thành một vài nhiệm vụ. Điều này có thể được sử dụng để tránh có những người lao động chạy dài tiêu thụ nhiều tài nguyên hệ thống hơn bao giờ hết.Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the pool to restart a worker process after it has finished a few tasks. This can be used to avoid having long-running workers consume ever more system resources.Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the pool to restart a worker process after it has finished a few tasks. This can be used to avoid having long-running workers consume ever more system resources.

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
4

Bể bơi khởi động lại các công nhân khi họ đã hoàn thành các nhiệm vụ được phân bổ của họ, ngay cả khi không còn công việc nữa. Trong đầu ra này, tám công nhân được tạo ra, mặc dù chỉ có 10 nhiệm vụ và mỗi công nhân có thể hoàn thành hai trong số họ cùng một lúc.

$ python multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!
5

Làm cách nào để chuyển dữ liệu từ chương trình Python này sang chương trình khác?

Các bước để chạy một kịch bản Python từ một kịch bản khác...

Bước 1: Đặt các tập lệnh Python vào cùng một thư mục. Để bắt đầu, hãy đặt các tập lệnh Python của bạn vào cùng một thư mục. ....

Bước 2: Thêm cú pháp. Tiếp theo, thêm cú pháp vào từng tập lệnh của bạn. ....

Bước 3: Chạy một kịch bản Python từ một bản khác ..

Làm thế nào để bạn trao đổi dữ liệu giữa các quy trình?

Để chia sẻ dữ liệu lớn: Bộ nhớ chia sẻ được đặt tên Cách nhanh nhất để xử lý việc này là tạo đối tượng ánh xạ tệp, ánh xạ đối tượng tệp vào không gian bộ nhớ, thông báo cho quy trình cha mẹ của tay cầm ánh xạ tệp và kích thước dữ liệu, sau đó đổ dữ liệu vào Bộ đệm được ánh xạ cho quá trình cha mẹ để đọc.

Làm thế nào để bạn thực hiện đa xử lý trong Python?

Trong ví dụ này, lúc đầu, chúng tôi nhập lớp quy trình sau đó bắt đầu đối tượng quy trình với hàm Display ().Chúng ta cũng có thể chuyển các đối số cho chức năng bằng cách sử dụng từ khóa Args.first we import the Process class then initiate Process object with the display() function.Then process is started with start() method and then complete the process with the join() method. We can also pass arguments to the function using args keyword.first we import the Process class then initiate Process object with the display() function. Then process is started with start() method and then complete the process with the join() method. We can also pass arguments to the function using args keyword.

Trình quản lý đa xử lý Python là gì?

Trình quản lý đa xử lý cung cấp một cách tạo các đối tượng Python tập trung có thể được chia sẻ an toàn giữa các quy trình.Các nhà quản lý cung cấp một cách để tạo dữ liệu có thể được chia sẻ giữa các quy trình khác nhau, bao gồm chia sẻ qua mạng giữa các quy trình chạy trên các máy khác nhau.provides a way of creating centralized Python objects that can be shared safely among processes. Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines.provides a way of creating centralized Python objects that can be shared safely among processes. Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines.