Disruptor là gì

--- tags: LMAX Disruptor --- # 000: Tìm hiểu về LMAX Disruptor để thay thế BlockingQueue trong Java Các Java Engineer chắc hẳn không xa lạ gì với BlockingQueue, một dạng internal pub-sub trong chương trình. Đặc biệt với multi-thread programming, thread gửi/nhận message tới thread khác thông qua queue để xử lý các bài toán bất đồng bộ hoặc một số lượng cực lớn các task. Threadpool mà chúng ta hay dùng cũng implement BlockingQueue phía dưới nhằm điều phối các task đến các thread để được thực thi. Tuy nhiên như chúng ta biết, bản chất của BlockingQueue là **block**, là **lock**, là **synchronize**. Điều đó khiến [**context switch**](https://hackmd.io/@datbv/rk3pA6XK_#2-Context-switch) xảy ra, tăng latency và trực tiếp làm giảm performance của hệ thống. Các kỹ sư của LMAX đã tạo ra **Disruptor** để xử lý bài toán này. Không chỉ bài toán này mà rất nhiều bài toán khác liên quan đến internal pub-sub trong một ứng dụng. Chúng ta cùng đi tìm hiểu cụ thể hơn LMAX Disruptor là gì và vì sao nó có thể thay thế BlockingQueue trong Java qua series lần này với các chủ đề: - 001: Hiểu về Queue và BlockingQueue trong Java - 002: LMAX Disruptor là gì? - 003: LMAX Disruptor và BlockingQueue - 004: Sức mạnh của LMAX Disruptor nằm ở đâu? - 005: LMAX Disruptor nên được áp dụng thế nào? ### Reference - [LMAX Disruptor](https://lmax-exchange.github.io/disruptor) - [The LMAX Architecture](https://martinfowler.com/articles/lmax.html) - [Github LMAX Disruptor Wiki](https://github.com/LMAX-Exchange/disruptor/wiki) © [Dat Bui](https://www.linkedin.com/in/datbv/)

Tiếp nối bài trước về Mechanical Sympathy và giới thiệu qua về LMAX Disruptor, ở bài viết này người viết sẽ phân tích sâu hơn và hướng dẫn cách sử dụng về bộ thư viện Disruptor.

LMAX-Disruptor là một bộ thư viện giúp cho việc phát triển các ứng dụng với độ tải lớn (high-performance) cho phép xử lý đồng thời (concurrency) một số lượng rất lớn message mà không cần Lock (lock-free). Nếu bạn làm việc với Java thì thực tế đây là bộ thư viện về Concurrency tốt nhất và nhanh nhất hiện nay.

Quay lại ví dụ ở bài trước khi ta khởi tạo một Disruptor đơn giản với 1 Producer và 1 Comsumer.

Ở bài trước tui đã giải thích về ý nghĩa các tham số của Disruptor constructor, nhưng chưa bàn tới các "Waiting strategy" được đại diện bởi interface WaitStrategy. Disruptor có 4 chiến thuật chờ đợi chính là BlockingWaitStrategy, SleepingWaitStrategy, YieldingWaitStrategyBusySpinWaitStrategy chúng ta sẽ tìm hiểu bên dưới đây.

Wait Strategies

BlockingWaitStrategy:

Mặc định Disruptor sẽ sử dụng Wait Strategy là BlockingWaitStrategy, trong nội bộ (code) của BlockingWaitStrategy nó sử dụng ReentrantLock để đồng bộ hóa (synchronized) và dùng Condition để wait và notify các Producer và Consumer để giải quyết các vấn đề về “Producer–consumer problem”.
Đây là cách sử lý chậm nhất trong các wait strategies (vì sử dụng cơ chế Looking), nhưng đây là cách sử dụng an toàn và thân thiện nhất với tài nguyên của CPU, có nghĩa là Disruptor sẽ không gây áp lực lên CPU nhưng đánh đổi lại là tốc độ sử lý của Disruptor. Và đây là cách người viết khuyên dùng khi hệ thống của bạn lượng tài nguyên CPU hạn chế hoặc CPU hay bị tình trạng quá tải (high load).

SleepingWaitStrategy:

