Kafka Helper Class

An introduction to using the Kafka message queue helper class.

Introduction

The earlier Kafka Demo post covered the basics of Kafka together with the minimal producer and consumer demos provided by the official client. This post refines that code and wraps it into a reusable helper class.
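
The helper targets the Confluent.Kafka client for .NET. As a minimal sketch of the prerequisites (package name and namespaces are the standard ones for that client; adjust to your own project), the code below assumes the following package reference and using directives:

// Install the client first, e.g.: dotnet add package Confluent.Kafka
using System;                    // Console, TimeSpan, DateTime
using System.Threading;          // CancellationTokenSource
using System.Threading.Tasks;    // Task.Run / Task.Delay in the examples
using Confluent.Kafka;           // ProducerConfig, ConsumerConfig, IProducer, builders, etc.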

Core Code

KafkaHelper

/// <summary>
/// Kafka message queue helper class
/// Created: 2023-10-25
/// </summary>
public class KafkaHelper
{
    private string _bootstrapServers;
    private Action<DeliveryReport<Null, string>> _handler;
    private IProducer<Null, string> _producerBuilder;

    /// <summary>
    /// Message callback
    /// </summary>
    public event Action<string> MessageCallback;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="bootstrapServers">Broker address</param>
    public KafkaHelper(string bootstrapServers)
    {
        // e.g. 127.0.0.1:9092
        _bootstrapServers = bootstrapServers;
    }

    /// <summary>
    /// Register the producer
    /// </summary>
    public void RegisterProducer()
    {
        var conf = new ProducerConfig { BootstrapServers = _bootstrapServers };

        _handler = r => Console.WriteLine(!r.Error.IsError
            ? $"Delivered message to {r.TopicPartitionOffset}"
            : $"Delivery Error: {r.Error.Reason}");

        _producerBuilder = new ProducerBuilder<Null, string>(conf).Build();
    }

    /// <summary>
    /// Register the consumer and start the consume loop (blocks the calling thread)
    /// </summary>
    /// <param name="groupId">Consumer group</param>
    /// <param name="topic">Topic</param>
    /// <param name="autoOffsetReset">Start offset when no committed offset exists</param>
    /// <param name="kafkaSSL">SSL authentication settings (optional)</param>
    public void RegisterConsumer(string groupId, string topic, AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest, KafkaSSL kafkaSSL = null)
    {
        var conf = new ConsumerConfig
        {
            GroupId = groupId,
            BootstrapServers = _bootstrapServers,
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic the first time you run the program.
            AutoOffsetReset = autoOffsetReset
        };

        // If the Kafka broker has SSL authentication enabled, the following settings are required;
        // otherwise leave kafkaSSL null.
        if (kafkaSSL != null)
        {
            conf.SecurityProtocol = kafkaSSL.SecurityProtocol;
            conf.SaslMechanism = kafkaSSL.SaslMechanism;
            conf.SaslUsername = kafkaSSL.SaslUsername;
            conf.SaslPassword = kafkaSSL.SaslPassword;
            conf.SslCaLocation = kafkaSSL.SslCaLocation;
            conf.SslKeystorePassword = kafkaSSL.SslKeystorePassword;
            conf.SslEndpointIdentificationAlgorithm = kafkaSSL.SslEndpointIdentificationAlgorithm;
        }

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        //Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        MessageCallback?.Invoke(cr.Value);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }

    /// <summary>
    /// Send a message
    /// </summary>
    public void SendMessage(string topic, string message)
    {
        _producerBuilder.Produce(topic, new Message<Null, string> { Value = message }, _handler);

        // wait for up to 10 seconds for any inflight messages to be delivered.
        _producerBuilder.Flush(TimeSpan.FromSeconds(10));
    }

    /// <summary>
    /// Close the connection
    /// </summary>
    public void Close()
    {
        _producerBuilder.Dispose();
    }
}

/// <summary>
/// Kafka SSL authentication settings
/// </summary>
public class KafkaSSL
{
    /// <summary>
    /// Protocol used to communicate with brokers. default: plaintext importance: high
    /// </summary>
    public SecurityProtocol? SecurityProtocol { get; set; } = Confluent.Kafka.SecurityProtocol.SaslSsl;

    /// <summary>
    /// SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256,
    /// SCRAM-SHA-512. **NOTE**: Despite the name, you may not configure more than one
    /// mechanism.
    /// </summary>
    public SaslMechanism? SaslMechanism { get; set; } = Confluent.Kafka.SaslMechanism.Plain;

    /// <summary>
    /// SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms. default: ''
    /// importance: high
    /// </summary>
    public string SaslUsername { get; set; }

    /// <summary>
    /// SASL password for use with the PLAIN and SASL-SCRAM-.. mechanisms. default: ''
    /// importance: high
    /// </summary>
    public string SaslPassword { get; set; }

    /// <summary>
    /// File or directory path to CA certificate(s) for verifying the broker's key. Defaults:
    /// On Windows the system's CA certificates are automatically looked up in the Windows
    /// Root certificate store. On Mac OSX this configuration defaults to `probe`. It
    /// is recommended to install openssl using Homebrew, to provide CA certificates.
    /// On Linux install the distribution's ca-certificates package. If OpenSSL is statically
    /// linked or `ssl.ca.location` is set to `probe` a list of standard paths will be
    /// probed and the first one found will be used as the default CA certificate location
    /// path. If OpenSSL is dynamically linked the OpenSSL library's default path will
    /// be used (see `OPENSSLDIR` in `openssl version -a`). default: '' importance: low
    /// </summary>
    public string SslCaLocation { get; set; }

    /// <summary>
    /// Client's keystore (PKCS#12) password. default: '' importance: low
    /// </summary>
    public string SslKeystorePassword { get; set; }

    /// <summary>
    /// Endpoint identification algorithm to validate broker hostname using broker certificate.
    /// https - Server (broker) hostname verification as specified in RFC2818. none -
    /// No endpoint verification. OpenSSL >= 1.0.2 required. default: https importance:
    /// low
    /// </summary>
    public SslEndpointIdentificationAlgorithm? SslEndpointIdentificationAlgorithm { get; set; }
}

Usage Tests

Producer

static void Main(string[] args)
{
    // Broker address
    string bootstrapServers = "127.0.0.1:9092";
    // Topic (example name, replace with your own)
    string topic = "test-topic";

    KafkaHelper kafkaHelper = new KafkaHelper(bootstrapServers);
    kafkaHelper.RegisterProducer();

    while (true)
    {
        kafkaHelper.SendMessage(topic, $"{DateTime.Now}");
        Task.Delay(TimeSpan.FromMilliseconds(500)).Wait();
    }
}

Consumer

static void Main(string[] args)
{
    // Broker address
    string bootstrapServers = "127.0.0.1:9092";
    // Topic (example name, replace with your own)
    string topic = "test-topic";
    // Consumer group (example name, replace with your own)
    string group = "test-group";

    KafkaHelper kafkaHelper = new KafkaHelper(bootstrapServers);
    kafkaHelper.MessageCallback += KafkaHelper_MessageCallback;
    Task.Run(() =>
    {
        // RegisterConsumer blocks its thread, so run it on a background task.
        kafkaHelper.RegisterConsumer(group, topic);
    });

    // Keep the main thread alive while the consumer runs in the background.
    Console.ReadLine();
}

/// <summary>
/// Queue message callback
/// </summary>
/// <param name="message"></param>
private static void KafkaHelper_MessageCallback(string message)
{
    Console.WriteLine(message);
}

Consumer (SSL Authentication)

static void Main(string[] args)
{
    // Broker address
    string bootstrapServers = "127.0.0.1:9092";
    // Topic (example name, replace with your own)
    string topic = "test-topic";
    // Consumer group (example name, replace with your own)
    string group = "test-group";

    KafkaHelper kafkaHelper = new KafkaHelper(bootstrapServers);
    kafkaHelper.MessageCallback += KafkaHelper_MessageCallback;
    Task.Run(() =>
    {
        KafkaSSL kafkaSSL = new KafkaSSL();
        kafkaSSL.SecurityProtocol = SecurityProtocol.SaslSsl;
        kafkaSSL.SaslMechanism = SaslMechanism.Plain;
        kafkaSSL.SaslUsername = "xxx";
        kafkaSSL.SaslPassword = "xxx";
        kafkaSSL.SslCaLocation = "phy_ca.crt";
        kafkaSSL.SslKeystorePassword = "dms@kafka";
        kafkaSSL.SslEndpointIdentificationAlgorithm = null;

        kafkaHelper.RegisterConsumer(group, topic, kafkaSSL: kafkaSSL);
    });

    // Keep the main thread alive while the consumer runs in the background.
    Console.ReadLine();
}

/// <summary>
/// Queue message callback
/// </summary>
/// <param name="message"></param>
private static void KafkaHelper_MessageCallback(string message)
{
    Console.WriteLine(message);
}