使用 MQTT 协议收发数据
简介 MQTT 协议是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议之上,由 IBM 在 1999 年发布。由于规范很简单,非常适合低功耗和网络带宽有限的 IoT 物联网场景。本文中实际应用于第三方提供的道闸与雷达数据传输。
代码
NuGet 引用第三方 MQTTnet 库。
默认服务端端口 1883,可任意修改为未被占用的端口。
通过 Topic(主题) 匹配数据,订阅时以 “+” 与 “#” 作为通配符,“+” 为单层通配符,“#” 为多层通配符。
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 static async Task Main (string [] args ) { var factory = new MqttFactory(); var options = new MqttServerOptionsBuilder() .WithDefaultEndpoint() .WithDefaultEndpointPort(1883 ) .Build(); using (var server = factory.CreateMqttServer(options)) { server.ClientConnectedAsync += e => { Console.WriteLine($"{e.ClientId} ClientConnected" ); return Task.FromResult(1 ); }; server.ClientDisconnectedAsync += e => { Console.WriteLine($"{e.ClientId} ClientDisconnected" ); return Task.FromResult(1 ); }; server.ClientSubscribedTopicAsync += e => { Console.WriteLine($"{e.ClientId} ClientSubscribedTopic" ); return Task.FromResult(1 ); }; server.ClientUnsubscribedTopicAsync += e => { Console.WriteLine($"{e.ClientId} ClientUnsubscribedTopic" ); return Task.FromResult(1 ); }; await server.StartAsync(); Console.WriteLine("Press Enter to exit." ); Console.ReadLine(); await server.StopAsync(); } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 static async Task Main (string [] args ) { Thread t1 = new Thread(StartConsumer); t1.Start(); Thread t2 = new Thread(StartPublisher); t2.Start(); Console.WriteLine("Press Enter to exit." ); Console.ReadLine(); } static async void StartConsumer ( ) { var factory = new MqttFactory(); using (var client = factory.CreateManagedMqttClient()) { var options = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1" , 1234 ) .Build(); var managedClientOptions = new ManagedMqttClientOptionsBuilder() .WithClientOptions(options) .Build(); client.ApplicationMessageReceivedAsync += e => { Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} " ); return Task.FromResult(1 ); }; await client.StartAsync(managedClientOptions); var subscribeOptions = factory.CreateTopicFilterBuilder() .WithTopic($"detrd/status/response/#" ) .Build(); await client.SubscribeAsync(new List<MqttTopicFilter> { subscribeOptions }); Console.WriteLine("MQTT client subscribed to topic." ); Console.ReadLine(); } } static async void StartPublisher ( ) { var factory = new MqttFactory(); using (var client = factory.CreateManagedMqttClient()) { var options = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1" , 1883 ) .Build(); var managedClientOptions = new ManagedMqttClientOptionsBuilder() .WithClientOptions(options) .Build(); await client.StartAsync(managedClientOptions); while (true ) { await client.EnqueueAsync($"detrd/status/response/#" , "data" ); SpinWait.SpinUntil(() => client.PendingApplicationMessagesCount == 0 , 10000 ); Console.WriteLine("MQTT application message is published." ); Thread.Sleep(5000 ); } } }