Hướng dẫn does python have asynchronous? - python có bất đồng bộ không?

Bất đồng bộ là một khái niệm rất hay gặp trong các ngôn ngữ lập trình như Javascript, Kotlin hay Python. Đặc biệt, các lập trình viên làm việc nhiều với networking như các web developer thường xuyên phải làm việc với khái niệm này. Trong bài viết này, mình sẽ giải thích một trong các thành phần cấu tạo nên hệ sinh thái async programming trong Python và tất nhiên, nó cũng mang tư tưởng này lên một số ngôn ngữ khác.

Nội dung chính

  • Mục tiêu
  • Bất đồng bộ là gì?
  • Khi nào chúng ta sử dụng event loop, thread hay process?
  • Coroutine là gì?
  • 2. Streaming system
  • 3. Bộ lập lịch cho OS
  • Tài liệu tham khảo:

Mục tiêu

  • Bất đồng bộ là gì?
  • Khi nào chúng ta sử dụng event loop, thread hay process?
  • Coroutine là gì?
  • 2. Streaming system

Bất đồng bộ là gì?

Khi nào chúng ta sử dụng event loop, thread hay process?

Coroutine là gì?

2. Streaming system

Hướng dẫn does python have asynchronous? - python có bất đồng bộ không?

3. Bộ lập lịch cho OS

Tài liệu tham khảo:

  • Tìm hiểu và so sánh các mô hình lập trình cho vấn đề xử lí bất đồng bộ
  • Bất đồng bộ và một vài cách tiếp cận

Coroutine là gì? Chúng làm việc như thế nào? So sánh các đơn vị xử lí

Coroutine trong ứng dụng thực tế

Khi nào chúng ta sử dụng event loop, thread hay process?

Coroutine là gì?

  • 2. Streaming system

    3. Bộ lập lịch cho OS

  • Tài liệu tham khảo:

    Tìm hiểu và so sánh các mô hình lập trình cho vấn đề xử lí bất đồng bộ

Bất đồng bộ và một vài cách tiếp cận:

  • Coroutine là gì? Chúng làm việc như thế nào? So sánh các đơn vị xử lí

  • Coroutine trong ứng dụng thực tế

    Theo Wikipedia

    Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events. These may be “outside” events such as the arrival of signals, or actions instigated by a program that takes place concurrently with program execution, without the program blocking to wait for results. Asynchronous input/output is an example of the latter cause of asynchrony, and lets programs issue commands to storage or network devices that service these requests while the processor continues executing the program. Doing so provides a degree of parallelism

    Hình ảnh bên dưới cho chúng ta thấy một vài giải pháp cho các vấn đề về bất đồng bộ

  • Hệ sinh thái của lập trình bất đồng bộ

    Lưu ý, các thread trong Python là các native thread, nhưng do một vài policy (cụ thể là trong bản cpython), GIL (Global Interpreter Lock) sẽ không cho phép chúng chạy 2 thread đồng thời. Do đó, các Python thread không thực xử xử lí song song và thật sự mình không thích sử dụng chúng do:

    Tốn tài nguyên duy trì

    Fork một thread mới là cực kì tốn thời gian

Chúng ta có thể thấy rằng, các thread và process đều sở hữu một không gian bộ nhớ của riêng nó, do đó, chúng có thể thực hiện các công việc độc lập với main thread hay main process

Trái lại, event loop duy trì các task, các task này chia sẻ bộ nhớ chung và chúng ta phải trả lời câu hỏi:
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
7

Trong khoa học máy tính, chúng ta có thể phân task thành 2 loại:

Trong I/O-bound task, chúng ta sẽ có một vài lệnh lấy dữ liệu (IO operation) mà tại đó chúng ta cần tối ưu (như ví dụ về David, anh ta có thể tạm ngưng các task bị pending bởi test và chuyển qua làm task khác rồi quay lại làm các task của anh ta).

