什麼是 Message Queue ? 幹嘛用的 ?
想像一下如果你要實作一個 賣門票 的網站,今天有一個很有名的歌唱巨星 JoeLin 要透過你的網站在 8/31 號 的中午 12 點開賣 , 你預估同時會有千萬人灌爆你的網站,這時候你要如何處理這樣的情況。

訊息佇列(Message Queue),簡單來說就是處理不同程式、執行緒之間的溝通
- 不限傳遞資料格式
- 需要額外 message queue middleware 協助,也會被稱作 message broker 或 message bus
- message broker 收到來自 source application 的訊息後,會轉發給 process application,而在這個方式中,source application 與 process application 通常又各自被稱為 producer 與 consumer。
實現 Message Queue 的幾種協定:
- CoAP
- MQTT
- AMQP
- XMPP
- DDS
MQTT vs AMQP
MQTT (Message Queuing Telemetry Transport)協議的三個特點,讓它非常適合計算能力有限、網路帶寬低、信號不穩定的遠程設備,所以它成為了物聯網系統事實上的網路協議標準。
- 采用二進制的訊息內容編碼格式,所以二進制資料、JSON 和圖片等負載內容都可以方便傳輸
- 協議頭很緊湊,協議互動也簡單,保證了網路傳輸流量很小
- 支持 3 種 QoS(Quality of Service,服務質量)級別,便于應用根據不同的場景需求靈活選擇
AMQP(Advanced Message Queuing Protocol) 協議擁有龐大的特性集,比較重,不適合計算資源有限、對功耗要求嚴苛的物聯網設備,但是它可以滿足后臺系統對于可靠性和可擴展性的要求,因此,它在物聯網的平臺系統中應用廣泛。
小實作 – Express.js + RabbitMQ


RabbitMQ 的重要名詞與概念
- Producer
丟訊息到 Queue 中,若有定義 Exchange,則丟給 Exchange 決定要給誰。
- Consumer
接收來自 Queue 的訊息。
- Queue
負責存放所需要的資料跟資料結構的 Queue 一樣,有先進先出 (FIFO) 特性,每個 Queue 都會有他的名字當 id。
- Exchange
用來決定 Producer 給的資料要丟給哪一個 Queue,主要有這四種方式。
direct: 直接丟給指定的 Queue
topic: 類似 regular expression,設定 binding 規則,丟給符合的 Queue
headers: 透過傳送資料的 header 來特別指定所要的 Queue
fanout: 一次丟給全部負責的 Queue
- Binding
跟 Exchange 成對搭配,主要是告訴 Exchange 他負責哪些 Queue
- 使用 Docker 建立一個 RabbitMQ 容器
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672
-e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root
rabbitmq:3-management
※小地雷 15672 為管理頁面的預設 port,實作時拿來連會連不上
- 建立一個簡易的 Express server
- 初步建立方式 – 點我
- 下一步 安裝
amqplib
npm install amqplib
※ RabbitMQ 雖然有 MQTT 的 Plugin, 不過 RabbitMQ 主打的是滿足 AMQP 的協定
- 建立一個簡易的 app.js 以及 messageQueue.js
//app.js
const express = require('express')
const MessageQueueService = require('./messageQueue')
const app = express()
const port = 3000
app.get('/', async(req, res) => {
// 連結 message queue,如果沒有 帳號密碼 可以不填 [username:password@]
const messageQueue = new MessageQueueService('amqp://username:password@localhost:5672');
await messageQueue.connect();
// 填入要推送的 queue name 以及資料,此外 queue name 如果不存在,Rabbitmq 會自動創建。
messageQueue.publishToQueue("queueName" , "data");
res.status(200).send({
"message": "message sent successfully."
})
})
app.listen(port, () => {
console.log(`Example app listening on port ${port}`)
})
// messageQueue.js
const amqp = require('amqplib');
class MessageQueueService {
constructor(CONN_URL) {
this.connection_url = CONN_URL;
}
async connect() {
const connection = await amqp.connect(this.connection_url);
this.channel = await connection.createChannel();
}
async publishToQueue(queueName, data) {
await this.channel.assertQueue(queueName);
this.channel.sendToQueue(queueName, Buffer.from(data));
}
closeChannel() {
this.channel.close();
console.log(`Closing rabbitmq channel`);
}
}
module.exports = MessageQueueService;
- 建立 worker.js 來幫我們處理 message queue 裡的資料
//worker.js
const amqp = require('amqplib');
async function connect() {
try {
const connection = await amqp.connect('amqp://username:password@localhost:5672');
const channel = await connection.createChannel();
await channel.assertQueue("queueName");
channel.prefetch(1);
channel.consume("queueName", async (message) => {
console.log('Recieved job message: ', message.content.toString())
// 確認接收並將這筆資料從 queue 移除
channel.ack(message);
})
} catch (err) {
console.log(err);
}
}
connect();
※ prefetch 是什麼?
一個 consumer 一次可以從 RabbitMQ 中獲取多少 message 並緩存在 client 中( RabbitMQ提供的各種語言的client library )。一旦緩存區滿了,RabbitMQ 將會停止投遞新的 message 到該consumer 中直到它發出 ack。