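Introduction to Using the RocketMQ Message Queue
Introduction
Apache RocketMQ is a distributed messaging middleware developed by Alibaba and later donated to the Apache Software Foundation, which now maintains it.
We connected directly to Huawei Cloud, so RocketMQ was not installed locally. In our tests C# compatibility was poor, and because server-side services are increasingly deployed with Docker, we ultimately chose to develop in Golang.
Calling from C#
The official rocketmq-client-csharp library does not appear to be well supported; several debugging attempts failed to get it running.
We used the third-party NewLife.RocketMQ library instead. In our tests it did not support ACL (access control): with ACL enabled, the client could not connect.
Producer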
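For context on why a client without ACL support cannot connect: with ACL enabled, each request has to carry an access key plus a signature derived from the secret key. As far as we know, the RocketMQ 4.x Java client computes that signature with HMAC-SHA1 and Base64-encodes the result. The Go sketch below shows only that signing step with the standard library. The helper name signRequest and the sample payload are hypothetical, and the exact request fields RocketMQ signs are not reproduced here.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// signRequest is a hypothetical helper: it returns the Base64-encoded
// HMAC-SHA1 of payload, keyed with secretKey.
func signRequest(payload []byte, secretKey string) string {
	mac := hmac.New(sha1.New, []byte(secretKey))
	mac.Write(payload)
	return base64.StdEncoding.EncodeToString(mac.Sum(nil))
}

func main() {
	// Placeholder values for illustration only; real requests sign
	// the fields defined by the RocketMQ client, not this string.
	payload := []byte("example-access-key" + "example-request-fields")
	fmt.Println(signRequest(payload, "example-secret-key"))
}
```

A client library that never attaches such a signature will be rejected by an ACL-enabled broker, which matches the connection failure observed with NewLife.RocketMQ.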
using System;
using System.Threading;

class Program
{
    // Fill in the NameServer address and topic before running.
    public static string namesvr = "";
    public static string topic = "";

    static void Main(string[] args)
    {
        // Run the producer loop on its own thread.
        var producerThread = new Thread(Producer);
        producerThread.Start();
    }

    static void Producer()
    {
        var mq = new NewLife.RocketMQ.Producer
        {
            Topic = topic,
            NameServerAddress = namesvr,
        };

        mq.Start();

        // Publish the current time every 500 ms.
        while (true)
        {
            try
            {
                var content = $"{DateTime.Now}";
                var message = new NewLife.RocketMQ.Protocol.Message();
                message.SetBody(content);
                var sr = mq.Publish(message);
                Console.WriteLine($"Message sent, content: {content}");
                Thread.Sleep(500);
            }
            catch (Exception ex)
            {
                // Report the failure instead of silently swallowing it.
                Console.WriteLine($"Send failed: {ex.Message}");
            }
        }
    }
}
Consumer
using System;
using System.Threading;
using NewLife; // assumed home of the ToDateTime() extension used below

class Program
{
    // Fill in the NameServer address, topic, and consumer group before running.
    public static string namesvr = "";
    public static string topic = "";
    public static string group = "";

    static void Main(string[] args)
    {
        // Start the consumer on its own thread.
        var consumerThread = new Thread(Consumer);
        consumerThread.Start();
    }

    static void Consumer()
    {
        var consumer = new NewLife.RocketMQ.Consumer
        {
            Topic = topic,
            NameServerAddress = namesvr,
            BatchSize = 1,
            Group = group
        };

        // Handle each batch of messages delivered from a queue.
        consumer.OnConsume = (q, ms) =>
        {
            Console.WriteLine($"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}");
            foreach (var item in ms)
            {
                Console.WriteLine($"Received message: msgId={item.MsgId}, key={item.Keys}, born at [{item.BornTimestamp.ToDateTime()}], content: {item.BodyString}");
            }
            return true;
        };

        consumer.Start();
    }
}
Calling from Golang
We used the rocketmq-client-go library.
Producer
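package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	// Import paths assume v2 of rocketmq-client-go.
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// Fill in the NameServer address and topic before running.
var server = ""
var topic = ""

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{server})),
		producer.WithRetry(2),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}
	if err := p.Start(); err != nil {
		fmt.Printf("start producer error: %s\n", err.Error())
		os.Exit(1)
	}

	msg := &primitive.Message{
		Topic: topic,
		Body:  []byte("Message"),
	}
	msg.WithTag("TagName")
	msg.WithKeys([]string{"KeyName"})

	// Stop sending on Ctrl+C so that the shutdown below is reachable.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Send the message once per second.
	for ctx.Err() == nil {
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
		time.Sleep(time.Second)
	}

	if err := p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s\n", err.Error())
	}
}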
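The Go samples in this section leave the NameServer address, topic, and group as empty placeholders. One way to fill them in without editing the source is to read them from environment variables and fail fast when a required one is missing. The sketch below uses only the standard library; the Config type, the loadConfig helper, and the variable names ROCKETMQ_NAMESRV, ROCKETMQ_TOPIC, and ROCKETMQ_GROUP are hypothetical choices for illustration, not a RocketMQ convention.

```go
package main

import (
	"fmt"
	"os"
)

// Config holds the connection settings the samples need.
type Config struct {
	NameServer string
	Topic      string
	Group      string // only needed by consumers, so it may stay empty
}

// loadConfig reads settings through getenv (pass os.Getenv in real use)
// and returns an error when a required value is missing.
// The variable names are hypothetical.
func loadConfig(getenv func(string) string) (Config, error) {
	cfg := Config{
		NameServer: getenv("ROCKETMQ_NAMESRV"),
		Topic:      getenv("ROCKETMQ_TOPIC"),
		Group:      getenv("ROCKETMQ_GROUP"),
	}
	if cfg.NameServer == "" {
		return Config{}, fmt.Errorf("ROCKETMQ_NAMESRV is not set")
	}
	if cfg.Topic == "" {
		return Config{}, fmt.Errorf("ROCKETMQ_TOPIC is not set")
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("name server=%s topic=%s group=%s\n", cfg.NameServer, cfg.Topic, cfg.Group)
}
```

Passing the lookup function as a parameter keeps the helper easy to exercise with a map-backed stub instead of the real process environment.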
Consumer
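package main

import (
	"context"
	"fmt"
	"os"
	"time"

	// Import paths assume v2 of rocketmq-client-go.
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

// Fill in the NameServer address, topic, and consumer group before running.
var server = ""
var topic = ""
var group = ""

func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(group),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{server})),
	)
	if err != nil {
		fmt.Printf("create consumer error: %s\n", err.Error())
		os.Exit(1)
	}

	// Print the body of every message delivered to the callback.
	err = c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", string(msgs[i].Message.Body))
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}

	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Keep consuming for one hour, then shut down.
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s\n", err.Error())
	}
}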
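The Go consumer subscribes with an empty consumer.MessageSelector{}, while the producer tags its messages with TagName. In RocketMQ, a tag subscription expression is either * (all tags) or a list of tags joined with ||, such as "TagA || TagB". The sketch below illustrates those matching semantics with the standard library only. The helper matchesTag is hypothetical and is not how the broker or rocketmq-client-go implements filtering, and treating an empty expression as "match everything" is an assumption made here for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTag reports whether a message tag satisfies a tag subscription
// expression such as "*" or "TagA || TagB". Illustrative only.
func matchesTag(expression, tag string) bool {
	expr := strings.TrimSpace(expression)
	if expr == "" || expr == "*" {
		return true // assumption: empty behaves like "*"
	}
	for _, candidate := range strings.Split(expr, "||") {
		if strings.TrimSpace(candidate) == tag {
			return true
		}
	}
	return false
}

func main() {
	for _, expr := range []string{"*", "TagName || Other", "Other"} {
		fmt.Printf("expression %q matches TagName: %v\n", expr, matchesTag(expr, "TagName"))
	}
}
```

To receive only messages tagged TagName, the subscription would presumably pass something like consumer.MessageSelector{Type: consumer.TAG, Expression: "TagName"} instead of the empty selector.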
Calling from Java
We pulled in rocketmq-client and the related libraries through Maven.
Maven Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.0</version>
    </dependency>
</dependencies>
Producer
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Main {
    public static void main(String[] args) {
        // Fill in these values before running.
        String producerGroup = "";
        String namesvr = "";
        String topic = "";
        String tag = "*";

        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

        producer.setNamesrvAddr(namesvr);

        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            // Send the current timestamp every 500 ms until an error occurs.
            while (true) {
                String formattedDate = dateFormat.format(new Date());

                Message message = new Message(
                        topic,
                        tag,
                        formattedDate.getBytes(RemotingHelper.DEFAULT_CHARSET)
                );

                producer.send(message);
                System.out.println("Message sent successfully");
                Thread.sleep(500);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
Producer with ACL Authentication
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.text.SimpleDateFormat;
import java.util.Date;

public class Main {
    public static void main(String[] args) {
        // Fill in these values before running.
        String producerGroup = "";
        String namesvr = "";
        String topic = "";
        String tag = "*";

        String ACL_ACCESS_KEY = "";
        String ACL_SECRET_KEY = "";

        // The RPC hook attaches the ACL credentials to each request.
        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup, rpcHook);

        producer.setNamesrvAddr(namesvr);
        producer.setUseTLS(true);

        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            // Send the current timestamp every 500 ms until an error occurs.
            while (true) {
                String formattedDate = dateFormat.format(new Date());

                Message message = new Message(
                        topic,
                        tag,
                        formattedDate.getBytes(RemotingHelper.DEFAULT_CHARSET)
                );

                producer.send(message);
                System.out.println("Message sent successfully");
                Thread.sleep(500);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
Consumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        // Fill in these values before running.
        String consumerGroup = "";
        String namesvr = "";
        String topic = "";
        String tag = "*";

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        consumer.setNamesrvAddr(namesvr);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // Decode explicitly as UTF-8 rather than the platform default charset.
                    System.out.println("Received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.subscribe(topic, tag);
            consumer.start();

            System.out.println("Consumer started successfully");
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }
}
Consumer with ACL Authentication
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        // Fill in these values before running.
        String consumerGroup = "";
        String namesvr = "";
        String topic = "";
        String tag = "*";

        String ACL_ACCESS_KEY = "";
        String ACL_SECRET_KEY = "";

        // The RPC hook attaches the ACL credentials to each request.
        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), true, null);

        consumer.setNamesrvAddr(namesvr);
        consumer.setUseTLS(true);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt message : msgs) {
                    // Decode explicitly as UTF-8 rather than the platform default charset.
                    System.out.println("Received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.subscribe(topic, tag);
            consumer.start();

            System.out.println("Consumer started successfully");
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }
}