James Stillのブログ記事に基づいて、RabbitMQサブスクライバーを備えたASP.NET Core MVC/WebApiサイトを作成しました。RabbitMQ を使用した実際の PubSub メッセージング。
彼の記事では、静的クラスを使用してキュー サブスクライバーを開始し、キューに入れられたイベントのイベント ハンドラーを定義しています。この静的メソッドは、静的ファクトリ クラスを介してイベント ハンドラー クラスをインスタンス化します。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace NST.Web.MessageProcessing
{
public static class MessageListener
{
private static IConnection _connection;
private static IModel _channel;
public static void Start(string hostName, string userName, string password, int port)
{
var factory = new ConnectionFactory
{
HostName = hostName,
Port = port,
UserName = userName,
Password = password,
VirtualHost = "/",
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(15)
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true);
var queueName = "myQueue";
QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null);
_channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += ConsumerOnReceived;
_channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
}
public static void Stop()
{
_channel.Close(200, "Goodbye");
_connection.Close();
}
private static void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
{
// get the details from the event
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var messageType = "endpoint"; // hardcoding the message type while we dev...
// instantiate the appropriate handler based on the message type
IMessageProcessor processor = MessageHandlerFactory.Create(messageType);
processor.Process(message);
// Ack the event on the queue
IBasicConsumer consumer = (IBasicConsumer)sender;
consumer.Model.BasicAck(ea.DeliveryTag, false);
}
}
}
コンソールに書き込むだけでなく、メッセージ プロセッサ ファクトリでサービスを解決する必要がある時点までは、うまく機能します。
using NST.Web.Services;
using System;
namespace NST.Web.MessageProcessing
{
public static class MessageHandlerFactory
{
public static IMessageProcessor Create(string messageType)
{
switch (messageType.ToLower())
{
case "ipset":
// need to resolve IIpSetService here...
IIpSetService ipService = ???????
return new IpSetMessageProcessor(ipService);
case "endpoint":
// need to resolve IEndpointService here...
IEndpointService epService = ???????
// create new message processor
return new EndpointMessageProcessor(epService);
default:
throw new Exception("Unknown message type");
}
}
}
}
ASP.NET Core IoC コンテナーにアクセスして依存関係を解決する方法はありますか? 依存関係のスタック全体を手動でスピンアップするのは本当にやりたくないです :(
あるいは、ASP.NET CoreアプリケーションからRabbitMQをサブスクライブするより良い方法はありますか?レストバスしかし、Core 1.x には更新されていません
ベストアンサー1
静的クラスを回避し、依存性注入を次のように組み合わせて使用できます。
- の用法
IApplicationLifetime
アプリケーションの起動/停止ごとにリスナーを起動/停止します。 IServiceProvider
メッセージ プロセッサのインスタンスを作成するためにを使用します。
まず最初に、appsettings.json から設定できる独自のクラスに構成を移動しましょう。
public class RabbitOptions
{
public string HostName { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public int Port { get; set; }
}
// In appsettings.json:
{
"Rabbit": {
"hostName": "192.168.99.100",
"username": "guest",
"password": "guest",
"port": 5672
}
}
次に、依存関係としてMessageHandlerFactory
を受け取る非静的クラスに変換しますIServiceProvider
。サービス プロバイダーを使用して、メッセージ プロセッサ インスタンスを解決します。
public class MessageHandlerFactory
{
private readonly IServiceProvider services;
public MessageHandlerFactory(IServiceProvider services)
{
this.services = services;
}
public IMessageProcessor Create(string messageType)
{
switch (messageType.ToLower())
{
case "ipset":
return services.GetService<IpSetMessageProcessor>();
case "endpoint":
return services.GetService<EndpointMessageProcessor>();
default:
throw new Exception("Unknown message type");
}
}
}
この方法では、メッセージ プロセッサ クラスは、必要な依存関係をコンストラクターで受け取ることができます ( で構成している場合Startup.ConfigureServices
)。たとえば、サンプル プロセッサの 1 つに ILogger を挿入しています。
public class IpSetMessageProcessor : IMessageProcessor
{
private ILogger<IpSetMessageProcessor> logger;
public IpSetMessageProcessor(ILogger<IpSetMessageProcessor> logger)
{
this.logger = logger;
}
public void Process(string message)
{
logger.LogInformation("Received message: {0}", message);
}
}
次に、およびMessageListener
に依存する非静的クラスに変換します。これは元のクラスと非常に似ていますが、 Start メソッドのパラメーターをオプションの依存関係に置き換えただけで、ハンドラー ファクトリは静的クラスではなく依存関係になりました。IOptions<RabbitOptions>
MessageHandlerFactory
public class MessageListener
{
private readonly RabbitOptions opts;
private readonly MessageHandlerFactory handlerFactory;
private IConnection _connection;
private IModel _channel;
public MessageListener(IOptions<RabbitOptions> opts, MessageHandlerFactory handlerFactory)
{
this.opts = opts.Value;
this.handlerFactory = handlerFactory;
}
public void Start()
{
var factory = new ConnectionFactory
{
HostName = opts.HostName,
Port = opts.Port,
UserName = opts.UserName,
Password = opts.Password,
VirtualHost = "/",
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(15)
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true);
var queueName = "myQueue";
QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null);
_channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += ConsumerOnReceived;
_channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
}
public void Stop()
{
_channel.Close(200, "Goodbye");
_connection.Close();
}
private void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
{
// get the details from the event
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var messageType = "endpoint"; // hardcoding the message type while we dev...
//var messageType = Encoding.UTF8.GetString(ea.BasicProperties.Headers["message-type"] as byte[]);
// instantiate the appropriate handler based on the message type
IMessageProcessor processor = handlerFactory.Create(messageType);
processor.Process(message);
// Ack the event on the queue
IBasicConsumer consumer = (IBasicConsumer)sender;
consumer.Model.BasicAck(ea.DeliveryTag, false);
}
}
もうすぐ完了です。サービスとオプションを認識できるようにメソッドを更新する必要がありますStartup.ConfigureServices
(必要に応じて、リスナーとハンドラー ファクトリのインターフェイスを作成できます)。
public void ConfigureServices(IServiceCollection services)
{
// ...
// Add RabbitMQ services
services.Configure<RabbitOptions>(Configuration.GetSection("rabbit"));
services.AddTransient<MessageListener>();
services.AddTransient<MessageHandlerFactory>();
services.AddTransient<IpSetMessageProcessor>();
services.AddTransient<EndpointMessageProcessor>();
}
最後に、Startup.Configure
メソッドを更新して、追加のパラメータを受け取り、 /イベントIApplicationLifetime
でメッセージリスナーを開始/停止します(ただし、しばらく前にIISExpressを使用したApplicationStoppingイベントでいくつかの問題があることに気付きました。ApplicationStarted
ApplicationStopped
この質問):
public MessageListener MessageListener { get; private set; }
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime appLifetime)
{
appLifetime.ApplicationStarted.Register(() =>
{
MessageListener = app.ApplicationServices.GetService<MessageListener>();
MessageListener.Start();
});
appLifetime.ApplicationStopping.Register(() =>
{
MessageListener.Stop();
});
// ...
}