Như vậy các ngắt là một vấn đề, làm sao chúng ta có thể tạo ra các ngắt trong hàm mà vẫn giữ được context của hàm để có thể thực thi tiếp?

Tại các ngắt đó, hàm đang thực thi(callee) cần trao lại quyền điều khiển chương trình (program control) cho nơi đã gọi hàm đó (caller), ở đây thực chất là event loop và chúng ta cũng cần bắt đầu tại điểm ngắt này khi caller trao quyền điều khiển cho callee khi nó được thực thi tiếp.

✅ Giải pháp ở đây là sử dụng coroutineGiải pháp ở đây là sử dụng coroutine

Coroutine là gì?

Donald Knuth nói rằng:

Subroutines là một trường hợp cơ bản của coroutine

Đúng vậy, tổng quát hoá, các hàm bình thường chúng ta hay sử dụng (hàm bị giải phóng context sau khi thoát khỏi hàm) là trường hợp đặc biệt của coroutine - nơi đó context có thể được giữ lại khi nó được tạm dùng.

Subroutine vs Coroutine

Tại sao coroutine lại hữu dụng cho event system?

  • là non-preemptive scheduling
  • có thể tạm dừng và tiếp tục tại bất kì đâu do đó, nếu dữ liệu là stream, chúng có thể tiếp kiệm bộ nhớ
  • có thể duy trì trạng thái
  • với ràng buộc về I/O, coroutine tối ưu bộ nhớ và CPU
  • chúng nhỏ gọn

Đơn vị làm việc

ProcessNative threadGreen threadGoroutineCoroutine
Bộ nhớ ≤ 8Mb ≤ Nx2Mb ≥ 64Kb ≥ 8Kb ≥ 0Mb
Quản lí bởi OS Yes Yes No No No
Pre-emptive scheduling Yes Yes Yes No No
Pre-emptive scheduling Yes No No No No
Pre-emptive scheduling Yes Yes No Yes No

Pre-emptive scheduling

Không gian địa chỉ riêng

#include 

int coroutine() {
    static int i = 0, s = 0;
    switch (s) {
        case 0:
            for (i = 0;; ++i) {
                if (!s) s = 1;
                return i;
                case 1:;
            }
    }
}

int main(int argc, char** argv) {
    printf("%d\n", coroutine());     // ?
    printf("%d\n", coroutine());     // ?
    printf("%d\n", coroutine());     // ?
    return 0;
}

Khả năng song song

Câu hỏi là: Vậy coroutine làm việc như thế nào?

Làm thế nào để cài đặt một coroutine?

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)

Về cơ bản, nó cố gắng lưu lại trạng thái của hàm trong biến
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
8 và biến
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
9 đóng vai trò như là ngắt. Trước khi tạm dừng hàm(suspend), biến
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
9 được set là điểm bắt đầu khi nó được khôi phục(resume).

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')

Trong đoạn code này, điểm chính là biến

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
9 và cách mà code có thể resume và suspend coroutine bằng cách dùng
0 1 1 2 3 5 8 13 21 34
2

0 1 1 2 3 5 8 13 21 34

Và ở bên dưới, nó được chuyển sang Python code từ code C ở trên

#include 

int fib() {
    static int i, __resume__ = 0;
    static a = 0, b = 1, c;
    switch (__resume__) {
        case 0:
            for (i = 0;; ++i) {
                if (!__resume__) __resume__ = 1;
                c = a + b;
                b = a;
                a = c;
                return a;
                case 1:;
            }
    }
}

int main() {
    for (int i = 0; i < 10; ++i) {
        printf("%d ", fib());
    }
    return 0;
}
def say():
    yield "C"
    yield "Java"
    yield "Python"
    
co = say()
print(next(co))
print(next(co))
print(next(co))
print(next(co))

Bạn có thể chuyển Python code này sang C?

C
Java
Python
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-1-913b1d7d4200> in <module>
      8 print(next(co))
      9 print(next(co))
