Python ống quy trình con là gì?

Quy trình con là cốt lõi của tự động hóa phần mềm. Trong Python, thư viện tiêu chuẩn cung cấp mô-đun quy trình con cho việc này với nhiều tùy chọn giúp nó phù hợp với nhiều mục đích sử dụng. Trong bài viết này, chúng tôi sẽ đề cập đến chủ đề cụ thể về giám sát các quy trình con đó. Các kỹ thuật cho việc này rất nhiều và một số trong số chúng hơi tiên tiến, vì vậy đây sẽ là cơ hội để so sánh chúng. Đây cũng sẽ là một cái cớ để nghiên cứu chi tiết mô-đun quy trình con [và đối tác

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
2 của nó]

Quy trình con là gì?

Một quy trình con là việc thực thi một chương trình hoặc lệnh dưới dạng một quy trình bên ngoài nơi nó được khởi chạy. Như vậy, chương trình mẹ [ví dụ: ứng dụng Python] sẽ tạo và quản lý quy trình con cho chương trình bên ngoài này

Có hai cách chính để sử dụng các quy trình con như vậy

  • bằng cách chạy chương trình bên ngoài cho đến khi kết thúc, hoặc,
  • bằng cách bắt đầu chương trình bên ngoài dưới dạng daemon

Trong Python, mô-đun quy trình con của thư viện chuẩn cung cấp các điểm vào cho các trường hợp sử dụng chính này

  • chức năng, như tên gọi của nó, đáp ứng trường hợp sử dụng đầu tiên và,
  • lớp, được sử dụng để tạo một quy trình theo những cách linh hoạt hơn, đặc biệt là trường hợp sử dụng daemon

Cái tên

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
3 xuất phát từ hàm popen[3], từ thư viện chuẩn C, hàm này “mở một quy trình bằng cách tạo một đường ống, rẽ nhánh và gọi trình bao”;

Thư viện tiêu chuẩn cũng đi kèm với asyncio. quy trình con cho các chương trình I/O không đồng bộ [chúng tôi sẽ trình bày cách sử dụng mô-đun này cho chủ đề giám sát của chúng tôi sau này]

Giao tiếp

Một khía cạnh quan trọng của quy trình con là giao tiếp vì người ta thường muốn có thể truyền dữ liệu từ/đến cha và con. Trong Python, điều này có thể thực hiện được bằng cách sử dụng các tham số

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
4,
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
5 và
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
7 [hoặc
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
8, mặc dù hàm này cũng có tham số thuận tiện là
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
9]. Như tên gọi của chúng, đây là các luồng I/O tiêu chuẩn thường được gắn vào bất kỳ chương trình nào khi bắt đầu. một cho đầu vào, một cho đầu ra và một cho lỗi

Nhìn vào tài liệu, chúng ta có thể nhận thấy rằng các tham số này có thể nhận các loại giá trị khác nhau

, , bộ mô tả tệp hiện có [số nguyên dương], đối tượng tệp hiện có và

import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
0

Trong số đó, trường hợp

import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
1 đặc biệt thú vị vì nó cung cấp một cách thành ngữ để giao tiếp giữa cha mẹ [Python] và con thông qua các đối tượng luồng Python đẹp mắt [có thể được đọc từ hoặc ghi vào]

Giám sát các quy trình con [và một ví dụ hướng dẫn]

Như đã đề cập trong phần giới thiệu, chủ đề của bài viết này không chỉ là các quy trình con trong Python, mà còn là việc giám sát chúng. Người đọc có thể thắc mắc tại sao chúng ta lại muốn giám sát một quy trình con và ý nghĩa thực sự của nó là gì. Trước khi giới thiệu động lực cho chủ đề này, trước tiên hãy xác định một ví dụ hướng dẫn trong đó chúng ta sẽ sử dụng chương trình pg_basebackup từ PostgreSQL để sao lưu cụm cơ sở dữ liệu [điều này thường được sử dụng để thiết lập máy chủ dự phòng sao chép trực tuyến]. Nói chung, lệnh được phát hành từ máy chủ lưu trữ dự phòng [hoặc dự phòng] và thường trông giống như

