Share This Post

什麼是 Message Queue ? 幹嘛用的 ?

想像一下如果你要實作一個 賣門票 的網站,今天有一個很有名的歌唱巨星 JoeLin 要透過你的網站在 8/31 號 的中午 12 點開賣 , 你預估同時會有千萬人灌爆你的網站,這時候你要如何處理這樣的情況。

6339.png_300-removebg-preview.png

訊息佇列(Message Queue),簡單來說就是處理不同程式、執行緒之間的溝通

  • 不限傳遞資料格式
  • 需要額外 message queue middleware 協助,也會被稱作 message broker 或 message bus
  • message broker 收到來自 source application 的訊息後,會轉發給 process application,而在這個方式中,source application 與 process application 通常又各自被稱為 producer 與 consumer。

實現 Message Queue 的幾種協定:

  • CoAP
  • MQTT
  • AMQP
  • XMPP
  • DDS

MQTT vs AMQP

MQTT (Message Queuing Telemetry Transport)協議的三個特點,讓它非常適合計算能力有限、網路帶寬低、信號不穩定的遠程設備,所以它成為了物聯網系統事實上的網路協議標準。

  1. 采用二進制的訊息內容編碼格式,所以二進制資料、JSON 和圖片等負載內容都可以方便傳輸
  2. 協議頭很緊湊,協議互動也簡單,保證了網路傳輸流量很小
  3. 支持 3 種 QoS(Quality of Service,服務質量)級別,便于應用根據不同的場景需求靈活選擇

AMQP(Advanced Message Queuing Protocol) 協議擁有龐大的特性集,比較重,不適合計算資源有限、對功耗要求嚴苛的物聯網設備,但是它可以滿足后臺系統對于可靠性和可擴展性的要求,因此,它在物聯網的平臺系統中應用廣泛。

小實作 – Express.js + RabbitMQ

cv9OEdT.png
1_H2tHREIqbLGT6tlU50MnTQ.png

RabbitMQ 的重要名詞與概念

  • Producer

丟訊息到 Queue 中,若有定義 Exchange,則丟給 Exchange 決定要給誰。

  • Consumer

接收來自 Queue 的訊息。

  • Queue

負責存放所需要的資料跟資料結構的 Queue 一樣,有先進先出 (FIFO) 特性,每個 Queue 都會有他的名字當 id。

  • Exchange

用來決定 Producer 給的資料要丟給哪一個 Queue,主要有這四種方式。

direct: 直接丟給指定的 Queue

topic: 類似 regular expression,設定 binding 規則,丟給符合的 Queue

headers: 透過傳送資料的 header 來特別指定所要的 Queue

fanout: 一次丟給全部負責的 Queue

  • Binding

跟 Exchange 成對搭配,主要是告訴 Exchange 他負責哪些 Queue

  • 使用 Docker 建立一個 RabbitMQ 容器
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 
-e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root 
rabbitmq:3-management

※小地雷 15672 為管理頁面的預設 port,實作時拿來連會連不上

  • 建立一個簡易的 Express server
  1. 初步建立方式 – 點我
  2. 下一步 安裝 amqplib
npm install amqplib

※ RabbitMQ 雖然有 MQTT 的 Plugin, 不過 RabbitMQ 主打的是滿足 AMQP 的協定

  1. 建立一個簡易的 app.js 以及 messageQueue.js
//app.js
const express = require('express')
const MessageQueueService = require('./messageQueue')
const app = express()
const port = 3000

app.get('/', async(req, res) => {
    // 連結 message queue,如果沒有 帳號密碼 可以不填 [username:password@]
    const messageQueue = new MessageQueueService('amqp://username:password@localhost:5672');
    await messageQueue.connect();
    // 填入要推送的 queue name 以及資料,此外 queue name 如果不存在,Rabbitmq 會自動創建。
		messageQueue.publishToQueue("queueName" , "data");
    res.status(200).send({
        "message": "message sent successfully."
    })
})

app.listen(port, () => {
    console.log(`Example app listening on port ${port}`)
})
// messageQueue.js
const amqp = require('amqplib');

class MessageQueueService {
	constructor(CONN_URL) {
		this.connection_url = CONN_URL;
	}

	async connect() {
		const connection = await amqp.connect(this.connection_url);
		this.channel = await connection.createChannel();
	}

	async publishToQueue(queueName, data) {
		await this.channel.assertQueue(queueName);
		this.channel.sendToQueue(queueName, Buffer.from(data));
	}

	closeChannel() {
		this.channel.close();
		console.log(`Closing rabbitmq channel`);
	}
}

module.exports = MessageQueueService;
  1. 建立 worker.js 來幫我們處理 message queue 裡的資料
//worker.js
const amqp = require('amqplib');

async function connect() {
	try {
		const connection = await amqp.connect('amqp://username:password@localhost:5672');
		const channel = await connection.createChannel();
		await channel.assertQueue("queueName");

		channel.prefetch(1);

		channel.consume("queueName", async (message) => {
			console.log('Recieved job message: ', message.content.toString())
			// 確認接收並將這筆資料從 queue 移除
			channel.ack(message);
		})
	} catch (err) {
		console.log(err);
	}
}

connect();

※ prefetch 是什麼?

一個 consumer 一次可以從 RabbitMQ 中獲取多少 message 並緩存在 client 中( RabbitMQ提供的各種語言的client library )。一旦緩存區滿了,RabbitMQ 將會停止投遞新的 message 到該consumer 中直到它發出 ack。

訂閱研究文章

Get updates and learn from the best

More To Explore

Scroll to Top

hurry up !

軟體工程師培訓

限時免費報名中

藉由與「真實世界軟體專案」相同的技術、工具與開發流程,化簡成與商業機密無關、門檻較低更容易上手的「模擬專案」,讓你有機會在職場前輩的陪伴下,完成真槍實彈的練習,動手解決真實的問題,快速累積個人的經驗與作品,而不只是「學習技術」而已。