---> 10 print(next(co))

StopIteration:
#include 

char* say() {
    static int __resume__ = 0;
    switch (__resume__) {
        case 0:
            __resume__ = 1;
            return "C";
        case 1:
            __resume__ = 2;
            return "Java";
        case 2:
            __resume__ = 3;
            return "Python";
        default:
            return NULL;           // GeneratorExit
    }
}

int main() {
    printf("%s\n", say());
    printf("%s\n", say());
    printf("%s\n", say());
    printf("%s\n", say());
    return 0;
}

Sau đó, bạn nên thấy kết quản như thế này

Mình có thể build bất kì một coroutine nào trong C. Bạn có thể làm được điều đó không?

Kết quả có thể thấy

Chúng ta có thể thấy rằng, coroutine cần một không gian bộ nhớ tĩnh để lưu lại trạng thái khi nó suspend và khôi phục lại mà không bị mất context. Trong C, không gian tĩnh là các biến static, chúng duy trì bởi OS khi một hàm thoát. Trong Python, context của hàm được lưu trữ trong các stack frame.

Hãy nghĩ về các coroutine như là các đoạn của một chương trình, không có bộ nhớ riêng, không thực thi song song và cực kì an toàn.

def coro_fn():
    val = yield 'Starting'   # started coroutine and suspend, return control to caller
    print('Consume', val)
    yield 'Hello World'      # produce data
    
co = coro_fn()               # create a new coroutine object
print(co.send(None))         # start coroutine
print(co.send('data'))       # resume coroutine, pass control into coroutine
co.close()                   # close coroutine

Coroutine vs Threads

Starting
Consume data
Hello World

Coroutine giảm các lỗi do xử lí đa tiến trình (đa luồng) gây ra và mình nghĩ nó là giải pháp tốt nhất cho các task liên quan đến networking bởi nó chỉ tồn tại trong 1 tiến trình.

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
0
def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
1

Trong Python, chúng ta có thể định nghĩa coroutine bằng việc sử dụng lệnh

0 1 1 2 3 5 8 13 21 34
3 trong định nghĩa hàm. Khi chúng ta gọi hàm, chúng trả về một coroutine thay vì kết quả cuối cùng.

Sau đó, kết quả có thể thấy

Generator là một trường hợp đặc biệt của coroutine, chúng chỉ có thể sinh(produce) dữ liệu mà không thể tiêu thụ(consuming) dữ liệu.

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
3

Và đây là kết quả

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
2

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
4
def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
5

Chúng ta có thể refactor đoạn code với
0 1 1 2 3 5 8 13 21 34
4

Build binary tree with
0 1 1 2 3 5 8 13 21 34
4

Build cây nhị phân với

0 1 1 2 3 5 8 13 21 34
6

  • Ứng dụng của coroutine

  • 1. Máy chủ TCP bất đồng bộEVENT_READ, EVENT_WRITE

  • Trong trường hợp này, một máy chủ TCP là một event system

  • event source: socket lắng nghe và các socket kết nối

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
6

về cơ bản, chúng ta có hai dạng sự kiện: EVENT_READ, EVENT_WRITE

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
7

và các task là các coroutine, mỗi task sẽ xử lí 1 sự kiện của 1 kết nối tại 1 thời điểm

Chúng ta cũng có một event loop, nó là một bộ I/O multiplexing cho file descriptor

Bạn có thể chạy nó

Trình lập lịch và task trong các thư viện thực tế: https://github.com/dabeaz/curio/blob/master/curio/kernel.py#L188

2. Streaming system

Chúng ta có thể sử dụng coroutine để build một hệ thống xử lí dữ liệu. Về cơ bản, hệ thống tách biệt các khối logic nhỏ. Chúng được đặt vào các coroutine với context riêng. Bạn có thể thấy chúng trong hình bên dưới.