$ pg_basebackup -d "host=primary.example.com user=admin" -D pgdata -v -P
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/E9000028 on timeline 1
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_12956"
1808807/1808807 kB [100%], 1/1 tablespace
pg_basebackup: write-ahead log end point: 0/E9000138
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: syncing data to disk ...
pg_basebackup: renaming backup_manifest.tmp to backup_manifest
pg_basebackup: base backup completed

Để hoàn thiện [mặc dù hiểu những điều sau đây không quan trọng lắm], chúng tôi đã sử dụng các tùy chọn sau.

import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
2 giữ thông tin kết nối với cụm cơ sở dữ liệu nguồn,
import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
3 chỉ ra thư mục đích cho các tệp sao lưu,
import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
4 kích hoạt chế độ chi tiết và
import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
5 cho phép báo cáo tiến độ

Ngoài ra, lệnh này có thể yêu cầu mật khẩu để kết nối với máy chủ chính

Lệnh này có thể mất nhiều thời gian để hoàn thành khi có nhiều dữ liệu cần truyền và tùy thuộc vào kết nối mạng hoặc thậm chí tùy thuộc vào trạng thái thực tế của cụm cơ sở dữ liệu chính;

Chẳng hạn, có những lý do khác mà người ta có thể muốn giám sát một quy trình con

  • cho người dùng biết trực tiếp nếu các lệnh chúng tôi đang khởi chạy [và có thể dài] đang chạy tốt;
  • và ngay cả khi có, hãy để người dùng ngắt lệnh đó nếu họ muốn [ví dụ: đọc thông báo tường trình cho biết điều gì đó không mong muốn]

Tổng quát hơn, khi làm việc với các hệ thống phân tán, trong đó các thành phần thường đợi các thành phần khác sẵn sàng, các chương trình thường không bao giờ kết thúc ngay cả trong trường hợp bế tắc hoặc tương tự. Vì vậy, để người dùng biết trực tiếp về tình hình là tốt

Giao diện giám sát quy trình con Python

Như đã đề cập trước đó, các quy trình con được đại diện bởi các đối tượng phơi bày các luồng tiêu chuẩn của chúng. Nói chung, chúng tôi chủ yếu quan tâm đến

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 cho mục đích giám sát. Quay trở lại các giao diện có sẵn cho các luồng tiêu chuẩn, chúng tôi có. luồng của cha mẹ [i. e. đầu ra của con sẽ được chuyển hướng đến đầu ra của cha mẹ], đường ống và tệp

Vì nhiều lý do, việc sử dụng các luồng của cha mẹ không phù hợp lắm vì chúng tôi đang trộn các luồng của một số chương trình thành một và điều đó có thể gây nhầm lẫn. Ngoài ra, không có cách nào dễ dàng để kiểm soát kết xuất đầu ra của con khi được chuyển tiếp đến cha mẹ

Cách tiếp cận tập tin [i. e. tạo các đối tượng tệp trong chương trình Python gốc và chuyển bộ mô tả tệp của chúng cho con] sẽ tốt hơn nhưng làm việc với các tệp cục bộ không phải lúc nào cũng phù hợp với thiết kế của một số ứng dụng mà có thể dựa vào một số hệ thống ghi nhật ký bên ngoài [chẳng hạn như Sentry hoặc Grafana Loki]

Điều này để lại cho chúng tôi cách tiếp cận đường ống thực sự rất linh hoạt vì tất cả dữ liệu được xử lý trong bộ nhớ để lại cho chúng tôi khả năng xử lý hậu kỳ, lọc và chuyển tiếp dựa trên logic của chính chúng tôi

Theo đó, chúng tôi tiến hành bằng cách sử dụng tham số

import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
7 của và/hoặc
import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
8 trong hoặc

Với tất cả những điều này đã được thiết lập, giả sử chúng ta sẽ có quyền truy cập vào

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của quy trình, chúng ta sẽ theo dõi điều này thông qua ghi nhật ký Python dưới dạng một hàm nhật ký đơn giản

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]

