Bộ sưu tập đồng thời Python

Để triển khai đồng thời của riêng bạn bằng cách sử dụng trình tạo, trước tiên bạn cần có hiểu biết cơ bản về các hàm của trình tạo và câu lệnh yield. Cụ thể, hành vi cơ bản của yield là nó khiến trình tạo tạm dừng thực thi. Bằng cách tạm dừng thực thi, có thể viết một bộ lập lịch xử lý các trình tạo như một loại "nhiệm vụ" và thay thế việc thực hiện chúng bằng cách sử dụng một loại chuyển đổi tác vụ hợp tác

Để minh họa ý tưởng này, hãy xem xét hai hàm tạo sau đây bằng cách sử dụng một yield đơn giản

# Two simple generator functions def countdown(n): while n > 0: print('T-minus', n) yield n -= 1 print('Blastoff!') def countup(n): x = 0 while x < n: print('Counting up', x) yield x += 1

Các chức năng này có thể trông hơi buồn cười khi sử dụng chính yield. Tuy nhiên, hãy xem đoạn mã sau thực hiện một lịch trình tác vụ đơn giản

from collections import deque class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): '''         Admit a newly started task to the scheduler         ''' self._task_queue.append(task) def run(self): '''         Run until there are no more tasks         ''' while self._task_queue: task = self._task_queue.popleft() try: # Run until the next yield statement next(task) self._task_queue.append(task) except StopIteration: # Generator is no longer executing pass # Example use sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countdown(5)) sched.new_task(countup(15)) sched.run()

Trong đoạn mã này, lớp

from collections import deque class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): '''         Admit a newly started task to the scheduler         ''' self._task_queue.append(task) def run(self): '''         Run until there are no more tasks         ''' while self._task_queue: task = self._task_queue.popleft() try: # Run until the next yield statement next(task) self._task_queue.append(task) except StopIteration: # Generator is no longer executing pass # Example use sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countdown(5)) sched.new_task(countup(15)) sched.run()
0 chạy một tập hợp các trình tạo theo kiểu vòng tròn—mỗi trình tạo chạy cho đến khi chúng gặp câu lệnh yield. Đối với mẫu, đầu ra sẽ như sau

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...

Tại thời điểm này, về cơ bản, bạn đã triển khai phần lõi nhỏ của một “hệ điều hành” nếu bạn muốn. Các hàm tạo là các tác vụ và câu lệnh yield là cách các tác vụ báo hiệu rằng chúng muốn tạm dừng. Bộ lập lịch chỉ cần lặp lại các nhiệm vụ cho đến khi không còn nhiệm vụ nào đang thực thi

Trong thực tế, có lẽ bạn sẽ không sử dụng trình tạo để triển khai đồng thời cho một thứ đơn giản như được hiển thị. Thay vào đó, bạn có thể sử dụng các trình tạo để thay thế việc sử dụng các luồng khi triển khai các tác nhân (xem Xác định tác vụ của tác nhân) hoặc máy chủ mạng

Đoạn mã sau minh họa việc sử dụng trình tạo để triển khai phiên bản diễn viên không có luồng

from collections import deque class ActorScheduler(object): def __init__(self): self._actors = { } # Mapping of names to actors self._msg_queue = deque() # Message queue def new_actor(self, name, actor): '''         Admit a newly started actor to the scheduler and give it a name         ''' self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): '''         Send a message to a named actor         ''' actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): '''         Run as long as there are pending messages.         ''' while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass # Example use if __name__ == '__main__': def printer(): while True: msg = yield print('Got:', msg) def counter(sched): while True: # Receive the current count n = yield if n == 0: break # Send to the printer task sched.send('printer', n) # Send the next count to the counter task (recursive) sched.send('counter', n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor('printer', printer()) sched.new_actor('counter', counter(sched)) # Send an initial message to the counter to initiate sched.send('counter', 10000) sched.run()

Việc thực thi đoạn mã này có thể cần nghiên cứu một chút, nhưng điều quan trọng là hàng đợi các thư đang chờ xử lý. Về cơ bản, bộ lập lịch chạy miễn là có tin nhắn cần gửi. Một tính năng đáng chú ý là trình tạo

from collections import deque class TaskScheduler(object): def __init__(self): self._task_queue = deque() def new_task(self, task): '''         Admit a newly started task to the scheduler         ''' self._task_queue.append(task) def run(self): '''         Run until there are no more tasks         ''' while self._task_queue: task = self._task_queue.popleft() try: # Run until the next yield statement next(task) self._task_queue.append(task) except StopIteration: # Generator is no longer executing pass # Example use sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countdown(5)) sched.new_task(countup(15)) sched.run()
2 gửi tin nhắn cho chính nó và kết thúc trong một chu kỳ đệ quy không bị ràng buộc bởi giới hạn đệ quy của Python

