ActiveMQ 消息队列使用介绍
简介
Apache ActiveMQ 是一个开放源代码的消息中间件。
安装部署
请参阅官方文档
Docker 部署
1 2 3 4 5 6
| # 拉取官方 ActiveMQ docker pull webcenter/activemq # 启动 ActiveMQ docker run -d --name myactivemq -p 61616:61616 -p 8161:8161 webcenter/activemq # WEB 端登录 默认账户密码为 admin/admin http://localhost:8161/
|
C# 代码调用
引用 Apache.NMS.ActiveMQ 库。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| using Apache.NMS; using Apache.NMS.ActiveMQ; using System;
class Program { static void Main(string[] args) { try { IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { using (ISession session = connection.CreateSession()) { IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); int i = 0; while (!Console.KeyAvailable) { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep(5000); i++; } } } Console.ReadLine(); } catch (System.Exception e) { Console.WriteLine("{0}", e.Message); Console.ReadLine(); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| using Apache.NMS; using Apache.NMS.ActiveMQ; using System;
class Program { static void Main(string[] args) { try { IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { connection.ClientId = "testing listener"; connection.Start(); using (ISession session = connection.CreateSession()) { IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } connection.Stop(); connection.Close(); } } catch (System.Exception e) { Console.WriteLine(e.Message); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = (ITextMessage)message; Console.WriteLine("Receive: " + msg.Text); } catch (System.Exception e) { Console.WriteLine(e.Message); } } }
|