sử dụng

Chạy lệnh

for errline in result.stderr.splitlines[]:
    log_stderr[cmd[0], errline]
0 của chúng tôi với
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
8 sẽ giống như

________số 8_______

Thông thường không có cách nào để giám sát những gì xảy ra trong quá trình thực thi tiến trình con tại đây. Chúng ta chỉ còn lại giá trị trả về của

for errline in result.stderr.splitlines[]:
    log_stderr[cmd[0], errline]
2, một ví dụ, mà chúng ta chỉ có thể sử dụng sau khi kết thúc chương trình

for errline in result.stderr.splitlines[]:
    log_stderr[cmd[0], errline]

sử dụng

Khi sử dụng , chúng tôi chuyển giá trị đặc biệt

import subprocess

cmd = ["pg_basebackup", "-D", "pgdata", "-v", "-P"]
result = subprocess.run[cmd, capture_output=True, check=True, text=True]
1 dưới dạng tham số
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
5 và
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6. Điều này sẽ làm cho Python tạo một “đường ống” cho mỗi luồng, như một kênh dữ liệu một chiều để liên lạc giữa các quá trình [trích dẫn đường ống [2]]; . Ngoài ra, đối tượng
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
3 có thể được sử dụng như một trình quản lý bối cảnh sẽ đợi tiến trình con kết thúc trong khi cho phép chúng tôi chạy mã tùy ý trong khối
for errline in result.stderr.splitlines[]:
    log_stderr[cmd[0], errline]
7. Vì vậy, thông thường

cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]

Tại đây, chúng tôi có thể giám sát con trực tiếp

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 trong khi quá trình đang chạy

Tuy nhiên, sử dụng

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
3 làm trình quản lý ngữ cảnh cũng sẽ đóng luồng khi thoát, nghĩa là chúng tôi cần xử lý chúng trong ngữ cảnh. Tương tự, chúng ta cần tự xử lý đầu vào, điều này thường cần một cuộc gọi đến

Nói chung, ví dụ trước sẽ trở thành

with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
    stdout, stderr = proc.communicate[]
result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, stderr]

ngang bằng với , chúng tôi đã xây dựng một kết quả. Tuy nhiên, kiểm tra nó sẽ tiết lộ rằng thuộc tính

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của nó là một chuỗi rỗng. Điều này là do
cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
1 đã trả về một chuỗi trống trong giá trị thứ hai của bộ dữ liệu. Đổi lại, điều này là do chúng tôi thực sự đã sử dụng đường ống
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 trong vòng lặp
cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
3 để theo dõi. Vì vậy, nếu chúng tôi muốn giữ lại bản gốc
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 đồng thời giám sát nó, thì chúng tôi cần phải tự làm điều đó

with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    errs = []
    for line in proc.stderr:
        log_stderr[cmd[0], line]
        errs.append[line]
    stdout, _ = proc.communicate[]
result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, "\n".join[errs]]

Cũng lưu ý rằng bước

cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
5 xuất hiện sau vòng giám sát, có nghĩa là bước này sẽ không hoạt động nếu chúng ta cần chuyển dữ liệu đầu vào. Trên thực tế, vấn đề này tổng quát hơn vì chúng tôi có thể quan tâm đến việc thực hiện đồng thời các việc trong khi quy trình đang chạy và trong khi chúng tôi đang giám sát

Giám sát đồng thời, với
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
2

Vì vậy, chúng tôi muốn giữ tác vụ ghi nhật ký của mình trong khi giao tiếp với quy trình [hoặc làm điều gì đó khác trong khi nó đang chạy]. Phân luồng có thể là một tùy chọn, nhưng vì điều này về cơ bản liên quan đến I/O, thay vào đó, chúng tôi đang hướng tới thư viện asyncio [tiêu chuẩn] cũng cung cấp hỗ trợ dựng sẵn cho các quy trình phụ. Tuy nhiên, chúng ta sẽ thấy rằng mọi thứ không đơn giản như vậy

