RocketMQ 消息队列使用介绍
简介
Apache RocketMQ 是阿里巴巴开发的分布式消息中间件,后捐赠给 Apache 基金会维护。
直接对接华为云,所以未在本地安装。测试对 C# 兼容不是很好,而且服务端都逐渐使用 docker 部署,所以最终选择使用 Golang 开发。
C# 代码调用
官方库 rocketmq-client-csharp 的支持似乎并不好,调试了几次都运行不起来。
引用第三方 NewLife.RocketMQ 库。测试不支持 ACL(权限控制),如开启 ACL 则无法连接。
生产者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 
 | class Program{
 public static string namesvr = "";
 public static string topic = "";
 
 static void Main(string[] args)
 {
 ThreadStart str = new ThreadStart(Producer);
 Thread ConstrolStr = new Thread(str);
 ConstrolStr.Start();
 }
 
 
 
 
 
 static void Producer()
 {
 
 var mq = new Producer()
 {
 
 Topic = topic,
 
 NameServerAddress = namesvr,
 };
 
 mq.Start();
 
 
 while (true)
 {
 try
 {
 var content = $"{DateTime.Now}";
 var message = new NewLife.RocketMQ.Protocol.Message();
 message.SetBody(content);
 
 var sr = mq.Publish(message);
 Console.WriteLine($"发送成功的消息,内容:{content}");
 Task.Delay(TimeSpan.FromMilliseconds(500)).Wait();
 }
 catch (Exception ex)
 {
 
 }
 }
 }
 }
 
 | 
消费者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 
 | class Program{
 public static string namesvr = "";
 public static string topic = "";
 public static string group = "";
 
 static void Main(string[] args)
 {
 ThreadStart num = new ThreadStart(Consumer);
 Thread ConstrolNum = new Thread(num);
 ConstrolNum.Start();
 }
 
 
 
 
 
 static void Consumer()
 {
 
 var consumer = new NewLife.RocketMQ.Consumer
 {
 Topic = topic,
 NameServerAddress = namesvr,
 
 BatchSize = 1,
 Group = group
 };
 consumer.OnConsume = (q, ms) =>
 {
 string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
 Console.WriteLine(mInfo);
 foreach (var item in ms.ToList())
 {
 string msg = string.Format($"接收到消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容:{item.BodyString}");
 Console.WriteLine(msg);
 }
 
 return true;
 };
 
 consumer.Start();
 }
 }
 
 | 
Golang 代码调用
引用 rocketmq-client-go 库。
生产者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 
 | var server string = string()var topic string = string()
 
 p, _ := rocketmq.NewProducer(
 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{server})),
 producer.WithRetry(2),
 )
 err := p.Start()
 if err != nil {
 fmt.Printf("start producer error: %s", err.Error())
 os.Exit(1)
 }
 msg := &primitive.Message{
 Topic: topic,
 Body:  []byte("Message"),
 }
 msg.WithTag("TagName")
 msg.WithKeys([]string{"KeyName"})
 
 for {
 res, err := p.SendSync(context.Background(), msg)
 if err != nil {
 fmt.Printf("send message error: %s\n", err)
 } else {
 fmt.Printf("send message success: result=%s\n", res.String())
 }
 
 time.Sleep(1000000000)
 }
 
 err = p.Shutdown()
 if err != nil {
 fmt.Printf("shutdown producer error: %s", err.Error())
 }
 
 | 
消费者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 
 | var server string = string()var topic string = string()
 var group string = string()
 
 c, _ := rocketmq.NewPushConsumer(
 consumer.WithGroupName(group),
 consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{server})),
 )
 err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
 msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
 for i := range msgs {
 fmt.Printf("subscribe callback: %v \n", string(msgs[i].Message.Body))
 }
 return consumer.ConsumeSuccess, nil
 })
 if err != nil {
 fmt.Println(err.Error())
 }
 
 err = c.Start()
 if err != nil {
 fmt.Println(err.Error())
 os.Exit(-1)
 }
 time.Sleep(time.Hour)
 err = c.Shutdown()
 if err != nil {
 fmt.Printf("shutdown Consumer error: %s", err.Error())
 }
 
 | 
