主管說想降低 AWS SQS 的依賴,叫我試試 Rabbit Mq 看看,紀錄一下結果以後可用。
環境 NET8,測試時用 docker,image 用 rabbitmq:4.0.5-management。
能處理一般的業務狀況即可,可短時間高併發(10 幾秒內), 持續高併發會有問題(qps 1000 以上)不建議使用,要改分布式。
我是習慣用依賴注入,把 RabbitMq 都寫在一個 class 裡(只負責送資料,不負責業務邏輯), program 要記得註冊依賴注入。
文件建議 rabbitMq 的 connection 一個 server 一個就好,因此用 singleton 注入每次拿到同一個 connection。
底下 Code 是預設 rabbitMq 會掛載硬碟做持久化,這樣 rabbitMq 有問題重啟時,queue 上的資料都會留著。
nuget安裝 RabbitMQ.Client
Producer Code (推訊息到 Queue)
/// <summary>
/// 注入時用singleton, IRabbitMq是自己寫的介面 (可用可不用
/// </summary>
public class RabbitMq : IRabbitMq
{
private readonly IConnection connection; // 建議一個process一個connection
public RabbitMq()
{
var factory = new ConnectionFactory { HostName = "localhost", UserName = "admin", Password = "admin" };
connection = factory.CreateConnectionAsync().Result; //只有async方法, .result雖會block thread但singleton只會跑一次, 當作workaround沒差
}
public async Task PushToHelloQueue()
{
// 預設一個connection最多2047個channel, qps建議在300內,
// 測試持續高併發channel會被用完, 會突然掛掉送不出去, 但短暫的高併發(qps1000)撐得住
using var channelA = await connection.CreateChannelAsync();
var arguments = new Dictionary<string, object>
{
{ "x-max-length", 10000 }, // queue 最多放X個
{ "x-overflow", "reject-publish" } //reject-publish:當queue滿時會拒絕新加入的資料
};
await channelA.QueueDeclareAsync(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: arguments);
var message = JsonSerializer.Serialize(new
{
Msg = "hello",
Date = DateTime.Now,
});
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent,
};
await channelA.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body, basicProperties: properties, mandatory: true);
}
Consumer Code (從 Queue 取資料,用 Console 寫的)
internal class Program
{
private static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost", UserName = "admin", Password = "admin" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
var arguments = new Dictionary<string, object>
{
{ "x-max-length", 10000 },
{ "x-overflow", "reject-publish" } //reject-publish:當queue滿時會拒絕新加入的msg
};
await channel.QueueDeclareAsync(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: arguments);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
return Task.CompletedTask;
}
catch (Exception ex)
{
Console.WriteLine($" err {ex.Message}");
return Task.CompletedTask;
}
};
await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. Press Ctrl+C to exit.");
await Task.Delay(-1); // 讓process一直跑
}
}
一些備註
有測試 2 種情況, channel 每次都建立新的再釋放還是每個 push 的 function 各用一個。
用 jmeter 壓測送 5000 筆資料,每次建新的大概 4 秒就跑完,共用一個 channel 要 20 幾秒以上。
每次建新的 channel 會多消耗記憶體,但多個 50Mb 而已。而用共用一個 channel 記憶體幾乎不會增長,但效能太差就沒選了。
Rabbitmq 有很靈活設定可以改,但這次是要簡單使用,盡量類似 aws sqs 那樣簡單。 中間不用 exchanger 去處理要推向哪個 queue,寫完上面 2 段就能直接跑。
用 jmeter 長時間高併發壓測好像是因為把 channel 用完,導致 rabbitMq 卡住。沒辦法處理新來的資料而且會整個停住。但我沒有細查下去。
Ref
Best Practice 建議 1
Best Practice 建議 2
13 Common RabbitMQ Mistakes and How to Avoid Them