Hãy thử điều đó

import asyncio.subprocess

proc = await asyncio.subprocess.create_subprocess_exec[
    *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
]

async def log_stderr[] -> bytes:
    errs = []
    async for line in proc.stderr:
        logger.debug["%s: %s", cmd[0], line.decode[].rstrip[]]
        errs.append[line]
    return b"".join[errs]

t = asyncio.create_task[log_stderr[]]
stdout, stderr = await asyncio.gather[proc.stdout.read[], t]

Trước tiên, chúng tôi đang xây dựng một asyncio với

cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
7. Sau đó, chúng tôi đang sinh ra một nhiệm vụ để ghi nhật ký
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của nó. Và cuối cùng, chúng tôi thu thập nhiệm vụ giám sát của mình với quy trình đọc của
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
5. Cũng lưu ý rằng tất cả các thông tin liên lạc liên quan đến
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
    stdout, stderr = proc.communicate[]
result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, stderr]
0 với thư viện asyncio

Vì vậy, chúng tôi đã đạt được sự giám sát đồng thời đối với

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của trẻ, nhưng việc sử dụng
cmd = ["pg_basebackup", "-D", "pgdata", "--verbose", "--progress"]
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
1 vẫn không thể thực hiện được vì chúng tôi đã đọc từ
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 đến
with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
    stdout, stderr = proc.communicate[]
result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, stderr]
4 và
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
2 không cho phép gắn nhiều đầu đọc vào một đường ống

Tại thời điểm này, chúng tôi có [ít nhất] hai lựa chọn

  • triển khai lại
    with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
        for line in proc.stderr:
            log_stderr[cmd[0], line]
        stdout, stderr = proc.communicate[]
    result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, stderr]
    
    6 để bổ sung logic giám sát của chúng tôi,
  • sử dụng asyncio cấp thấp, để có nhiều quyền kiểm soát hơn đối với việc xử lý luồng
    with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
        for line in proc.stderr:
            log_stderr[cmd[0], line]
        stdout, stderr = proc.communicate[]
    result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, stderr]
    
    7
Cấp thấp
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
2

Để có thể giao tiếp đúng cách với asyncio

with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    for line in proc.stderr:
        log_stderr[cmd[0], line]
    stdout, stderr = proc.communicate[]
result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, stderr]
7 của chúng tôi trong khi theo dõi
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của nó, chúng tôi cần đi sâu vào API cấp thấp của asyncio. Điều này liên quan đến việc sử dụng và, trong khi chúng tôi có được sự linh hoạt hơn, điều này cũng ngụ ý việc tự xử lý các “giao thức” và “vận chuyển”. Nhưng may mắn thay, điều này được ghi lại bao gồm. Giữ lấy

Điểm bắt đầu của chúng tôi là , lấy đối số protocol_factory để xây dựng một đối số chịu trách nhiệm xử lý giao tiếp với trẻ em, cũng như quá trình thoát. Chúng tôi sẽ tận dụng điều này để xác định một giao thức tùy chỉnh liên kết một trình đọc bổ sung với mô tả tệp stderr [fd=2], điều này thực hiện giao diện

class MyProtocol[asyncio.subprocess.SubprocessStreamProtocol]:
    def __init__[self, reader, limit, loop]:
        super[].__init__[limit=limit, loop=loop]
        self._reader = reader

    def pipe_data_received[self, fd, data]:
        """Called when the child process writes data into its stdout
        or stderr pipe.
        """
        super[].pipe_data_received[fd, data]
        if fd == 2:
            self._reader.feed_data[data]

    def pipe_connection_lost[self, fd, exc]:
        """Called when one of the pipes communicating with the child
        process is closed.
        """
        super[].pipe_connection_lost[fd, exc]
        if fd == 2:
            if exc:
                self._reader.set_exception[exc]
            else:
                self._reader.feed_eof[]

Đối số

with subprocess.Popen[cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True] as proc:
    errs = []
    for line in proc.stderr:
        log_stderr[cmd[0], line]
        errs.append[line]
    stdout, _ = proc.communicate[]