Java 代码调用
通过 Maven 引用 rocketmq-client 等相关库。
Maven 引用库
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | <dependencies><dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>4.9.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-acl</artifactId>
 <version>4.9.0</version>
 </dependency>
 </dependencies>
 
 | 
生产者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 
 | import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 import java.util.Date;
 import java.text.SimpleDateFormat;
 
 public class Main {
 public static void main(String[] args) {
 String producerGroup = "";
 String namesvr = "";
 String topic = "";
 String tag = "*";
 
 
 DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
 
 
 producer.setNamesrvAddr(namesvr);
 
 
 try {
 producer.start();
 } catch (MQClientException e) {
 throw new RuntimeException(e);
 }
 
 try {
 while (true) {
 
 Date currentDate = new Date();
 
 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
 String formattedDate = dateFormat.format(currentDate);
 
 
 Message message = new Message(
 topic,
 tag,
 formattedDate.getBytes(RemotingHelper.DEFAULT_CHARSET)
 );
 
 
 producer.send(message);
 System.out.println("消息发送成功");
 Thread.sleep(500);
 }
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 
 producer.shutdown();
 }
 }
 }
 
 | 
生产者 ACL 验证
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 
 | import org.apache.rocketmq.acl.common.AclClientRPCHook;import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 import java.util.Date;
 import java.text.SimpleDateFormat;
 
 public class Main {
 public static void main(String[] args) {
 String producerGroup = "";
 String namesvr = "";
 String topic = "";
 String tag = "*";
 
 String ACL_ACCESS_KEY = "";
 String ACL_SECRET_KEY = "";
 
 RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
 
 DefaultMQProducer producer = new DefaultMQProducer(producerGroup, rpcHook);
 
 
 producer.setNamesrvAddr(namesvr);
 
 producer.setUseTLS(true);
 
 
 try {
 producer.start();
 } catch (MQClientException e) {
 throw new RuntimeException(e);
 }
 
 try {
 while (true) {
 
 Date currentDate = new Date();
 
 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
 String formattedDate = dateFormat.format(currentDate);
 
 
 Message message = new Message(
 topic,
 tag,
 formattedDate.getBytes(RemotingHelper.DEFAULT_CHARSET)
 );
 
 
 producer.send(message);
 System.out.println("消息发送成功");
 Thread.sleep(500);
 }
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 
 producer.shutdown();
 }
 }
 }
 
 | 
消费者
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 
 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 
 import java.util.List;
 
 public class Main {
 public static void main(String[] args) {
 String consumerGroup = "";
 String namesvr = "";
 String topic = "";
 String tag = "*";
 
 
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
 
 
 consumer.setNamesrvAddr(namesvr);
 
 
 consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 for (MessageExt message : msgs) {
 System.out.println("收到消息:" + new String(message.getBody()));
 }
 
 
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 
 try {
 
 consumer.subscribe(topic, tag);
 
 consumer.start();
 
 System.out.println("消费者启动成功");
 } catch (MQClientException e) {
 throw new RuntimeException(e);
 }
 }
 }
 
 | 
消费者 ACL 验证
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 
 | import org.apache.rocketmq.acl.common.AclClientRPCHook;import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
 
 import java.util.List;
 
 public class Main {
 public static void main(String[] args) {
 String consumerGroup = "";
 String namesvr = "";
 String topic = "";
 String tag = "*";
 
 String ACL_ACCESS_KEY = "";
 String ACL_SECRET_KEY = "";
 
 RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
 
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), true, null);
 
 
 consumer.setNamesrvAddr(namesvr);
 
 consumer.setUseTLS(true);
 
 
 consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 for (MessageExt message : msgs) {
 System.out.println("收到消息:" + new String(message.getBody()));
 }
 
 
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 
 try {
 
 consumer.subscribe(topic, tag);
 
 consumer.start();
 
 System.out.println("消费者启动成功");
 } catch (MQClientException e) {
 throw new RuntimeException(e);
 }
 }
 }
 
 |