主管說想降低 AWS SQS 的依賴,叫我試試 Rabbit Mq 看看,紀錄一下結果以後可用。
環境 NET8,測試時用 docker,image 用 rabbitmq:4.0.5-management。

能處理一般的業務狀況即可,可短時間高併發(10 幾秒內), 持續高併發會有問題(qps 1000 以上)不建議使用,要改分布式。

我是習慣用依賴注入,把 RabbitMq 都寫在一個 class 裡(只負責送資料,不負責業務邏輯), program 要記得註冊依賴注入。

文件建議 rabbitMq 的 connection 一個 server 一個就好,因此用 singleton 注入每次拿到同一個 connection。

底下 Code 是預設 rabbitMq 會掛載硬碟做持久化,這樣 rabbitMq 有問題重啟時,queue 上的資料都會留著。

nuget安裝 RabbitMQ.Client

Producer Code (推訊息到 Queue)

/// <summary>
///  注入時用singleton, IRabbitMq是自己寫的介面 (可用可不用
/// </summary>
public class RabbitMq : IRabbitMq
{
    private readonly IConnection connection;  // 建議一個process一個connection

    public RabbitMq()
    {
        var factory = new ConnectionFactory { HostName = "localhost", UserName = "admin", Password = "admin" };
        connection = factory.CreateConnectionAsync().Result;  //只有async方法, .result雖會block thread但singleton只會跑一次, 當作workaround沒差
    }

    public async Task PushToHelloQueue()
    {
        // 預設一個connection最多2047個channel, qps建議在300內,
        // 測試持續高併發channel會被用完, 會突然掛掉送不出去, 但短暫的高併發(qps1000)撐得住
        using var channelA = await connection.CreateChannelAsync();

        var arguments = new Dictionary<string, object>
            {
                { "x-max-length", 10000 },  // queue 最多放X個
                { "x-overflow", "reject-publish" }  //reject-publish:當queue滿時會拒絕新加入的資料
            };

        await channelA.QueueDeclareAsync(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        var message = JsonSerializer.Serialize(new
        {
            Msg = "hello",
            Date = DateTime.Now,
        });

        var body = Encoding.UTF8.GetBytes(message);

        var properties = new BasicProperties
        {
            DeliveryMode = DeliveryModes.Persistent,
        };

        await channelA.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body, basicProperties: properties, mandatory: true);
    }

Consumer Code (從 Queue 取資料,用 Console 寫的)

internal class Program
{
    private static async Task Main(string[] args)
    {
        var factory = new ConnectionFactory { HostName = "localhost", UserName = "admin", Password = "admin" };
        using var connection = await factory.CreateConnectionAsync();
        using var channel = await connection.CreateChannelAsync();

        var arguments = new Dictionary<string, object>
            {
                { "x-max-length", 10000 },
                { "x-overflow", "reject-publish" }  //reject-publish:當queue滿時會拒絕新加入的msg
            };

        await channel.QueueDeclareAsync(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        Console.WriteLine(" [*] Waiting for messages.");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += (model, ea) =>
        {
            try
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received {message}");
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                Console.WriteLine($" err {ex.Message}");
                return Task.CompletedTask;
            }
        };


        await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);

        Console.WriteLine(" [*] Waiting for messages. Press Ctrl+C to exit.");
        await Task.Delay(-1); // 讓process一直跑
    }
}

一些備註

有測試 2 種情況, channel 每次都建立新的再釋放還是每個 push 的 function 各用一個。
用 jmeter 壓測送 5000 筆資料,每次建新的大概 4 秒就跑完,共用一個 channel 要 20 幾秒以上。
每次建新的 channel 會多消耗記憶體,但多個 50Mb 而已。而用共用一個 channel 記憶體幾乎不會增長,但效能太差就沒選了。

Rabbitmq 有很靈活設定可以改,但這次是要簡單使用,盡量類似 aws sqs 那樣簡單。 中間不用 exchanger 去處理要推向哪個 queue,寫完上面 2 段就能直接跑。

用 jmeter 長時間高併發壓測好像是因為把 channel 用完,導致 rabbitMq 卡住。沒辦法處理新來的資料而且會整個停住。但我沒有細查下去。

Ref

Best Practice 建議 1
Best Practice 建議 2
13 Common RabbitMQ Mistakes and How to Avoid Them