본문 바로가기
API

Mqtt C# 예제

by _S0_H2_ 2023. 1. 24.
728x90
반응형

MQTT는 경량의 Messaging Protocol로,

- 헤더 오버헤드가 작기 때문에 제한된 대역폭과 고비용의 네트워크 환경에서의 메시징에 최적화되어 있다.

- 주로 IoT(Internet of Things) 환경에서 사용된다

- 소형 장치들이 신뢰할 수 있게 데이터를 교환할 수 있도록 설계되었다.

 

 

 

발행/구도 모델(Publish/Subscribe Model)로
MQTT는 client가 특정 topic에 메시지를 발행하고, 다른 client가 해당 topic을 구독하여 메세지를 수신하게 된다.

위와 같은 아키텍처에서도 한 server에서 topic을 다르게 하여 다른 client에 각기 다른 message를 전송할 수 있다.

 

 

이 때 Mqtt는 세 가지 QoS(Quality of Service)레벨을 제공한다.

  • QoS 0: 메시지가 한 번만 전달되며, 확인 없이 전송 (At most once).
  • QoS 1: 메시지가 최소 한 번 이상 전달되며, 확인 후 전송 (At least once).
  • QoS 2: 메시지가 정확히 한 번만 전달되며, 이중 확인 후 전송 (Exactly once).

 

 

c# 예제 

(한 솔루션 안에 다른 프로젝트로 생성)

client에 connect, disconnect 되었을 때 handler를 등록한다.

 

1. Publisher

 broker에 connect되어 handler가 호출될 때 topic과 message 등록 및 publish를 진행한다.

3초마다 idx를 올려 발행한다.

using MQTTnet;
using MQTTnet.Client;
using System.Text;

class Publisher
{
    static async Task Main(string[] args)
    {
        var factory = new MqttFactory();
        var client = factory.CreateMqttClient();
        client.ConnectedAsync += MqttCientConnected;
        client.DisconnectedAsync += MqttClientDisConnected;

        await MqttClientConnect();
        await client.DisconnectAsync();

        async Task MqttClientConnect()
        {
            var options = new MqttClientOptionsBuilder()
            .WithClientId("Publisher")
            .WithTcpServer("broker.hivemq.com")
            .Build();
            await client.ConnectAsync(options);
        }

        async Task MqttCientConnected(MqttClientConnectedEventArgs e)
        {
            var idx = 0;
            while (true)
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic("my/topic")
                    .WithPayload($"Hello {idx++}")
                    .Build();
                await client.PublishAsync(applicationMessage: message);
                Console.WriteLine($"published : {Encoding.UTF8.GetString(message.PayloadSegment)}");
                // 3초마다 idx를 올려 발행
                await Task.Delay(3000);
            }
        }

        async Task MqttClientDisConnected(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("Disconnected to broker.");
            try
            {
                await MqttClientConnect();
            } catch (Exception ex)
            {
                Console.WriteLine($"Reconnection failed: {ex.Message}");
            }
        }
    }
}

 

 

2. Subscriber

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Server;
using System.Text;

class Subscriber
{
    static async Task Main(string[] args)
    {
        var factory = new MqttFactory();
        var client = factory.CreateMqttClient();
        client.ApplicationMessageReceivedAsync += MqttClientApplicationMsgReceived;
        client.ConnectedAsync += MqttCientConnected;
        client.DisconnectedAsync += MqttClientDisConnected;

        await MqttClientConnect();

        async Task MqttClientConnect()
        {
            var options = new MqttClientOptionsBuilder()
            .WithClientId("SubscriberClient")
            .WithTcpServer("broker.hivemq.com") // 공용 브로커 사용
            .Build();
            await client.ConnectAsync(options);
        }

        async Task MqttCientConnected(MqttClientConnectedEventArgs e)
        {
            Console.WriteLine("Connected to broker.");
            await client.SubscribeAsync(
           new MqttTopicFilterBuilder()
           .WithTopic("my/topic")
           .Build());
            Console.WriteLine("Subscribed to topic.");
        }

        async Task MqttClientDisConnected(MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("Disconnected to broker.");
            try
            {
                await MqttClientConnect();
            } catch (Exception ex)
            {
                Console.WriteLine($"Reconnection failed: {ex.Message}");
            }
        }


        async Task MqttClientApplicationMsgReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            //var topic = e.ApplicationMessage.Topic;
            var message = e.ApplicationMessage.PayloadSegment;
            Console.WriteLine($"received: " +
                $"{Encoding.UTF8.GetString(message)}");
            await Task.Run(() =>
            {
                // save data
            });
        }

        Console.WriteLine("Press any key to exit.");
        Console.ReadLine();
    }
}

 

 

구현하다 알게된건데 ApplicationMessage.Payload는 다음 release에 미지원 예정이라고 warning이 뜨기 때문에 사용을 지양하는 것이 좋겠다.

 

 


publish/subscribe가 잘 동작한다!

좌 subscriber 우 publisher

 

728x90
반응형