728x90
반응형
MQTT는 경량의 Messaging Protocol로,
- 헤더 오버헤드가 작기 때문에 제한된 대역폭과 고비용의 네트워크 환경에서의 메시징에 최적화되어 있다.
- 주로 IoT(Internet of Things) 환경에서 사용된다
- 소형 장치들이 신뢰할 수 있게 데이터를 교환할 수 있도록 설계되었다.
발행/구도 모델(Publish/Subscribe Model)로
MQTT는 client가 특정 topic에 메시지를 발행하고, 다른 client가 해당 topic을 구독하여 메세지를 수신하게 된다.
위와 같은 아키텍처에서도 한 server에서 topic을 다르게 하여 다른 client에 각기 다른 message를 전송할 수 있다.
이 때 Mqtt는 세 가지 QoS(Quality of Service)레벨을 제공한다.
- QoS 0: 메시지가 한 번만 전달되며, 확인 없이 전송 (At most once).
- QoS 1: 메시지가 최소 한 번 이상 전달되며, 확인 후 전송 (At least once).
- QoS 2: 메시지가 정확히 한 번만 전달되며, 이중 확인 후 전송 (Exactly once).
c# 예제
(한 솔루션 안에 다른 프로젝트로 생성)
client에 connect, disconnect 되었을 때 handler를 등록한다.
1. Publisher
broker에 connect되어 handler가 호출될 때 topic과 message 등록 및 publish를 진행한다.
3초마다 idx를 올려 발행한다.
using MQTTnet;
using MQTTnet.Client;
using System.Text;
class Publisher
{
static async Task Main(string[] args)
{
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
client.ConnectedAsync += MqttCientConnected;
client.DisconnectedAsync += MqttClientDisConnected;
await MqttClientConnect();
await client.DisconnectAsync();
async Task MqttClientConnect()
{
var options = new MqttClientOptionsBuilder()
.WithClientId("Publisher")
.WithTcpServer("broker.hivemq.com")
.Build();
await client.ConnectAsync(options);
}
async Task MqttCientConnected(MqttClientConnectedEventArgs e)
{
var idx = 0;
while (true)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("my/topic")
.WithPayload($"Hello {idx++}")
.Build();
await client.PublishAsync(applicationMessage: message);
Console.WriteLine($"published : {Encoding.UTF8.GetString(message.PayloadSegment)}");
// 3초마다 idx를 올려 발행
await Task.Delay(3000);
}
}
async Task MqttClientDisConnected(MqttClientDisconnectedEventArgs e)
{
Console.WriteLine("Disconnected to broker.");
try
{
await MqttClientConnect();
} catch (Exception ex)
{
Console.WriteLine($"Reconnection failed: {ex.Message}");
}
}
}
}
2. Subscriber
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Server;
using System.Text;
class Subscriber
{
static async Task Main(string[] args)
{
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
client.ApplicationMessageReceivedAsync += MqttClientApplicationMsgReceived;
client.ConnectedAsync += MqttCientConnected;
client.DisconnectedAsync += MqttClientDisConnected;
await MqttClientConnect();
async Task MqttClientConnect()
{
var options = new MqttClientOptionsBuilder()
.WithClientId("SubscriberClient")
.WithTcpServer("broker.hivemq.com") // 공용 브로커 사용
.Build();
await client.ConnectAsync(options);
}
async Task MqttCientConnected(MqttClientConnectedEventArgs e)
{
Console.WriteLine("Connected to broker.");
await client.SubscribeAsync(
new MqttTopicFilterBuilder()
.WithTopic("my/topic")
.Build());
Console.WriteLine("Subscribed to topic.");
}
async Task MqttClientDisConnected(MqttClientDisconnectedEventArgs e)
{
Console.WriteLine("Disconnected to broker.");
try
{
await MqttClientConnect();
} catch (Exception ex)
{
Console.WriteLine($"Reconnection failed: {ex.Message}");
}
}
async Task MqttClientApplicationMsgReceived(MqttApplicationMessageReceivedEventArgs e)
{
//var topic = e.ApplicationMessage.Topic;
var message = e.ApplicationMessage.PayloadSegment;
Console.WriteLine($"received: " +
$"{Encoding.UTF8.GetString(message)}");
await Task.Run(() =>
{
// save data
});
}
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
}
}
구현하다 알게된건데 ApplicationMessage.Payload는 다음 release에 미지원 예정이라고 warning이 뜨기 때문에 사용을 지양하는 것이 좋겠다.
publish/subscribe가 잘 동작한다!
728x90
반응형