Giống như BlockingWaitStrategy đây cũng là một cách sử dụng khá thân thiện với CPU, bằng cách sử dụng kỹ thuật "busy wait loop" bằng cách sử dụng BlockingWaitStrategy0 để dừng (pause) thread hiện tại của Producer hoặc Consumer để wait hoặc notify khi có điều kiện thỏa mãn (Ring buffer đầy hoặc rỗng hoặc hết đầy hoặc có dữ liệu tùy từng trường hợp), giá trị mặc định mỗi lần pause thread (DEFAULT_SLEEP) là 100 nanoseconds và sẽ lặp lại 200 lần cho tới khi có điều kiện thoả mãn trên xuất hiện.
Tuy không sử dụng cơ chế Looking (look-free) nhưng các thread liên tục phải waiting do đó độ trễ sẽ cao (high latency) cho nên tốc độ thực thi của Disruptor sẽ không cao, nhưng bù lại các thread/process sẽ không bị các trường hợp về context switching, do đó số lượng thread/process tạo ra sẽ không nhiều và CPU sẽ không bị stress.
Với trường hợp ứng dụng cần độ trễ thấp (low latency) thì đây là cách không nên sử dụng, nhưng lại rất phù hợp với các chức năng không quan tâm tới độ trễ, ví dụ như tính năng ghi log hệ thống chẳng hạn (asynchronous logging).

YieldingWaitStrategy:

YieldingWaitStrategy cũng là một kiểu Wait Strategy giống như SleepingWaitStrategy nhưng có độ trễ thấp hơn (low latency) bằng cách sử dụng chiến thuật "busy spin waiting". Bên trong YieldingWaitStrategy sẽ sử dụng method BlockingWaitStrategy4 để khi Ring Buffer đang rảnh rỗi thì sẽ nhường và ưu tiên các Thread khác được phép chạy trước.
Đây là chiến thuật được khuyến khích khi ứng dụng của bạn cần hiệu suất cao (high performance) và số lượng luồng (thread) của Consumer (Event Handler) nhỏ hơn số lượng logical cores của CPU. Có nghĩa là CPU của bạn có hỗ trợ công nghệ và bật chế độ Hyper Threading, và số lõi logical nhiều hơn số lượng của Multi-cast Consumer.

BusySpinWaitStrategy:

Tốc độ thực thi của BusySpinWaitStrategy là nhanh nhất trong các Wait Strategy nhưng nó lại gây áp lực (stress) lên tài nguyên hệ thống ở đây là CPU nhiều nhất, nó chỉ nên được sử dụng trong trường hợp số lượng luồng của Consumer (Event Handler) nhỏ hơn số lượng lõi vật lý (physical cores) của CPU. Ví dụ CPU server của bạn có 8 lõi thì chỉ nên sử dụng dưới 7 luồng của Consumer (Event Handler) của Disruptor đang chạy.

Core Concepts

Về cơ bản ta đã sử dụng được Disruptor để gửi nhận Producer và Consumer đơn và hiểu được các Wait Strategy trong Disruptor, nhưng để sử dụng Multicast Events hoặc Consumer Dependency Graph, có nghĩa là ta có thể sử dụng nhiều Producer lẫn Consumer kết hợp với nhau, để làm được điều này ta phải hiểu và biết cách sử dụng BlockingWaitStrategy6, BlockingWaitStrategy7 , BlockingWaitStrategy8 và BlockingWaitStrategy9.

Sequence:

Lớp này được thiết kế để thao tác với các sequence (số thứ tự) của Ring-Buffer nhằm việc đảm bảo việc hoạt động tốt với môi trường Concurrency. Sequence không sử dụng cơ chế Locking để giải quyết vấn đề về Mutual Exclusion như race-condition mà sử dụng cơ chế CAS (Lock-free) để giải quyết bài toán đó, chính vì thế sequence luôn được SleepingWaitStrategy0 and SleepingWaitStrategy1 vừa chính xác lại rất nhanh.

Sequencer:

Từ phiên bản 3.0 trở đi thì đây chính là cốt lõi (core) của Disruptor, nó được thiết kế để quản lý việc gửi và nhận dữ liệu đến Ring-Buffer giữa Producer và Consumer nhanh nhất và chính xác nhất . Sequencer là một interface và ta có hai class implement nó là SleepingWaitStrategy2 (cho case Disruptor sử dụng một producer) và SleepingWaitStrategy3 (cho case Disruptor sử dụng nhiều hơn một producer).

EventProcessor:

Interface này được sử dụng để quản lý nhiều Consumer khi ta sử dụng Multicast Events, nó giúp cho việc quản lý thứ tự sử lý của từng Consumer trong mô hình Consumer Dependency Graph. Nó chỉ có duy nhất một Implementation là lớp SleepingWaitStrategy4 giúp việc lấy event từ Ring-Buffer và điều phối hợp lý event cho từng Consumer theo chiến thuật người dùng đề ra. Thực tế thì EventProcessor là một Runnable và nó có thể được executed bởi một Thread.

