ActiveMQ 消息队列帮助类使用介绍
简介
前文《ActiveMQ Demo》介绍了 ActiveMQ 的基础知识,以及官方提供的最简生产者与消费者 Demo;本文在此基础上完善代码,并将其封装为帮助类。
核心代码
ActiveMQHelper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
/// <summary>
/// Small helper around an NMS connection factory that can publish text
/// messages to a topic and/or receive them through a durable topic
/// subscription. Producer and consumer each use their own connection and
/// session.
/// </summary>
public class ActiveMQHelper : IDisposable
{
    private readonly IConnectionFactory _factory;

    private IConnection _connection_producer;
    private ISession _session_producer;
    private IMessageProducer _prod;

    private IConnection _connection_consumer;
    private ISession _session_consumer;
    // Kept as a field so Close() can detach the listener and close the consumer.
    private IMessageConsumer _consumer;

    /// <summary>
    /// Raised with the text of every <c>ITextMessage</c> delivered to the
    /// registered consumer. Invoked on whatever thread the NMS provider uses
    /// to call the listener.
    /// </summary>
    public event Action<string> MessageCallback;

    /// <summary>Creates a helper bound to the given broker.</summary>
    /// <param name="brokerUri">Broker URI handed to <c>ConnectionFactory</c>.</param>
    /// <exception cref="ArgumentException"><paramref name="brokerUri"/> is null or blank.</exception>
    public ActiveMQHelper(string brokerUri)
    {
        if (string.IsNullOrWhiteSpace(brokerUri))
        {
            throw new ArgumentException("Broker URI must not be null or empty.", nameof(brokerUri));
        }

        _factory = new ConnectionFactory(brokerUri);
    }

    /// <summary>
    /// Opens a connection and session and creates a producer for
    /// <paramref name="topic"/>. Calling it again releases the previous
    /// producer resources first instead of leaking them.
    /// </summary>
    /// <param name="topic">Name of the topic to publish to.</param>
    /// <exception cref="ArgumentException"><paramref name="topic"/> is null or blank.</exception>
    public void RegisterProducer(string topic)
    {
        if (string.IsNullOrWhiteSpace(topic))
        {
            throw new ArgumentException("Topic must not be null or empty.", nameof(topic));
        }

        CloseProducer();
        try
        {
            _connection_producer = _factory.CreateConnection();
            _session_producer = _connection_producer.CreateSession();
            _prod = _session_producer.CreateProducer(new ActiveMQTopic(topic));
        }
        catch
        {
            // Do not leave a half-initialised connection/session behind.
            CloseProducer();
            throw;
        }
    }

    /// <summary>
    /// Opens a started connection with the given client id, creates a durable
    /// consumer on <paramref name="topic"/> (the subscription name is the
    /// client id) and forwards received text messages to
    /// <see cref="MessageCallback"/>. Calling it again releases the previous
    /// consumer resources first.
    /// </summary>
    /// <param name="topic">Name of the topic to subscribe to.</param>
    /// <param name="clientid">Connection client id, also used as the durable subscription name.</param>
    /// <exception cref="ArgumentException">An argument is null or blank.</exception>
    public void RegisterConsumer(string topic, string clientid)
    {
        if (string.IsNullOrWhiteSpace(topic))
        {
            throw new ArgumentException("Topic must not be null or empty.", nameof(topic));
        }

        if (string.IsNullOrWhiteSpace(clientid))
        {
            throw new ArgumentException("Client id must not be null or empty.", nameof(clientid));
        }

        CloseConsumer();
        try
        {
            _connection_consumer = _factory.CreateConnection();
            _connection_consumer.ClientId = clientid;
            _connection_consumer.Start();
            _session_consumer = _connection_consumer.CreateSession();
            _consumer = _session_consumer.CreateDurableConsumer(new ActiveMQTopic(topic), clientid, null, false);
            _consumer.Listener += new MessageListener(consumer_Listener);
        }
        catch
        {
            // Do not leave a half-initialised connection/session behind.
            CloseConsumer();
            throw;
        }
    }

    /// <summary>
    /// Listener attached to the durable consumer. Text messages are forwarded
    /// to <see cref="MessageCallback"/>; any other message type is ignored.
    /// </summary>
    private void consumer_Listener(IMessage message)
    {
        // Type-test instead of a hard cast: a non-text message is skipped
        // rather than raising (and then hiding) an InvalidCastException.
        ITextMessage textMessage = message as ITextMessage;
        if (textMessage == null)
        {
            return;
        }

        try
        {
            MessageCallback?.Invoke(textMessage.Text);
        }
        catch (Exception ex)
        {
            // Subscriber failures are still kept away from the provider's
            // delivery thread (as before), but they are no longer silent.
            System.Diagnostics.Trace.TraceError("ActiveMQHelper: message callback failed: {0}", ex);
        }
    }