Ví dụ: build một bộ phân tích địa chỉ IP truy cập của một webserver

Đầu tiên, bạn cần một file dữ liệu log

Thống kê IP
def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
8

Sau đó chạy

def coroutine():
    i = 0
    while 1:
        yield i
        i += 1

co = coroutine()
next(co)
next(co)
next(co)
9

Kết quả có thể thấy

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
0

Cải thiện

Chúng ta có thể bọc các thread trong một couroutine, tại sao không?

Đơn giản, chúng ta sử dụng các thread thay cho các máy

OK, hãy thiết kế tại cái sơ đồ nào

Kết hợp coroutine và thread

Trong sơ đồ trên, mình chuyển các logic vào trong các thread và sử dụng hàng đợi như là kênh giao tiếp với các thread.

Không chỉ vậy, hàng đơi còn đóng vai trò như là các buffer nếu tốc độ đầu vào lớn hơn tốc độ đầu ra của đơn vị xử lí đó.

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
1

Và chạy nó như sau

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
2

Then

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
3

Oh, Tại sao kết quả lại rỗng?

Mình cho các bạn gợi ý, hay thêm câu lệnh

0 1 1 2 3 5 8 13 21 34
7 trước câu lệnh
0 1 1 2 3 5 8 13 21 34
8 xem sao 😄

Lưu ý

  • Khi chúng ta sử dụng coroutine, chúng ta nên xem xét coroutine có thể bị overload hay không. Nghĩa là, tại một thời điểm, coroutine đó có thể vừa bị đẩy dữ liệu vào, vừa đang xử lí dữ liệu bên trong nó hay không? Nó là một trường hợp khá nguy hiểm, khiến cho chương trình có thể bị crash.

  • Tránh các thiết kế (DAG)[https://en.wikipedia.org/wiki/Directed_acyclic_graph]

  • Chỉ gọi

    0 1 1 2 3 5 8 13 21 34
    
    9 trong luồng đồng bộ, ý mình là chỉ gọi
    0 1 1 2 3 5 8 13 21 34
    
    9 trong một single thread.

3. Bộ lập lịch cho OS

Operation system scheduler

Khi một câu lệnh trong một task

#include 

int fib() {
    static int i, __resume__ = 0;
    static a = 0, b = 1, c;
    switch (__resume__) {
        case 0:
            for (i = 0;; ++i) {
                if (!__resume__) __resume__ = 1;
                c = a + b;
                b = a;
                a = c;
                return a;
                case 1:;
            }
    }
}

int main() {
    for (int i = 0; i < 10; ++i) {
        printf("%d ", fib());
    }
    return 0;
}
1, task sẽ trả lại quyền điều khiển cho OS và OS thực thi lệnh hoặc chuyển quyền điều khiển cho task khác trong hàng đợi.

Nó là một non-preemptive scheduler, qua ví dụ dưới đây, các bạn có thể hiểu mối liên hệ giữa.

#include 

int fib() {
    static int i, __resume__ = 0;
    static a = 0, b = 1, c;
    switch (__resume__) {
        case 0:
            for (i = 0;; ++i) {
                if (!__resume__) __resume__ = 1;
                c = a + b;
                b = a;
                a = c;
                return a;
                case 1:;
            }
    }
}

int main() {
    for (int i = 0; i < 10; ++i) {
        printf("%d ", fib());
    }
    return 0;
}
2 trong OS và
0 1 1 2 3 5 8 13 21 34
3 trong Python.

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
4

OK, khởi động OS nào

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
5

Và bạn có thể thấy

def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = a + b, a

co = fib()
for _ in range(10):
    print(next(co), end=' ')
6

Thật kì diệu.

Hẹn gặp lại các bạn ở các bài viết sau.

Tài liệu tham khảo:

  • Python documentation https://docs.python.org/3/
  • Talk about coroutine https://www.dabeaz.com/coroutines/Coroutines.pdf