Sequence Barrier:

Là một rào chắn dữ liệu (memory barrier) được tạo ra từ Sequencer để đảm bảo thứ tự nhận event giữa Ring-Buffer tới các Consumer chính xác nhất. Nó luôn luôn nằm giữa và điều phối dữ liệu giữa Ring-Buffer và Consumer.

Để dễ hình dung ta có thể tham khảo mô hình hoạt động của Disruptor bên dưới.

Ở bên dưới ta có một mô hình với 2 Producer gửi dữ liệu tới Ring-Buffer, và ta có các Consumers là SleepingWaitStrategy5, SleepingWaitStrategy6 và SleepingWaitStrategy7 . Ví dụ này rất giống với các logic trong thực tế, đầu tiên khi một events tới đầu tiên ta cần ghi log thông tin event trước (SleepingWaitStrategy5) sau đó backup nó lại ở đâu đó (SleepingWaitStrategy6) và cuối cùng là thực hiện business logic của chương trình (SleepingWaitStrategy7). Tiến trình (task) ghi log và backup được thực hiện đồng thời và song song (parallel) với nhau, nhưng business logic chỉ được thực hiện sau khi hai task trên được thực hiện xong.

Quan hệ giữa 3 consumers bên trên ta gọi nó là Consumer Dependency Graph, chúng thực hiện được nhờ vai trò của BlockingWaitStrategy8 được nhắc ở bên trên.

  • JournalConsumer (C1) tiến trình thực hiện phụ thuộc vào sự điều khiển RingBuffer’s sequence.
  • ReplicationConsumer (C2) tiến trình thực hiện phụ thuộc vào sự điều khiển RingBuffer’s sequence.
  • ApplicationConsumer (C3) tiến trình thực hiện không chỉ phụ thuộc vào sự điều khiển RingBuffer’s sequence mà còn phụ thuộc vào JournalConsumer’s sequence vào ReplicationConsumer’s sequence.

Mô hình Disruptor với Consumer Dependency Graph.

Lý thuyết vậy là đủ, giờ ta hãy bắt tay vào thực hiện implement mô hình Consumer Dependency Graph bên trên với chỉ một Producer nhé.

Mô hình tổ chức code được minh họa như bên dưới:

           +-----+
+----->| EP1 |------+
| +-----+ |
| v
+----+ +-----+
| P1 | | EP3 |
+----+ +-----+
| ^
| +-----+ |
+----->| EP2 |------+
+-----+
Disruptor:
track to prevent wrap
+-------------------------------+
| |
| v
+----+ +====+ +=====+ +-----+
| P1 |--->| RB |<--------------| SB2 |<---| AC |
+----+ +====+ +=====+ +-----+
claim ^ get | waitFor
| |
+=====+ +-----+ |
| SB1 |<---| JC |<-----+
+=====+ +-----+ |
^ |
| +-----+ |
+-------| RC |<-----+
waitFor +-----+
P1 - Producer 1
RB - RingBuffer
SB1 - SequenceBarrier 1
JC - JournalConsumer
RC - ReplicationConsumer
SB2 - Sequence Barrier Conclusion
AC - ApplicationConsumer

Đầu tiên ta phải tạo một Event Object, đây chính là thông tin dữ liệu được lưu chuyển tới các Consumer để sử lý business logic. Event Object dưới chỉ chứa một giá trị có kiểu Long.

Tiếp đó là 3 Consumer là SleepingWaitStrategy5, SleepingWaitStrategy6 và SleepingWaitStrategy7 kế thừa với YieldingWaitStrategy5 và cài đặt hàm YieldingWaitStrategy6 để lấy hứng Event Object về và xử lý logic. Logic xử lý của Consumer chỉ đơn giản là in ra giá trị của Event Object và sequence của nó từ Ring-Buffer gửi về.

Journal Consumer:

Replication Consumer:

Application Consumer:

Bởi vì SleepingWaitStrategy7 là Consumer được sử lý sau cùng trong tiến trình cho nên ta thêm một biến YieldingWaitStrategy8 để tracking việc SleepingWaitStrategy7 thực sự đã hoàn thành xong chưa.

Để tiện cho việc chạy test ta sẽ tạo một Junit Test như sau, và chúng ta sẽ đi sâu vào phân tích nó tiếp sau:

