MQTT 协议

使用 MQTT 协议收发数据

简介

MQTT 协议是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议之上,由 IBM 于 1999 年发布。由于规范简单,非常适合低功耗和网络带宽有限的 IoT 物联网场景。实际应用于第三方提供的道闸与雷达的数据传输。

代码

  • NuGet 引用第三方 MQTTnet 库。
  • 默认服务端端口 1883,可任意修改为未被占用的端口。
  • 通过 Topic(主题) 匹配数据,以 “+” 与 “#” 作为通配符,“+” 为单层通配符,“#” 为多层通配符。通配符只能用于订阅时的主题过滤器,发布消息时的主题不能包含通配符。
服务端

/// <summary>
/// Server sample: starts an MQTT server on port 1883, writes a console line for
/// each client connect / disconnect / subscribe / unsubscribe event, and stops
/// the server once Enter is pressed.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static async Task Main(string[] args)
{
    // All four event handlers only log one line and complete synchronously.
    Task Report(string line)
    {
        Console.WriteLine(line);
        return Task.CompletedTask;
    }

    var serverOptions = new MqttServerOptionsBuilder()
        .WithDefaultEndpoint()
        .WithDefaultEndpointPort(1883)
        .Build();
    var mqttFactory = new MqttFactory();

    using (var broker = mqttFactory.CreateMqttServer(serverOptions))
    {
        broker.ClientConnectedAsync += e => Report($"{e.ClientId} ClientConnected");
        broker.ClientDisconnectedAsync += e => Report($"{e.ClientId} ClientDisconnected");
        broker.ClientSubscribedTopicAsync += e => Report($"{e.ClientId} ClientSubscribedTopic");
        broker.ClientUnsubscribedTopicAsync += e => Report($"{e.ClientId} ClientUnsubscribedTopic");

        await broker.StartAsync();

        // Keep the server running until the operator presses Enter.
        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();

        await broker.StopAsync();
    }
}
客户端

/// <summary>
/// Client sample: launches the consumer and the publisher on their own threads,
/// then waits for Enter on the console.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static async Task Main(string[] args)
{
    // Consumer is started before the publisher, same order as before.
    new Thread(StartConsumer).Start();
    new Thread(StartPublisher).Start();

    Console.WriteLine("Press Enter to exit.");
    Console.ReadLine();
}

/// <summary>
/// Starts a managed MQTT client against the local broker, subscribes to the
/// <c>detrd/status/response/#</c> topic filter and prints every received payload
/// as UTF-8 text. Blocks on the console afterwards so the client stays alive.
/// </summary>
/// <remarks>
/// Kept as <c>async void</c> because it is used as a <see cref="Thread"/> entry
/// point, which requires a <c>void</c> return type.
/// </remarks>
static async void StartConsumer()
{
    var factory = new MqttFactory();

    using (var client = factory.CreateManagedMqttClient())
    {
        var options = new MqttClientOptionsBuilder()
            // Must be the port the broker listens on: the server sample and the
            // publisher both use 1883. The previous value (1234) pointed at a
            // port nothing was listening on, so no message was ever received.
            .WithTcpServer("127.0.0.1", 1883)
            .Build();
        var managedClientOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(options)
            .Build();

        client.ApplicationMessageReceivedAsync += e =>
        {
            // Guard against a missing payload: Encoding.GetString throws on null.
            // NOTE(review): assumes Payload is a byte[] that may be null for an
            // empty message — confirm against the MQTTnet version in use.
            var payload = e.ApplicationMessage.Payload;
            var text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
            Console.WriteLine($"Received message: {text}");
            return Task.CompletedTask;
        };

        await client.StartAsync(managedClientOptions);

        // '#' is the multi-level wildcard, so this filter matches every topic
        // below detrd/status/response/.
        var subscribeOptions = factory.CreateTopicFilterBuilder()
            .WithTopic("detrd/status/response/#")
            .Build();
        await client.SubscribeAsync(new List<MqttTopicFilter> { subscribeOptions });

        Console.WriteLine("MQTT client subscribed to topic.");

        // Wait here so the using block does not dispose the client right away.
        Console.ReadLine();
    }
}

/// <summary>
/// Starts a managed MQTT client against the local broker on port 1883 and, in an
/// endless loop, enqueues the payload <c>"data"</c> on a concrete topic below
/// <c>detrd/status/response/</c> roughly every five seconds.
/// </summary>
/// <remarks>
/// Kept as <c>async void</c> because it is used as a <see cref="Thread"/> entry
/// point, which requires a <c>void</c> return type.
/// </remarks>
static async void StartPublisher()
{
    var factory = new MqttFactory();

    using (var client = factory.CreateManagedMqttClient())
    {
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", 1883)
            .Build();
        var managedClientOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(options)
            .Build();
        await client.StartAsync(managedClientOptions);

        while (true)
        {
            // A PUBLISH topic name must not contain the wildcards '+' or '#'
            // (they are only valid in subscription filters), so publish to a
            // concrete topic that the filter "detrd/status/response/#" matches.
            await client.EnqueueAsync("detrd/status/response/device1", "data");

            // Wait up to 10 s for the managed client's queue to drain, and only
            // report success if it actually did.
            bool flushed = SpinWait.SpinUntil(() => client.PendingApplicationMessagesCount == 0, 10000);
            Console.WriteLine(flushed
                ? "MQTT application message is published."
                : "MQTT application message is still pending after 10 seconds.");

            // Asynchronous pause between messages instead of blocking the thread.
            await Task.Delay(5000);
        }
    }
}