My manager wants to reduce our dependency on AWS SQS and asked me to try RabbitMQ, so I'm writing down the results here for future use.
Environment: .NET 8. For testing, RabbitMQ runs in Docker using the rabbitmq:4.0.5-management image.
The goal is only to handle ordinary business traffic. It copes with short bursts of high concurrency (ten-plus seconds), but sustained high concurrency (over 1000 QPS) causes problems, so I don't recommend it for that; switch to a distributed/clustered setup instead.
I like to use dependency injection, so all the RabbitMQ code lives in one class (responsible only for sending data, not business logic). Remember to register it for dependency injection in Program.cs.
The documentation recommends a single RabbitMQ connection per server, so the class is registered as a singleton and every injection gets the same connection.
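As a minimal sketch of that registration (assumed, not taken from the original post; the /hello endpoint and the exact shape of IRabbitMq are illustrative only), Program.cs could look like this:

var builder = WebApplication.CreateBuilder(args);

// One singleton, so every injection reuses the same connection
builder.Services.AddSingleton<IRabbitMq, RabbitMq>();

var app = builder.Build();

// Illustrative endpoint: pushes a message through the injected singleton
app.MapPost("/hello", async (IRabbitMq mq) =>
{
    await mq.PushToHelloQueue();
    return Results.Ok();
});

app.Run();

// Hypothetical interface implemented by the RabbitMq class shown below
public interface IRabbitMq
{
    Task PushToHelloQueue();
}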
The code below assumes RabbitMQ has a disk volume mounted for persistence, so when RabbitMQ crashes and restarts, the data on the queue is preserved.
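For illustration, a docker run command along these lines would give RabbitMQ a persistent volume at its default data directory (the container name, ports, and volume name here are assumptions, not from the original post; the admin/admin credentials match the code below):

docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin \
  -v rabbitmq_data:/var/lib/rabbitmq \
  rabbitmq:4.0.5-management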
Install the RabbitMQ.Client package from NuGet.

Producer Code (pushes messages to the queue)

using System.Text;
using System.Text.Json;
using RabbitMQ.Client;

/// <summary>
/// Register as a singleton. IRabbitMq is my own interface (optional).
/// </summary>
public class RabbitMq : IRabbitMq
{
    private readonly IConnection connection; // recommended: one connection per process

    public RabbitMq()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "admin",
            Password = "admin"
        };
        // Only async methods are available; .Result blocks the thread, but the singleton
        // constructor only runs once, so it's fine as a workaround.
        connection = factory.CreateConnectionAsync().Result;
    }

    public async Task PushToHelloQueue()
    {
        // By default one connection allows at most 2047 channels; keep QPS within ~300.
        // Under sustained high concurrency the channels get used up and publishing suddenly
        // fails, but a short burst (around 1000 QPS) holds up.
        using var channelA = await connection.CreateChannelAsync();

        var arguments = new Dictionary<string, object>
        {
            { "x-max-length", 10000 },         // the queue holds at most this many messages
            { "x-overflow", "reject-publish" } // reject-publish: reject new messages when the queue is full
        };

        await channelA.QueueDeclareAsync(queue: "hello",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: arguments);

        var message = JsonSerializer.Serialize(new
        {
            Msg = "hello",
            Date = DateTime.Now,
        });
        var body = Encoding.UTF8.GetBytes(message);

        var properties = new BasicProperties
        {
            DeliveryMode = DeliveryModes.Persistent,
        };

        await channelA.BasicPublishAsync(exchange: string.Empty,
                                         routingKey: "hello",
                                         body: body,
                                         basicProperties: properties,
                                         mandatory: true);
    }
}

Consumer Code (reads messages from the queue; written as a console app)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

internal class Program
{
    private static async Task Main(string[] args)
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "admin",
            Password = "admin"
        };
        using var connection = await factory.CreateConnectionAsync();
        using var channel = await connection.CreateChannelAsync();

        var arguments = new Dictionary<string, object>
        {
            { "x-max-length", 10000 },
            { "x-overflow", "reject-publish" } // reject-publish: reject new messages when the queue is full
        };

        await channel.QueueDeclareAsync(queue: "hello",
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: arguments);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += (model, ea) =>
        {
            try
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received {message}");
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                Console.WriteLine($" err {ex.Message}");
                return Task.CompletedTask;
            }
        };

        await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);

        Console.WriteLine(" [*] Waiting for messages. Press Ctrl+C to exit.");
        await Task.Delay(-1); // keep the process running
    }
}

Some notes

I tested two scenarios: creating a new channel for every publish and disposing it afterwards, versus each push method reusing a single shared channel.
Load testing with JMeter sending 5000 messages: creating a new channel each time finished in about 4 seconds, while sharing one channel took more than 20 seconds.
Creating a new channel each time does use more memory, but only about 50 MB more. With a shared channel, memory barely grows at all, but the performance was too poor, so I didn't go with it.
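As a rough sketch of how that comparison could be reproduced in code instead of JMeter (this is an assumption, not the original test setup; the 100-slot throttle is an arbitrary illustrative value), reusing the RabbitMq producer class above for the new-channel-per-publish case:

using System.Diagnostics;

var mq = new RabbitMq();               // producer class from the section above (one connection)
var throttle = new SemaphoreSlim(100); // cap concurrent publishes so open channels stay well under the 2047 limit
var sw = Stopwatch.StartNew();

// Fire 5000 publishes; each call opens and disposes its own channel
var tasks = Enumerable.Range(0, 5000).Select(async _ =>
{
    await throttle.WaitAsync();
    try { await mq.PushToHelloQueue(); }
    finally { throttle.Release(); }
});
await Task.WhenAll(tasks);

sw.Stop();
Console.WriteLine($"5000 messages took {sw.ElapsedMilliseconds} ms");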
...