result = subprocess.CompletedProcess[cmd, proc.returncode, stdout, "\n".join[errs]]
1 đối với giao thức trước đây của chúng tôi chỉ đơn giản là một ví dụ. Nhờ giao thức của chúng tôi, trình đọc này sẽ nhận được dữ liệu giống như đường ống
import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
6 của quy trình, một loại bản sao. Đây sẽ là trình xử lý của chúng tôi để theo dõi

Cuối cùng, với một chút hệ thống ống nước

loop = asyncio.get_event_loop[]

reader = asyncio.StreamReader[loop=loop]
protocol_factory = functools.partial[
    MyProtocol, reader, limit=2**16, loop=loop
]

async def log_stderr[]:
    async for line in reader:
        logger.debug["%s: %s", cmd[0], line.decode[].rstrip[]]

transport, protocol = await loop.subprocess_exec[
    protocol_factory,
    *cmd,
    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
]

proc = asyncio.subprocess.Process[transport, protocol, loop]
[out, err], _ = await asyncio.gather[proc.communicate[], log_stderr[]]

Chạy chương trình này tạo ra các bản ghi sau

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
0

chế biến ưa thích

Với cách tiếp cận này, chúng ta còn có thể làm được nhiều điều thú vị hơn là chỉ ghi nhật ký. Cụ thể, đọc đầu ra trước đó từ

for errline in result.stderr.splitlines[]:
    log_stderr[cmd[0], errline]
0, chúng tôi có thể chặn các thông báo tiến độ như “ 714149/1808798 kB [39%], 0/1 không gian bảng […]” và sử dụng chúng để báo cáo tiến độ cho người dùng

Chúng tôi sẽ sử dụng phong phú tuyệt vời để hiển thị thông báo nhật ký và báo cáo tiến trình trong các bảng riêng biệt

import logging

logger = logging.getLogger[__name__]
logging.basicConfig[level=logging.DEBUG, format="%[asctime]s - %[message]s", datefmt="[%X]"]

def log_stderr[program: str, line: str] -> None:
    logger.debug["%s: %s", program, line.rstrip[]]
1

Xem màn hình diễn viên

Partagez cet bài báo

Twitter Mastodon Facebook Linkedin

ĐẠI LÍ BỐ

DALIBO là chuyên gia tiếng Pháp của PostgreSQL®. Nous proposons du support, de lamation et du conseil depuis 2005

Đường ống có nghĩa là gì trong quy trình con?

Việc sử dụng phổ biến của đường ống là để gửi dữ liệu đến hoặc nhận dữ liệu từ một chương trình đang được chạy dưới dạng quy trình con .

Đường ống trong Python là gì?

Ống là gì? . một đường ống [. ] chuyển kết quả của phương thức này sang phương thức khác. Tôi thích Pipe vì nó làm cho mã của tôi trông gọn gàng hơn khi áp dụng nhiều phương thức cho Python iterable. a Python library that enables you to use pipes in Python. A pipe [ | ] passes the results of one method to another method. I like Pipe because it makes my code look cleaner when applying multiple methods to a Python iterable.

Làm cách nào để chuyển đầu ra trong quy trình con?

Quy trình con của Python .
quy trình con nhập khẩu
quy trình con. chạy [['ls']]
cmd=['ls'] quy trình con. chạy [cmd]
cmd=['ls'] proc1 = quy trình con. chạy [cmd, thiết bị xuất chuẩn = quy trình con. PIPE] in [proc1. thiết bị xuất chuẩn]
f = open["đầu ra. txt", "w"] # điều này tạo ra quy trình con cmd=['ls'] tệp. run[cmd, stdout=f] # cái này sẽ gửi nó tới f

tẩu ở Popen là gì?

Hàm popen[] thực thi lệnh được chỉ định bởi lệnh chuỗi. Nó tạo một đường dẫn giữa chương trình đang gọi và lệnh được thực thi , đồng thời trả về một con trỏ tới một luồng có thể được sử dụng để đọc hoặc ghi vào .

Chủ Đề