Dưới đây là một ví dụ nâng cao cho thấy việc sử dụng trình tạo để triển khai ứng dụng mạng đồng thời

from collections import deque from select import select # This class represents a generic yield event in the scheduler class YieldEvent(object): def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass # Task Scheduler class Scheduler(object): def __init__(self): self._numtasks = 0 # Total num of tasks self._ready = deque() # Tasks ready to run self._read_waiting = {} # Tasks waiting to read self._write_waiting = {} # Tasks waiting to write # Poll for I/O events and restart waiting tasks def _iopoll(self): rset,wset,eset = select(self._read_waiting, self._write_waiting,[]) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self,task): '''         Add a newly started task to the scheduler         ''' self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): '''         Append an already started task to the ready queue.         msg is what to send into the task when it resumes.         ''' self._ready.append((task, msg)) # Add a task to the reading set def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) # Add a task to the write set def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): '''         Run the task scheduler until there are no tasks         ''' while self._numtasks: if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: # Run the coroutine to the next yield r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError('unrecognized yield event') except StopIteration: self._numtasks -= 1 # Example implementation of coroutine-based socket I/O class ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): data = self.sock.recv(self.nbytes) sched.add_ready(task, data) class WriteSocket(YieldEvent): def __init__(self, sock, data): self.sock = sock self.data = data def handle_yield(self, sched, task): sched._write_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): nsent = self.sock.send(self.data) sched.add_ready(task, nsent) class AcceptSocket(YieldEvent): def __init__(self, sock): self.sock = sock def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): r = self.sock.accept() sched.add_ready(task, r) # Wrapper around a socket object for use with yield class Socket(object): def __init__(self, sock): self._sock = sock def recv(self, maxbytes): return ReadSocket(self._sock, maxbytes) def send(self, data): return WriteSocket(self._sock, data) def accept(self): return AcceptSocket(self._sock) def __getattr__(self, name): return getattr(self._sock, name) if __name__ == '__main__': from socket import socket, AF_INET, SOCK_STREAM import time # Example of a function involving generators.  This should # be called using line = yield from readline(sock) def readline(sock): chars = [] while True: c = yield sock.recv(1) if not c: break chars.append(c) if c == b'\n': break return b''.join(chars) # Echo server using generators class EchoServer(object): def __init__(self,addr,sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self,addr): s = Socket(socket(AF_INET,SOCK_STREAM)) s.bind(addr) s.listen(5) while True: c,a = yield s.accept() print('Got connection from ', a) self.sched.new(self.client_handler(Socket(c))) def client_handler(self,client): while True: line = yield from readline(client) if not line: break line = b'GOT:' + line while line: nsent = yield client.send(line) line = line[nsent:] client.close() print('Client closed') sched = Scheduler() EchoServer(('',16000),sched) sched.run()

Mã này chắc chắn sẽ yêu cầu một lượng nghiên cứu cẩn thận nhất định. Tuy nhiên, về cơ bản nó đang triển khai một hệ điều hành nhỏ. Có một hàng đợi các tác vụ đã sẵn sàng để chạy và có các khu vực chờ cho các tác vụ đang ngủ cho I/O. Phần lớn bộ lập lịch liên quan đến việc di chuyển các tác vụ giữa hàng đợi sẵn sàng và khu vực chờ I/O

Python có tốt cho lập trình đồng thời không?

Python cung cấp cơ chế cho cả đồng thời và song song , mỗi cơ chế có cú pháp và trường hợp sử dụng riêng. Python có hai cơ chế khác nhau để triển khai đồng thời, mặc dù chúng chia sẻ nhiều thành phần chung. Đây là luồng và coroutines hoặc không đồng bộ.

Có bao nhiêu luồng có thể chạy đồng thời Python?

Nói chung, Python chỉ sử dụng một luồng để thực thi tập hợp các câu lệnh đã viết. Điều này có nghĩa là trong python, chỉ có một luồng sẽ được thực thi tại một thời điểm.

Các luồng Python có đồng thời không?

Cả đa luồng và đa xử lý đều cho phép mã Python chạy đồng thời . Chỉ đa xử lý mới cho phép mã của bạn thực sự song song.

Python có thể chạy nhiều luồng cùng một lúc không?

Tóm lại, phân luồng trong Python cho phép nhiều luồng được tạo trong một quy trình duy nhất, nhưng do GIL, không có luồng nào chạy cùng một lúc. Threading is still a very good option when it comes to running multiple I/O bound tasks concurrently.