    /// <summary>
    /// Sends <paramref name="message"/> as a non-persistent, normal-priority
    /// text message through the registered producer.
    /// </summary>
    /// <param name="message">Text body of the message.</param>
    /// <exception cref="InvalidOperationException">
    /// <see cref="RegisterProducer"/> has not been called, or the helper has been closed.
    /// </exception>
    public void SendMessage(string message)
    {
        if (_prod == null)
        {
            throw new InvalidOperationException("No producer registered; call RegisterProducer before SendMessage.");
        }

        ITextMessage msg = _prod.CreateTextMessage();
        msg.Text = message;
        _prod.Send(msg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
    }

    /// <summary>
    /// Releases all producer and consumer resources. Every step is attempted
    /// even if an earlier one fails; failures are traced, not thrown. Safe to
    /// call more than once.
    /// </summary>
    public void Close()
    {
        CloseProducer();
        CloseConsumer();
    }

    /// <summary>Same as <see cref="Close"/>; allows use in a <c>using</c> statement.</summary>
    public void Dispose()
    {
        Close();
    }

    // Tears down producer, session and connection (in that order) and clears the fields.
    private void CloseProducer()
    {
        IMessageProducer producer = _prod;
        ISession session = _session_producer;
        IConnection connection = _connection_producer;

        // Clear first so a failed teardown can never leave stale references behind.
        _prod = null;
        _session_producer = null;
        _connection_producer = null;

        if (producer != null)
        {
            Release(() => producer.Close());
            Release(() => producer.Dispose());
        }

        ReleaseSessionAndConnection(session, connection);
    }

    // Detaches the listener, then tears down consumer, session and connection and clears the fields.
    private void CloseConsumer()
    {
        IMessageConsumer consumer = _consumer;
        ISession session = _session_consumer;
        IConnection connection = _connection_consumer;

        _consumer = null;
        _session_consumer = null;
        _connection_consumer = null;

        if (consumer != null)
        {
            Release(() => consumer.Listener -= consumer_Listener);
            Release(() => consumer.Close());
            Release(() => consumer.Dispose());
        }

        ReleaseSessionAndConnection(session, connection);
    }

    // Closes/disposes a session and stops/closes/disposes its connection; either may be null.
    private static void ReleaseSessionAndConnection(ISession session, IConnection connection)
    {
        if (session != null)
        {
            Release(() => session.Close());
            Release(() => session.Dispose());
        }

        if (connection != null)
        {
            Release(() => connection.Stop());
            Release(() => connection.Close());
            Release(() => connection.Dispose());
        }
    }

    // Runs one teardown step; a failure is traced so the remaining steps still run.
    private static void Release(Action step)
    {
        try
        {
            step();
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceWarning("ActiveMQHelper: error while releasing a resource: {0}", ex);
        }
    }
}
|
调用测试
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/// <summary>
/// Producer demo: publishes the current local time to a topic every 500 ms
/// until the process is terminated.
/// </summary>
static void Main(string[] args)
{
    // The broker URI needs a transport scheme. 61616 is ActiveMQ's default
    // OpenWire port; change host/port to match your broker.
    string brokerUri = "tcp://127.0.0.1:61616";
    // Sample topic name; the consumer must subscribe to the same topic.
    string topic = "TestTopic";

    ActiveMQHelper activeMQHelper = new ActiveMQHelper(brokerUri);
    try
    {
        activeMQHelper.RegisterProducer(topic);
        while (true)
        {
            activeMQHelper.SendMessage($"{DateTime.Now}");
            Task.Delay(TimeSpan.FromMilliseconds(500)).Wait();
        }
    }
    finally
    {
        // Reached only when registering or sending throws; releases the connection.
        activeMQHelper.Close();
    }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/// <summary>
/// Consumer demo: subscribes to a topic, prints every received message and
/// keeps running until Enter is pressed.
/// </summary>
static void Main(string[] args)
{
    // The broker URI needs a transport scheme. 61616 is ActiveMQ's default
    // OpenWire port; change host/port to match your broker.
    string brokerUri = "tcp://127.0.0.1:61616";
    // Sample values; the topic must match the one the producer publishes to.
    string topic = "TestTopic";
    string clientid = "TestClient";

    ActiveMQHelper activeMQHelper = new ActiveMQHelper(brokerUri);
    activeMQHelper.MessageCallback += ActiveMQHelper_MessageCallback;
    try
    {
        activeMQHelper.RegisterConsumer(topic, clientid);

        // RegisterConsumer only attaches a listener and returns, so block
        // here; otherwise Main would return before any message is printed.
        Console.WriteLine("Listening. Press Enter to exit.");
        Console.ReadLine();
    }
    finally
    {
        activeMQHelper.Close();
    }
}
/// <summary>
/// Handler subscribed to <c>ActiveMQHelper.MessageCallback</c>; writes the
/// received text to the console followed by a line break.
/// </summary>
/// <param name="obj">Text supplied by the callback.</param>
private static void ActiveMQHelper_MessageCallback(string obj) => Console.WriteLine(obj);
|