杨前锦
2025-06-04 d44e3abf0d51cfea1ed7df510974d69458cf516d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
using HH.WCS.Mobox3.HD.models;
using HH.WCS.Mobox3.HD.util;
using HH.WCS.Mobox3.HD.wms;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using static HH.WCS.Mobox3.HD.core.MQTTCore;
 
 
namespace HH.WCS.Mobox3.HD.core
{
    internal class MQTTCore
    {
        public static Dictionary<string, MqttClientService> mqqtClients = new Dictionary<string, MqttClientService>();
 
        static MQTTCore()
        {
            LogHelper.Info("MQTTS开始加载.....", "MQTTS");
            Init();
            LogHelper.Info("MQTTS加载完成.....", "MQTTS");
        }
        public static void Init() {
            try
            {
                MqttClientService mqttClientService = new MqttClientService();
                LogHelper.Info("MQTT初始化参数:" + JsonConvert.SerializeObject(Settings.mqttServer), "MQTTS");
                mqttClientService.MqttClientStart(Settings.mqttServer);
                mqqtClients.Add("数字孪生", mqttClientService);
            }
            catch (Exception ex) {
                LogHelper.Info("MQTT初始化错误....,错误原因:"+ex.Message, "MQTTS");
            }    
        }
        public class MqttClientService
        {
            public MqttClientOptionsBuilder optionsBuilder { get; set; }
            public MqttClientOptions clientOptions { get; set; }
            public IMqttClient _mqttClient { get; set; }
            
            public void MqttClientStart(Settings.MQTTServer mqttServer) {
                try
                {
                    /*// 加载CA证书用于验证服务器证书  
                    LogHelper.Info("加载CA证书.....", "MQTTS");
                    var caCert = X509Certificate.CreateFromCertFile(@"ca-cert.pem");
 
                    // 加载客户端证书和私钥  
                    LogHelper.Info("加载客户端证书和私钥.....", "MQTTS");
                    var clientCert = new X509Certificate2(@"client.pfx", "yang04519999", X509KeyStorageFlags.Exportable);*/
 
                    LogHelper.Info("开始加载MQTTS.....", "MQTTS");
                    if (optionsBuilder == null)
                    {
                    optionsBuilder = new MqttClientOptionsBuilder().WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
                    .WithTcpServer(mqttServer.url, int.Parse(mqttServer.port)) // 要访问的mqtt服务端的 ip 和 端口号("romaiotbeta01.his.huawei.com", 8443)
                                                                               // .WithCredentials(mqttServer.username, mqttServer.password) // 要访问的mqtt服务端的用户名和密码("52CD6E25623A43C385BD0152ABD1FA7C", "iXYeA/z3vTzcMJs1P8t6Eg==")
                    .WithClientId(mqttServer.clientId) // 设置客户端id("L002001A-0002")
                    .WithCleanSession()
                    .WithTls(new MqttClientOptionsBuilderTlsParameters
                    {
                        //UseTls = false  // 是否使用 tls加密
                        UseTls = false,
                        AllowUntrustedCertificates = false, // 不允许不受信任的证书 
                        IgnoreCertificateChainErrors = false, // 不忽略证书链错误
                        IgnoreCertificateRevocationErrors = false, // 不忽略证书吊销错误  
                        SslProtocol = SslProtocols.Tls12, // 使用TLS 1.2协议  
                        CertificateValidationHandler = (o) => {
                            return true;
                        },
                        /*Certificates = new List<X509Certificate>(){
                        caCert,
                        clientCert
                            //newCert,
                        },*/
                    });
                    }
                    LogHelper.Info("加载完成.....", "MQTTS");
                
                    if (clientOptions == null) {
                        clientOptions = optionsBuilder.Build();
 
                    }
                    if (_mqttClient == null) {
                        _mqttClient = new MqttFactory().CreateMqttClient();
                        _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件
                        _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件
                        _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
                    }
                    if (!_mqttClient.IsConnected) {
                        _mqttClient.ConnectAsync(clientOptions);
                    }
                    }
                catch (Exception e)
                {
                    LogHelper.Info("启动MQTT客户端失败..... ,错误原因:" + e.Message, "MQTTS");
                }
            }
 
            internal void Check() {
                if (_mqttClient != null) {
                    if (!_mqttClient.IsConnected) {
                        _mqttClient.ConnectAsync(clientOptions);
                    }
                }
                else {
                    Init();
                }
            }
 
