Hướng dẫn does python have asynchronous? - python có bất đồng bộ không?
Bất đồng bộ là một khái niệm rất hay gặp trong các ngôn ngữ lập trình như Javascript, Kotlin hay Python. Đặc biệt, các lập trình viên làm việc nhiều với networking như các web developer thường xuyên phải làm việc với khái niệm này. Trong bài viết này, mình sẽ giải thích một trong các thành phần cấu tạo nên hệ sinh thái async programming trong Python và tất nhiên, nó cũng mang tư tưởng này lên một số ngôn ngữ khác. Show Nội dung chính
Mục tiêu
Bất đồng bộ là gì?Khi nào chúng ta sử dụng event loop, thread hay process?
2. Streaming system 3. Bộ lập lịch cho OSTài liệu tham khảo:
Coroutine là gì? Chúng làm việc như thế nào? So sánh các đơn vị xử lí Coroutine trong ứng dụng thực tế Khi nào chúng ta sử dụng event loop, thread hay process?Coroutine là gì?
Bất đồng bộ và một vài cách tiếp cận:
Chúng ta có thể thấy rằng, các thread và process đều sở hữu một không gian bộ nhớ của riêng nó, do đó, chúng có thể thực hiện các công việc độc lập với main thread hay main process Trái lại, event loop duy trì các task, các task này chia sẻ bộ nhớ chung và chúng ta phải trả lời câu hỏi: def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
|
Process | Native thread | Green thread | Goroutine | Coroutine | |
---|---|---|---|---|---|
Bộ nhớ | ≤ 8Mb | ≤ Nx2Mb | ≥ 64Kb | ≥ 8Kb | ≥ 0Mb |
Quản lí bởi OS | Yes | Yes | No | No | No |
Pre-emptive scheduling | Yes | Yes | Yes | No | No |
Pre-emptive scheduling | Yes | No | No | No | No |
Pre-emptive scheduling | Yes | Yes | No | Yes | No |
Pre-emptive scheduling
Không gian địa chỉ riêng
#include
int coroutine() {
static int i = 0, s = 0;
switch (s) {
case 0:
for (i = 0;; ++i) {
if (!s) s = 1;
return i;
case 1:;
}
}
}
int main(int argc, char** argv) {
printf("%d\n", coroutine()); // ?
printf("%d\n", coroutine()); // ?
printf("%d\n", coroutine()); // ?
return 0;
}
Khả năng song song
Câu hỏi là: Vậy coroutine làm việc như thế nào?
Làm thế nào để cài đặt một coroutine?
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
Về cơ bản, nó cố gắng lưu lại trạng thái của hàm trong biến def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
8 và biến def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
9 đóng vai trò như là ngắt. Trước khi tạm dừng hàm(suspend), biến def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
9 được set là điểm bắt đầu khi nó được khôi phục(resume).
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
Trong đoạn code này, điểm chính là biến
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
9 và cách mà code có thể resume và suspend coroutine bằng cách dùng 0 1 1 2 3 5 8 13 21 34
20 1 1 2 3 5 8 13 21 34
Và ở bên dưới, nó được chuyển sang Python code từ code C ở trên
#include
int fib() {
static int i, __resume__ = 0;
static a = 0, b = 1, c;
switch (__resume__) {
case 0:
for (i = 0;; ++i) {
if (!__resume__) __resume__ = 1;
c = a + b;
b = a;
a = c;
return a;
case 1:;
}
}
}
int main() {
for (int i = 0; i < 10; ++i) {
printf("%d ", fib());
}
return 0;
}
def say():
yield "C"
yield "Java"
yield "Python"
co = say()
print(next(co))
print(next(co))
print(next(co))
print(next(co))
Bạn có thể chuyển Python code này sang C?
C
Java
Python
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-1-913b1d7d4200> in <module>
8 print(next(co))
9 print(next(co))
---> 10 print(next(co))
StopIteration:
#include
char* say() {
static int __resume__ = 0;
switch (__resume__) {
case 0:
__resume__ = 1;
return "C";
case 1:
__resume__ = 2;
return "Java";
case 2:
__resume__ = 3;
return "Python";
default:
return NULL; // GeneratorExit
}
}
int main() {
printf("%s\n", say());
printf("%s\n", say());
printf("%s\n", say());
printf("%s\n", say());
return 0;
}
Sau đó, bạn nên thấy kết quản như thế này
Mình có thể build bất kì một coroutine nào trong C. Bạn có thể làm được điều đó không?
Kết quả có thể thấyChúng ta có thể thấy rằng, coroutine cần một không gian bộ nhớ tĩnh để lưu lại trạng thái khi nó suspend và khôi phục lại mà không bị mất context. Trong C, không gian tĩnh là các biến static, chúng duy trì bởi OS khi một hàm thoát. Trong Python, context của hàm được lưu trữ trong các stack frame.
Hãy nghĩ về các coroutine như là các đoạn của một chương trình, không có bộ nhớ riêng, không thực thi song song và cực kì an toàn.
def coro_fn():
val = yield 'Starting' # started coroutine and suspend, return control to caller
print('Consume', val)
yield 'Hello World' # produce data
co = coro_fn() # create a new coroutine object
print(co.send(None)) # start coroutine
print(co.send('data')) # resume coroutine, pass control into coroutine
co.close() # close coroutine
Coroutine vs Threads
Starting
Consume data
Hello World
Coroutine giảm các lỗi do xử lí đa tiến trình (đa luồng) gây ra và mình nghĩ nó là giải pháp tốt nhất cho các task liên quan đến networking bởi nó chỉ tồn tại trong 1 tiến trình.
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
0def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
1Trong Python, chúng ta có thể định nghĩa coroutine bằng việc sử dụng lệnh
0 1 1 2 3 5 8 13 21 34
3 trong định nghĩa hàm. Khi chúng ta gọi hàm, chúng trả về một coroutine thay vì kết quả cuối cùng.Sau đó, kết quả có thể thấyGenerator là một trường hợp đặc biệt của coroutine, chúng chỉ có thể sinh(produce) dữ liệu mà không thể tiêu thụ(consuming) dữ liệu.
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
3Và đây là kết quả
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
2
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
4def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
5Chúng ta có thể refactor đoạn code với 0 1 1 2 3 5 8 13 21 34
4
0 1 1 2 3 5 8 13 21 34
Build binary tree with 0 1 1 2 3 5 8 13 21 34
4
0 1 1 2 3 5 8 13 21 34
Build cây nhị phân với
0 1 1 2 3 5 8 13 21 34
6Ứng dụng của coroutine
1. Máy chủ TCP bất đồng bộEVENT_READ, EVENT_WRITE
Trong trường hợp này, một máy chủ TCP là một event system
event source: socket lắng nghe và các socket kết nối
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
6về cơ bản, chúng ta có hai dạng sự kiện: EVENT_READ, EVENT_WRITE
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
7và các task là các coroutine, mỗi task sẽ xử lí 1 sự kiện của 1 kết nối tại 1 thời điểm
Chúng ta cũng có một event loop, nó là một bộ I/O multiplexing cho file descriptor
Bạn có thể chạy nó
Trình lập lịch và task trong các thư viện thực tế: https://github.com/dabeaz/curio/blob/master/curio/kernel.py#L1882. Streaming system
Chúng ta có thể sử dụng coroutine để build một hệ thống xử lí dữ liệu. Về cơ bản, hệ thống tách biệt các khối logic nhỏ. Chúng được đặt vào các coroutine với context riêng. Bạn có thể thấy chúng trong hình bên dưới.
Ví dụ: build một bộ phân tích địa chỉ IP truy cập của một webserver
Đầu tiên, bạn cần một file dữ liệu log
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
8Sau đó chạy
def coroutine():
i = 0
while 1:
yield i
i += 1
co = coroutine()
next(co)
next(co)
next(co)
9Kết quả có thể thấy
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
0Cải thiện
Chúng ta có thể bọc các thread trong một couroutine, tại sao không?
Đơn giản, chúng ta sử dụng các thread thay cho các máy
OK, hãy thiết kế tại cái sơ đồ nào
Kết hợp coroutine và threadTrong sơ đồ trên, mình chuyển các logic vào trong các thread và sử dụng hàng đợi như là kênh giao tiếp với các thread.
Không chỉ vậy, hàng đơi còn đóng vai trò như là các buffer nếu tốc độ đầu vào lớn hơn tốc độ đầu ra của đơn vị xử lí đó.
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
1Và chạy nó như sau
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
2Then
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
3Oh, Tại sao kết quả lại rỗng?
Mình cho các bạn gợi ý, hay thêm câu lệnh
0 1 1 2 3 5 8 13 21 34
7 trước câu lệnh 0 1 1 2 3 5 8 13 21 34
8 xem sao 😄Lưu ý
Khi chúng ta sử dụng coroutine, chúng ta nên xem xét coroutine có thể bị overload hay không. Nghĩa là, tại một thời điểm, coroutine đó có thể vừa bị đẩy dữ liệu vào, vừa đang xử lí dữ liệu bên trong nó hay không? Nó là một trường hợp khá nguy hiểm, khiến cho chương trình có thể bị crash.
Tránh các thiết kế (DAG)[https://en.wikipedia.org/wiki/Directed_acyclic_graph]
Chỉ gọi
9 trong luồng đồng bộ, ý mình là chỉ gọi0 1 1 2 3 5 8 13 21 34
9 trong một single thread.0 1 1 2 3 5 8 13 21 34
3. Bộ lập lịch cho OS
Operation system schedulerKhi một câu lệnh trong một task
#include
int fib() {
static int i, __resume__ = 0;
static a = 0, b = 1, c;
switch (__resume__) {
case 0:
for (i = 0;; ++i) {
if (!__resume__) __resume__ = 1;
c = a + b;
b = a;
a = c;
return a;
case 1:;
}
}
}
int main() {
for (int i = 0; i < 10; ++i) {
printf("%d ", fib());
}
return 0;
}
1, task sẽ trả lại quyền điều khiển cho OS và OS thực thi lệnh hoặc chuyển quyền điều khiển cho task khác trong hàng đợi.Nó là một non-preemptive scheduler, qua ví dụ dưới đây, các bạn có thể hiểu mối liên hệ giữa.
#include
int fib() {
static int i, __resume__ = 0;
static a = 0, b = 1, c;
switch (__resume__) {
case 0:
for (i = 0;; ++i) {
if (!__resume__) __resume__ = 1;
c = a + b;
b = a;
a = c;
return a;
case 1:;
}
}
}
int main() {
for (int i = 0; i < 10; ++i) {
printf("%d ", fib());
}
return 0;
}
2 trong OS và 0 1 1 2 3 5 8 13 21 34
3 trong Python.def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
4OK, khởi động OS nào
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
5Và bạn có thể thấy
def fib():
a, b = 0, 1
while True:
yield a
a, b = a + b, a
co = fib()
for _ in range(10):
print(next(co), end=' ')
6Thật kì diệu.
Hẹn gặp lại các bạn ở các bài viết sau.
Tài liệu tham khảo:
- Python documentation https://docs.python.org/3/
- Talk about coroutine https://www.dabeaz.com/coroutines/Coroutines.pdf