Laravel Queue + RabbitMQ 實作經驗
近一兩年使用 Laravel Queue 搭配 RabbitMQ 來做非同步或排程任務處理的解決方案,但過程中遇到各種奇怪的問題,小則是單純的連線錯誤,重連就能解決,大則是會一直無法正常消化 Queue,造成整個 Queue 塞車並影響整個服務運作。
最近認真的去研究 RabbitMQ 連線的機制,並尋找問題可能的解法,在最近一次解法實驗上線時,發現了新的問題,至少總算是有初步了解問題核心了,這篇文章是來記錄這些經驗。
前情提要
Laravel 最大的問題就是效能不好,所以官方推出了新套件 Octane,它能夠讓 Laravel 開發者可以幾乎無痛升速。但這個無痛是有代價的,主要是因為 Octane 對程序的生命週期的處理方法,與傳統 Apache + PHP 的方式完全不同,雖然 Octane 提高了效能,但同時也帶來了很多問題。
官方有提供一份文件說明使用 Octane 基本的問題為何,最主要的原因是 process 會重複使用,因此當記憶體管理處理不當的時候,就有可能會出現 Memory Leak;或者依賴注入沒有寫好,也會造成不當的重複使用,之前我也曾經遇到過。
上面提到的兩個問題是 process 本身處理的問題,而今天要討論的問題網路連線,也就是當同個 process 透過網路連接其他服務所會遇到的問題。當程式要使用網路[^1],會有幾個生命週期:
- 連線
- 傳輸資料
- 關閉連線
執行一個網路傳資料任務時,若能一次把三件事做完就結束任務的話,例如:呼叫 HTTP API,當 HTTP response 完成之後就會關閉連線,這類的情境通常只會有連線逾時(connect timeout 或 read timeout)的問題。但因為 TCP 每次的連線都會需要經過交握,在網路延遲比較嚴重的場景下,很容易成為效能瓶頸,像 AWS 服務如果放在 US 的話,台灣的服務連線過去就會有極大的延遲。這個情境的解法之一,即是使用長連線。長連線的生命週期會是先連線,接著交由程式管理或使用,直到程式結束時才關閉連線。
長連線並不是萬靈丹,它所帶來的問題是:不管是 Server 還是 Client,程式都必須要額外做連線管理,這也是接下來要面對的主要問題。
[^1]: 這裡先專注在 TCP 連線,UDP 連線我還不確定會不會有這個問題。
實務上會遇到的問題
我真正遇到的問題如下:
Laravel 在執行 queue:work
指令時,如果有個任務執行非常久,這時執行完要回應 RabbitMQ ack 時,就會遇到連線錯誤。
這是 Client-Server 架構(PHP 是 client,RabbitMQ 是 server)要處理的問題,因為 server 建立 connection 都會有基本開銷,若建立了沒有管理,就會造成資源浪費,最終服務會因為資源不夠而無法運作。在這個情境的狀況是,server 檢查連線是否可用,若不可用的就會主動關閉連線,若 client 誤以為連線還存在並使用的話,就會發生連線錯誤。
通常長連線在確認連線可不可用時,會使用心跳機制(heartbeat)。當然 RabbitMQ 也有提供類似的解決方案,它的做法是要由 client 主動發送心跳訊號。但問題在於:PHP 是單執行緒 blocking 的語言,因此在執行某個任務的時候,PHP 沒有辦法多工去發送心跳訊息,只要任務跑太久,連線會被 RabbitMQ 關閉,接著 PHP 就會發生連線錯誤。
RabbitMQ 的心跳機制簡介
我對 RabbitMQ 不熟,下面的內容是我在研究這個問題時發現的現象。
假設啟動 RabbitMQ 的 Docker Compose 範例如下:
services: |
rabbitmq.conf
有個設定就叫 heartbeat
,它在設定預設的 heartbeat 時間,預設是 60:
# rabbitmq.conf |
這個設定的意義是,它期望 client 每 60 秒發送一次心跳訊號。但 client 有可能還是會有時間誤差或其他問題,導致無法在 60 秒內正常發送,因此會有容錯空間,這個容錯空間是 heartbeat
設定的兩倍。因此預設 RabbitMQ 在 120 秒內沒收到 heartbeat 的話,會主動關閉連線。
而 PHP 實作主要是用 Composer 套件 php-amqplib/php-amqplib
,裡面有個 AbstractConnection
類別,在建構連線物件的時候,可以設定 $heartbeat
的參數。這個參數的用意是,跟 RabbitMQ 說好這次的連線會多久送一次心跳訊號,但這個設定如果超過 RabbitMQ 的設定值的話,會以 RabbitMQ 的設定值為主。
從上述所說的結論就是:
在預設的狀況下,當 Laravel Queue 的 Job 執行超過 120 秒的話,必定會發生連線錯誤。
php-amqplib 提供的解決方案
連線是很基本的問題,套件也有提供對應的解法:定期發送心跳訊號。實作是放在 php-amqplib 套件的 Connection/Heartbeat
目錄裡。
裡面有兩種個實作:
PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender
PhpAmqpLib\Connection\Heartbeat\SIGHeartbeatSender
兩個實作方法不大一樣,因為不同的實作適用在不同的場景,因此下面會先說明實作,理解如何應用,接著再了解要面對的問題是什麼。
PCNTLHeartbeatSender
實作方法
這個實作使用了 PHP 的 pcntl
的 extension。
首先,會先使用 pcntl_signal()
註冊 SIGALRM
信號,以及當收到這個信號的時候,要執行的 callback function。callback function 是處理發送 heartbeat 的邏輯。
接著,使用 pcntl_alarm()
設定定時器,這個定時器會在指定的時間後發送 SIGALRM
信號,同時再設定下一個定時器。
SIGHeartbeatSender
實作方法
這個實作也使用了 PHP 的 pcntl
的 extension,但做法不大一樣。
首先,會先使用 pcntl_signal()
註冊另一種信號 SIGUSR1
,以及一樣會有當收到這個信號的時候,要執行的 callback function。callback function 是處理發送 heartbeat 的邏輯。
接著,使用 pcntl_fork()
讓 PHP 產生另一個子程序,然後在子程序裡執行一個無窮迴圈。在迴圈裡面會不斷的 sleep 以及發送 SIGUSR1
信號給主程序,讓主程序可以在指定的間隔時間觸發 callback function。
這個做法有個特殊的問題:因為會產生一個子程序,當主程序結束而子程序沒正常結束的時候,主程序一樣會無法正常結束,下面是一個簡單的範例:
$pid = pcntl_fork(); |
結果如下:
主程序結束 |
當看到主程序結束後,要等 5 秒才會看到子程序結束的訊息,這是因為主程序結束時,子程序還在執行,因此主程序無法正常結束。因為 SIGHeartbeatSender 的子程序實作是無窮迴圈,如果在這個狀況下結束主程序的話,子程序會無法結束,主程序會如同上面的實驗無法結束,最終浪費記憶體和 CPU 資源,嚴重的話會造成系統直接崩潰。
共同的問題
上述的兩個做法在實驗的過程中,發現有兩個共通的問題:
- 一個信號只能註冊一個 callback function,因此如果有多個 callback function 要註冊相同的信號時,會發生互相覆蓋的狀況,因此無法採用這個方法。
- 還是一樣的問題,因為 PHP 是單執行緒 blocking 的語言,所以當某個指令執行過久,如
sleep()
或是fread()
讀取大檔等,在觸發 callback 的時候還是會被阻擋,需要等前一動完成的時候才會觸發。
與 Laravel 整合
到目前為止,只有說明到 php-amqplib
套件本身的解法和問題,接下來要開始說明與 Laravel 整合之後會有的問題。
Laravel Queue 之前有寫過相關的文章可以參考。它在設計使用的情境是:Laravel 可以用同一包程式和設定,同時處理推 queue 與收 queue。因為這兩個任務都要使用網路連線,因此就會需要考慮上面提到的所有問題。
通常在使用 Laravel Queue 的時候,會是 Web server 推 queue,然後另外起一個 process 執行 queue:work
來收 queue。以下會用這個場㬌來說明。
以下先暫時忽略多個 RabbitMQ 連線所要面對「一個信號只能註冊一個 callback function」的問題。
在說明各別實作細節之前,先說明如何整合 php-amqplib
和 Laravel Queue。因為 Laravel Queue 原生並不支援 RabbitMQ,因此這段實作必須自行實作,目前有第三方套件 vladimir-yuldashev/laravel-queue-rabbitmq
可以直接安裝即可使用。但它有三個機制並沒有實作:
- Heartbeat 機制(即上面提到的
HeartbeatSender
) - Ack
- Reconnect
因此預設的情況下,如果收 queue 超過 120 秒的話,因為它不會發 Ack 訊號,所以不會有連線錯誤;相對的,因為它沒實作 Heartbeat 機制,所以如果兩次推 queue 的時間間隔超過 120 秒的話,就會發生連線錯誤,而且因為沒有 reconnect 機制,因此會直接丟例外。
今天要討論的只是 Heartbeat 機制,因此先假設已經有實作了 Ack 和 Reconnect 的機制。php-amqplib
所提供的 Heartbeat 機制必須要先建立連線,才能夠註冊 HeartbeatSender。可以參考 vladimir-yuldashev/laravel-queue-rabbitmq
的實作,它是向 Laravel Queue 註冊新的 queue 連接器(connector)叫 rabbitmq
,然後就能透過修改 queue 的設定來改使用 RabbitMQ 的實作。
QueueManager::extend()
的設定概念,可以參考我之前寫的 Laravel Manager 說明 裡的「擴充實體生成方法」段落。
考慮能注入點,會是實作新的連接器,可以複製 vladimir-yuldashev/laravel-queue-rabbitmq
來調整:
// RabbitMQWithHeartbeatConnector.php |
接著在 Provider 的 boot 階段註冊新的連接器:
Queue::addConnector('rabbitmq_heartbeat', function () { |
queue.php 的 driver 設定改使用 rabbitmq_heartbeat
即可套用上面的 Heartbeat 機制:
推 queue
前面有提到,Octane 會重複使用相同 process,因此建立長連線的時候需要做 php-amqplib
所提供的連線管理。而 Octane 有一個機制是讓 request 處理到某個數量後,會自動把 process 結束後再重新開新的。
這個機制在使用 PCNTLHeartbeatSender
的時候,比較沒什麼問題。但如果使用 SIGHeartbeatSender
的話,因為會產生子程序,在 Octane 要結束主程序時,因為子程序無法關閉,造成主程序也無法正常關閉,最終 Octane 會因為程序無法正常結束而造成系統崩潰。
解法是,在 Octane Process 結束的時候,會發出 Laravel\Octane\Events\WorkerStopping
的事件,因此可以註冊這個事件的監聽器(Listener)。當 Process 結束時,由監聽器關閉 RabbitMQ 的連線,與呼叫 SIGHeartbeatSender::unregister()
來解除子程序綁定,讓子程序可以正常結束。
$connection = ConnectionFactory::make($config); |
收 queue
Laravel 收 queue 是透過 queue:work
指令來執行的。這個指令會啟動一個 process 來持續監聽 RabbitMQ 的 queue。
與推 queue 相反,這個機制在使用 SIGHeartbeatSender
的時候沒有問題,但在使用 PCNTLHeartbeatSender
的時候,會遇到一個問題是:因為 SIGALRM
在 queue:work
指令裡已經有註冊過了,因此會有前後覆蓋 callback function 的問題。
PCNTLHeartbeatSender 的問題沒有解法,因為 Laravel queue:qork
的機制是先建連線之後再註冊 SIGALRM
的,因此必定會被覆蓋。
// 連線並取得 Job |
通常在收 queue 都是要執行真正的任務,因此 SIGHeartbeatSender 基本上就是個合適的解法了。
但有少部分的情境是在執行任務後,還需要再推一次 queue,如果在推 queue 的時候又使用新的連線,那就會遇到前面提到的「一個信號只能註冊一個 callback function」的問題,這個應用的時候得再多注意。
結論
Laravel 是用同一段連線的程式碼,同時應用在推 queue 跟收 queue,因此理論上有 workaround 解法的 SIGHeartbeatSender
會比較適用。但實際上還有另一個問題還沒解決的是,因為一個信號只能註冊一個 callback function,所以當有多個 RabbitMQ 的時候,就必須要有另一個統一管理連線的機制來處理連線問題。
因此最穩的做法最終還是:處理完推 queue 就關連線,或是實作 reconnect 機制,而 Laravel Octane 就不要使用 heartbeat 機制了。
if (getenv('LARAVEL_OCTANE') !== '1') { |
感謝 H 同事協助我驗證出這個結論。
附註
版本資訊參考:
- Laravel Framework - ^11(使用 10 的程式架構,但 Framework 是 11,這個差異應該還好)
- php-amqplib/php-amqplib - ^3.7
- vladimir-yuldashev/laravel-queue-rabbitmq - ^14.1