            public void close() {
                if (_mqttClient != null) {
                    if (_mqttClient.IsConnected) {
                        _mqttClient.DisconnectAsync(new MqttClientDisconnectOptions { ReasonString = "离线" });
                    }
                }
            }
            /// <summary>
            /// 客户端连接关闭事件
            /// </summary>
            /// <param name="arg"></param>
            /// <returns></returns>
            private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) {
                Console.WriteLine($"客户端已断开与服务端的连接……");
                return Task.CompletedTask;
            }
 
            /// <summary>
            /// 客户端连接成功事件
            /// </summary>
            /// <param name="arg"></param>
            /// <returns></returns>
            private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) {
                Console.WriteLine($"客户端已连接服务端……");
 
                // 订阅消息主题
                // MqttQualityOfServiceLevel: (QoS):  0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
                // 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
                // 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
 
                //8qti7j822065
                //d84y8ppo1291
                /*_mqttClient.SubscribeAsync($"iot/v2/device/{Settings.productId}/item/{deiviceId}/event/datachange/reply", MqttQualityOfServiceLevel.AtLeastOnce); //topic_02
                _mqttClient.SubscribeAsync($"iot/v2/device/{Settings.productId}/item/{deiviceId}/event/alarm/reply", MqttQualityOfServiceLevel.AtLeastOnce); //topic_02
*/
                /*1、设备心跳:  
                    发布Topic:     iot/v2/device/8qti7j822065/item/L002001A-0002/event/datachange/request  
                    订阅Topic:   iot/v2/device/8qti7j822065/item/L002001A-0002/event/datachange/reply
                  2、设备告警:
                    发布Topic:     iot/v2/device/8qti7j822065/item/L002001A-0002/event/alarm/request
                    订阅Topic:  iot/v2/device/8qti7j822065/item/L002001A-0002/event/alarm/reply
                */
 
                return Task.CompletedTask;
            }
 
            /// <summary>
            /// 收到消息事件
            /// </summary>
            /// <param name="arg"></param>
            /// <returns></returns>
            private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) {
                Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
                return Task.CompletedTask;
            }
 
            public bool Publish(string topic, string data) {
                try
                {
                    LogHelper.Info($"主题:{topic} ,数据:{data}","MQTTS");
                    var message = new MqttApplicationMessage
                    {
                        Topic = topic,
                        Payload = Encoding.Default.GetBytes(data),
                        QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                        Retain = false  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
                    };
                    if (_mqttClient.IsConnected)
                    {
                        var res = _mqttClient.PublishAsync(message).Result;
                        Console.WriteLine(data);
                        return res.IsSuccess;
                    }
                    else
                    {
                        _mqttClient.ConnectAsync(_mqttClient.Options);
                    }
                }
                catch (Exception ex) {
                    Console.WriteLine(ex.Message);
                }
                return false;
            }
        }
 
        internal static MqttClientService GetClient(string agv) {
            if (mqqtClients.Keys.Contains(agv)) {
                return mqqtClients[agv];
            }
            return null;
        }
 
        public static long GetTimeStamp(bool isMillisecond = true) {
            var ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
            var timeStamp = isMillisecond ? Convert.ToInt64(ts.TotalMilliseconds) : Convert.ToInt64(ts.TotalSeconds);
            return timeStamp;
        }       
 
        /// <summary>
        /// 推送货位状态
        /// </summary>
        /// <param name="locCode"></param>
        /// <param name="status"></param>
        public static void pushLocStatus(string locCode, int status)
        {
            try
            {
                Location loc = LocationHelper.GetLoc(locCode);
                MqttClientService mqttClientService = null;
                mqqtClients.TryGetValue("数字孪生", out mqttClientService);
                mqttClientService.Publish("locationChange", JsonConvert.SerializeObject(new AreaLocStatusData()
                {
                    areaCode = loc.S_AREA_CODE,
                    locCode = locCode,
                    status = status
                }));
            }
            catch (Exception e) {
                Console.WriteLine(e.Message);
            }
        }
 
       
        /// <summary>
        /// Payload serialized to JSON and published on the "locationChange" topic by
        /// <c>pushLocStatus</c>. Property names are the JSON keys as written here.
        /// </summary>
        public class AreaLocStatusData
        {
            /// <summary>Area code of the location (filled from the location's <c>S_AREA_CODE</c>).</summary>
            public string areaCode { get; set; }
            /// <summary>Code of the location whose status is reported.</summary>
            public string locCode { get; set; }
            /// <summary>Status value passed by the caller. NOTE(review): the meaning of each value is not defined in this file.</summary>
            public int status { get; set; }
        }
 
    }
}