Giờ hãy bắt đầu phân tích đoạn code demo bên trên. Hãy nhìn vào cách chúng ta khởi tạo RingBuffer, ta sẽ sử dụng static method được cung cấp sẵn trong class RingBuffer là BusySpinWaitStrategy0 (Disruptor chỉ có một Producer) với các đối số truyền vào là Event Factory Object (ValueEvent), buffer size (kích thước của ring buffer) và wait strategy (ở đây ta dùng YieldingWaitStrategy), nếu ta không truyền wait strategy thì RingBuffer sẽ mặc định sử dụng BlockingWaitStrategy. Ngoài ra RingBuffer cũng cung cấp cho ta một static method khác là BusySpinWaitStrategy3 để sử dụng với nhiều Producer.

Bản thân Ring-Buffer đã có một Sequence Barrier (SB1 như trên hình minh họa trên) bằng cách gọi tới hàm BusySpinWaitStrategy4, "rào chắn" này có tác dụng đảm bảo thứ tự gửi nhận tới chung cho tất các Consumers với RingBuffer, chính vì thế hàm newBarrier() không cần có tham số. Và các BlockingWaitStrategy9 của SleepingWaitStrategy5 và SleepingWaitStrategy6 phải đăng ký tới barrier rằng "chúng tôi muốn nhận event từ Ring-Buffer", bằng cách khởi tạo các BlockingWaitStrategy9 và truyền tham số như BusySpinWaitStrategy9, BlockingWaitStrategy0 và các Consumers.

SequenceBarrier ngoài việc đảm bảo thứ tự cũng như quyết định gửi tới những Consumers nào, thì nó còn có tác dụng khi Consumer muốn lấy dữ liệu tiếp theo nhưng event đó chưa có (Producer chưa kịp gửi) thì nó sẽ sử dụng các wait strategy để chờ đợi các event tiếp theo tới. Nó đảm bảo được rằng việc lấy dữ liệu từ Ring-Buffer sẽ đảm bảo được sự chính xác và tốc độ mà không cần phải "đồng bộ" hóa tiến trình này (synchronization).

OK vậy tiếp theo ta phải làm thế nào để SleepingWaitStrategy7 sẽ thực hiện chỉ khi hai Consumers trên thực hiện xong. Tương tự ta cũng tạo ra một BlockingWaitStrategy8 (SB2 như trên hình minh họa) được đặt giữa Ring-Buffer, SleepingWaitStrategy5 và SleepingWaitStrategy6. Bằng cách gọi tới hàm BusySpinWaitStrategy4 truyền vào hai tham số là sequence của hai BlockingWaitStrategy9 mới khởi tạo bên trên, việc này đảm bảo SleepingWaitStrategy7 chỉ thực hiện việc lấy event từ Ring-Buffer sau khi hai consumers Journal và Replication

Hãy để ý tới static code BlockingWaitStrategy8, hàm này có ý nghĩa rằng Ring-Buffer sẽ đảm bảo event-sequence của SleepingWaitStrategy7 nhỏ hơn hoặc bằng với event-sequence của hai consumers còn lại, điều này sẽ chắc chắn rằng SleepingWaitStrategy7 chỉ được lấy event sau khi hai consumers kia hoàn thành.

Về bản chất các BlockingWaitStrategy9 là một BlockingWaitStrategy2 có thể chạy với các Thread để cho các consumers bắt đầu "lắng nghe" và lấy event từ Ring-Buffer ta sẽ phải tạo một BlockingWaitStrategy3 để khởi tạo các Thread Pool.

Và start các thread pool để wake up các consumers

Và cuối cùng để gửi dữ liệu tới Ring-Buffer ta tạo một vòng lặp sau đó lấy ra sequence tiếp theo bằng cách sử dụng BlockingWaitStrategy4, sau đó get và set event data bằng cách sử dụng các hàm BlockingWaitStrategy5, và gửi event đi bằng hàm BlockingWaitStrategy6

Và cuối cùng ta có thể sử dụng hàm BlockingWaitStrategy7 của EventProcessor để stop các Consumer khi đảm bảo rằng các consumers đã hoàn tác các tác vụ.

Ta có thể thấy việc khởi tạo các Consumer hoàn toàn khác với các truyền thống là khởi tạo thông qua Disruptor, mà ta sử dụng các SleepingWaitStrategy4, nhờ đó mà các Consumers có thể phụ thuộc (depend) vào nhau, do đó nghiệp vụ của ứng dụng khi sử dụng Disruptor có thể đa dạng và uyển chuyển hơn rất nhiều.