Hướng dẫn thread manager python - trăn quản lý luồng

Trong bài viết này, tôi muốn giới thiệu với các bạn về các cơ chế, kỹ thuật đồng bộ trong lập trình đa luồng [multithreading]. Các kỹ thuật được trình bày trong ngôn ngữ Python nhưng về nguyên lý đều có thể áp dụng cho các ngôn ngữ khác. Những từ khóa chính trong bài viết:

import math
import threading

def is_prime[n]:
	"""Check if n is prime or not
    """
    root = int[math.sqrt[n]]
    for i in range[2, root + 1]:
        if n % i == 0:
            return False
    return True

class CountPrime[threading.Thread]:

    def __init__[self, list_num, list_lock, out_lock, output]:
        threading.Thread.__init__[self]
        self._list = list_num	# list of number from 2 to N
        self._list_lock = list_lock	# Lock for list_num
        self._out_lock = out_lock	# Lock for output
        self._output = output	# list of prime numbers

    def run[self]:
        while True:
        	# request to access shared resource
            # if there are many threads acquiring Lock, only one thread get the Lock
            # and other threads will get blocked
            self._list_lock.acquire[]
            try:
                n = next[self._list]	# pop a number in list_num
            except StopIteration:
                return
            finally:
            	# release the Lock, so other thread can gain the Lock to access list_num
                self._list_lock.release[]

            if not is_prime[n]:
                continue

            self._out_lock.acquire[]
            self._output.append[n]
            self._out_lock.release[]

def parallel_primes[n, num_threads=None]:
    """Parallel count number of prime from 2 to n

    Count prime numbers from 2 to n using num_threads threads
    If num_threads is None, using os.cpu_count[]

    """
    list_num = [i for i in range[2, n + 1]]
    list_lock = threading.Lock[]
    out_lock = threading.Lock[]
    output = []
    threads = []

    if num_threads is None:
        try:
            num_threads = os.cpu_count[]
        except AttributeError:
            num_threads = 4

    elif num_threads  N thành các đoạn, nếu chương trình có K luồng, thì ta sẽ có K đoạn [có số lượng phần tử bằng nhau], và mỗi luồng tính toán trong từng đoạn riêng biệt.
  • Lưu các số 2 -> N vào một mảng chung, nếu một luồng nào đấy rảnh sẽ lấy ra một số trong mảng đấy để kiểm tra, nếu là số nguyên tố thì thêm nó vào một mảng chứa các số nguyên tố [mảng này cũng được chia sẻ giữa các luồng].
  • Về cân bằng tải giữa luồng: Ta dễ thấy rằng nếu số càng lớn thì khối lượng tính toán để kiểm tra xem số đó có là số nguyên tố hay không càng lớn, vì vậy với cách chia thứ nhất, những luồng xử lý trên dãy các số nguyên bé sẽ hoàn thành công việc sớm hơn, và những luồng về sau sẽ càng mất nhiều thời gian để tính toán hơn, cho dù số phần tử của mỗi đoạn là bằng nhau. Do đó, ta thấy rằng cách chia thứ nhất cân bằng tải giữa các luồng không được tốt.

    Với cách chia thứ hai, các luồng sẽ luân phiên nhau lấy ra một gía trị trong mảng chung, kiểm tra tính nguyên tố của gía trị đó một cách độc lập, sau khi hoàn thành công việc sẽ lấy một gía trị khác để kiểm tra, ta thấy rằng công việc được phân bố đều giữa các luồng, vì vậy cách chia thứ hai cho ta cân bằng tải tốt hơn.

    Về đồng bộ luồng: Ở đây ta xét cách chia thứ hai, các luồng sẽ chia sẻ 2 tài nguyên chung đó là mảng các số 2 -> N và mảng chứa các số nguyên tố. Ta cần đảm bảo tại mỗi thời điểm chỉ có duy nhất một luồng được truy cập tài nguyên chung, nếu luồng khác muốn truy cập phải đợi [block] cho đến khi luồng trước đó cập nhật xong. Python cung cấp cho chúng ta các class cơ bản để thực hiện đồng bộ luồng bao gồm:

    import math
    import threading
    
    def is_prime[n]:
    	"""Check if n is prime or not
        """
        root = int[math.sqrt[n]]
        for i in range[2, root + 1]:
            if n % i == 0:
                return False
        return True
    
    class CountPrime[threading.Thread]:
    
        def __init__[self, list_num, list_lock, out_lock, output]:
            threading.Thread.__init__[self]
            self._list = list_num	# list of number from 2 to N
            self._list_lock = list_lock	# Lock for list_num
            self._out_lock = out_lock	# Lock for output
            self._output = output	# list of prime numbers
    
        def run[self]:
            while True:
            	# request to access shared resource
                # if there are many threads acquiring Lock, only one thread get the Lock
                # and other threads will get blocked
                self._list_lock.acquire[]
                try:
                    n = next[self._list]	# pop a number in list_num
                except StopIteration:
                    return
                finally:
                	# release the Lock, so other thread can gain the Lock to access list_num
                    self._list_lock.release[]
    
                if not is_prime[n]:
                    continue
    
                self._out_lock.acquire[]
                self._output.append[n]
                self._out_lock.release[]
    
    def parallel_primes[n, num_threads=None]:
        """Parallel count number of prime from 2 to n
    
        Count prime numbers from 2 to n using num_threads threads
        If num_threads is None, using os.cpu_count[]
    
        """
        list_num = [i for i in range[2, n + 1]]
        list_lock = threading.Lock[]
        out_lock = threading.Lock[]
        output = []
        threads = []
    
        if num_threads is None:
            try:
                num_threads = os.cpu_count[]
            except AttributeError:
                num_threads = 4
    
        elif num_threads 

    Bài Viết Liên Quan

    Chủ Đề