using HH.WCS.Mobox3.HD.models;
using HH.WCS.Mobox3.HD.util;
using HH.WCS.Mobox3.HD.wms;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using static HH.WCS.Mobox3.HD.core.MQTTCore;

namespace HH.WCS.Mobox3.HD.core
{
    /// <summary>
    /// Holds the MQTT client connections used by the application and exposes
    /// helpers for publishing messages (for example location status changes).
    /// </summary>
    internal class MQTTCore
    {
        /// <summary>Registry key of the client created by <see cref="Init"/>.</summary>
        private const string DigitalTwinClientKey = "数字孪生";

        /// <summary>Logger name used for every MQTT related log entry.</summary>
        private const string LogName = "MQTTS";

        /// <summary>Unix epoch used by <see cref="GetTimeStamp"/>.</summary>
        private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);

        /// <summary>Registered MQTT client services, keyed by a logical name.</summary>
        public static Dictionary<string, MqttClientService> mqqtClients = new Dictionary<string, MqttClientService>();

        /// <summary>
        /// Creates the default client the first time the type is used.
        /// <see cref="Init"/> catches its own exceptions, so this does not throw.
        /// </summary>
        static MQTTCore()
        {
            LogHelper.Info("MQTTS开始加载.....", LogName);
            Init();
            LogHelper.Info("MQTTS加载完成.....", LogName);
        }

        /// <summary>
        /// Creates a client from <c>Settings.mqttServer</c>, starts it and registers it
        /// under the digital-twin key. Calling it again replaces (and closes) the
        /// previously registered client instead of failing on a duplicate key.
        /// </summary>
        public static void Init()
        {
            try
            {
                // NOTE(review): this logs the complete server settings object; if it carries
                // credentials they end up in the log file - confirm that this is acceptable.
                LogHelper.Info("MQTT初始化参数:" + JsonConvert.SerializeObject(Settings.mqttServer), LogName);

                // Close a previously registered client first so two connections do not
                // compete for the same client id.
                MqttClientService previous;
                if (mqqtClients.TryGetValue(DigitalTwinClientKey, out previous) && previous != null)
                {
                    previous.close();
                }

                MqttClientService mqttClientService = new MqttClientService();
                mqttClientService.MqttClientStart(Settings.mqttServer);

                // Indexer assignment: Dictionary.Add would throw when Init runs more than once.
                mqqtClients[DigitalTwinClientKey] = mqttClientService;
            }
            catch (Exception ex)
            {
                LogHelper.Info("MQTT初始化错误....,错误原因:" + ex.Message, LogName);
            }
        }

        /// <summary>
        /// Wraps a single MQTTnet client: builds its options, connects, reconnects on
        /// demand and publishes messages.
        /// </summary>
        public class MqttClientService
        {
            /// <summary>Builder used to create <see cref="clientOptions"/>; created lazily.</summary>
            public MqttClientOptionsBuilder optionsBuilder { get; set; }

            /// <summary>Options passed to every connect attempt; created lazily.</summary>
            public MqttClientOptions clientOptions { get; set; }

            /// <summary>The underlying MQTTnet client; null until a start succeeded.</summary>
            public IMqttClient _mqttClient { get; set; }

            /// <summary>
            /// Builds the options and the client (each only once) and starts a connect
            /// attempt when the client is not connected. Errors are logged, not thrown.
            /// </summary>
            /// <param name="mqttServer">Server settings (url, port, client id).</param>
            public void MqttClientStart(Settings.MQTTServer mqttServer)
            {
                try
                {
                    // Loading of a CA certificate and a client certificate (.pfx) used to live
                    // here and is currently disabled. If it is re-enabled, read the certificate
                    // password from configuration - never hard-code it in source.
                    LogHelper.Info("开始加载MQTTS.....", LogName);
                    if (optionsBuilder == null)
                    {
                        optionsBuilder = new MqttClientOptionsBuilder()
                            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
                            // Host and port of the MQTT broker.
                            .WithTcpServer(mqttServer.url, int.Parse(mqttServer.port))
                            // User name / password authentication is currently disabled:
                            // .WithCredentials(mqttServer.username, mqttServer.password)
                            .WithClientId(mqttServer.clientId)
                            .WithCleanSession()
                            .WithTls(new MqttClientOptionsBuilderTlsParameters
                            {
                                // TLS is switched off; the remaining values only apply once it is enabled.
                                UseTls = false,
                                AllowUntrustedCertificates = false,        // reject untrusted certificates
                                IgnoreCertificateChainErrors = false,      // do not ignore chain errors
                                IgnoreCertificateRevocationErrors = false, // do not ignore revocation errors
                                SslProtocol = SslProtocols.Tls12,
                                // NOTE(review): this callback accepts every server certificate. It is
                                // harmless while UseTls is false, but must be replaced by a real
                                // validation before TLS is turned on.
                                CertificateValidationHandler = (o) => { return true; },
                            });
                    }
                    LogHelper.Info("加载完成.....", LogName);

                    if (clientOptions == null)
                    {
                        clientOptions = optionsBuilder.Build();
                    }

                    if (_mqttClient == null)
                    {
                        _mqttClient = new MqttFactory().CreateMqttClient();
                        _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync;                                   // connected
                        _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync;                             // disconnected
                        _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // message received
                    }

                    if (!_mqttClient.IsConnected)
                    {
                        ConnectInBackground();
                    }
                }
                catch (Exception e)
                {
                    LogHelper.Info("启动MQTT客户端失败..... ,错误原因:" + e.Message, LogName);
                }
            }

            /// <summary>
            /// Health check: reconnects when the client exists but is disconnected, and
            /// restarts this instance when no client was ever created.
            /// </summary>
            internal void Check()
            {
                if (_mqttClient == null)
                {
                    // Restart this instance. Calling Init() here would create a second,
                    // unrelated service and leave this one permanently without a client.
                    MqttClientStart(Settings.mqttServer);
                    return;
                }

                if (!_mqttClient.IsConnected)
                {
                    ConnectInBackground();
                }
            }

            /// <summary>Requests a disconnect when the client is currently connected.</summary>
            public void close()
            {
                if (_mqttClient == null || !_mqttClient.IsConnected)
                {
                    return;
                }

                try
                {
                    Task pending = _mqttClient.DisconnectAsync(new MqttClientDisconnectOptions { ReasonString = "离线" });
                    LogIfFaulted(pending, "MQTT断开连接");
                }
                catch (Exception ex)
                {
                    LogHelper.Info("MQTT断开连接失败,错误原因:" + ex.Message, LogName);
                }
            }

            /// <summary>
            /// Starts a connect attempt without waiting for it and logs the failure if the
            /// attempt faults, so errors are no longer silently dropped.
            /// </summary>
            private void ConnectInBackground()
            {
                try
                {
                    LogIfFaulted(_mqttClient.ConnectAsync(clientOptions), "MQTT连接");
                }
                catch (Exception ex)
                {
                    LogHelper.Info("MQTT连接失败,错误原因:" + ex.Message, LogName);
                }
            }

            /// <summary>Attaches a continuation that logs the root cause when <paramref name="task"/> faults.</summary>
            /// <param name="task">The fire-and-forget task to observe.</param>
            /// <param name="action">Short description of the action, used as log prefix.</param>
            private static void LogIfFaulted(Task task, string action)
            {
                if (task == null)
                {
                    return;
                }

                task.ContinueWith(
                    t => LogHelper.Info(action + "失败,错误原因:" + t.Exception.GetBaseException().Message, LogName),
                    TaskContinuationOptions.OnlyOnFaulted);
            }

            /// <summary>
            /// Raised when the connection to the broker is closed.
            /// </summary>
            /// <param name="arg">Event data supplied by MQTTnet.</param>
            /// <returns>A completed task.</returns>
            private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
            {
                Console.WriteLine($"客户端已断开与服务端的连接……");
                return Task.CompletedTask;
            }

            /// <summary>
            /// Raised when the connection to the broker has been established.
            /// </summary>
            /// <param name="arg">Event data supplied by MQTTnet.</param>
            /// <returns>A completed task.</returns>
            private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
            {
                Console.WriteLine($"客户端已连接服务端……");

                // No topics are subscribed at the moment. Subscriptions that used to be made here
                // (with QoS "at least once"):
                //   iot/v2/device/{productId}/item/{deviceId}/event/datachange/reply
                //   iot/v2/device/{productId}/item/{deviceId}/event/alarm/reply
                //
                // Topic pairs, for reference:
                //   1. Device heartbeat
                //      publish:   iot/v2/device/8qti7j822065/item/L002001A-0002/event/datachange/request
                //      subscribe: iot/v2/device/8qti7j822065/item/L002001A-0002/event/datachange/reply
                //   2. Device alarm
                //      publish:   iot/v2/device/8qti7j822065/item/L002001A-0002/event/alarm/request
                //      subscribe: iot/v2/device/8qti7j822065/item/L002001A-0002/event/alarm/reply
                //
                // QoS levels:
                //   0 - at most once: no acknowledgement, no retransmission.
                //   1 - at least once: the sender stores the message until it is acknowledged;
                //       a message may be delivered more than once.
                //   2 - exactly once: the safest and slowest level (four-step handshake).
                return Task.CompletedTask;
            }

            /// <summary>
            /// Raised when a message arrives; the message is only written to the console.
            /// </summary>
            /// <param name="arg">Event data containing the received message.</param>
            /// <returns>A completed task.</returns>
            private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
            {
                // A message may carry no payload; GetString(null) would throw.
                byte[] payload = arg.ApplicationMessage.Payload;
                string text = payload == null ? string.Empty : Encoding.UTF8.GetString(payload);

                Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{text}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
                return Task.CompletedTask;
            }

            /// <summary>
            /// Publishes <paramref name="data"/> on <paramref name="topic"/> with QoS
            /// "at least once" and waits for the result.
            /// </summary>
            /// <param name="topic">Target topic.</param>
            /// <param name="data">Message text; sent UTF-8 encoded.</param>
            /// <returns>
            /// True when the publish succeeded. False when an argument is missing, the client
            /// is not initialized or not connected (a reconnect is started in that case), or
            /// publishing failed.
            /// </returns>
            public bool Publish(string topic, string data)
            {
                try
                {
                    LogHelper.Info($"主题:{topic} ,数据:{data}", LogName);

                    if (string.IsNullOrEmpty(topic) || data == null)
                    {
                        LogHelper.Info("MQTT发布失败,错误原因:主题或数据为空", LogName);
                        return false;
                    }

                    if (_mqttClient == null)
                    {
                        LogHelper.Info("MQTT客户端未初始化,无法发布消息", LogName);
                        return false;
                    }

                    if (!_mqttClient.IsConnected)
                    {
                        // Use our own options: _mqttClient.Options may not be set yet.
                        ConnectInBackground();
                        return false;
                    }

                    var message = new MqttApplicationMessage
                    {
                        Topic = topic,
                        // UTF-8 instead of Encoding.Default: Encoding.Default is the ANSI code page
                        // on .NET Framework and corrupts non-ASCII text; the receive handler of
                        // this class decodes UTF-8 as well.
                        Payload = Encoding.UTF8.GetBytes(data),
                        QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                        // Retained messages are delivered immediately to new subscribers; not wanted here.
                        Retain = false
                    };

                    // GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
                    var res = _mqttClient.PublishAsync(message).GetAwaiter().GetResult();
                    Console.WriteLine(data);
                    return res.IsSuccess;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    LogHelper.Info("MQTT发布失败,错误原因:" + ex.Message, LogName);
                }
                return false;
            }
        }

        /// <summary>
        /// Looks up a registered client by name.
        /// </summary>
        /// <param name="agv">Registry key of the client.</param>
        /// <returns>The client, or null when the key is null or not registered.</returns>
        internal static MqttClientService GetClient(string agv)
        {
            if (agv == null)
            {
                return null;
            }

            MqttClientService client;
            return mqqtClients.TryGetValue(agv, out client) ? client : null;
        }

        /// <summary>
        /// Returns the current UTC time as a Unix timestamp.
        /// </summary>
        /// <param name="isMillisecond">True for milliseconds (default), false for seconds.</param>
        /// <returns>Time elapsed since 1970-01-01 00:00:00, rounded to a whole number.</returns>
        public static long GetTimeStamp(bool isMillisecond = true)
        {
            TimeSpan elapsed = DateTime.UtcNow - UnixEpoch;
            return isMillisecond
                ? Convert.ToInt64(elapsed.TotalMilliseconds)
                : Convert.ToInt64(elapsed.TotalSeconds);
        }

        /// <summary>
        /// Publishes the status of a storage location on the "locationChange" topic
        /// through the digital-twin client. Failures are logged, not thrown.
        /// </summary>
        /// <param name="locCode">Code of the location.</param>
        /// <param name="status">Status value to publish.</param>
        public static void pushLocStatus(string locCode, int status)
        {
            try
            {
                Location loc = LocationHelper.GetLoc(locCode);
                if (loc == null)
                {
                    LogHelper.Info("推送货位状态失败,错误原因:未找到货位 " + locCode, LogName);
                    return;
                }

                // The client is missing when Init() failed; skip instead of dereferencing null.
                MqttClientService mqttClientService;
                if (!mqqtClients.TryGetValue(DigitalTwinClientKey, out mqttClientService) || mqttClientService == null)
                {
                    LogHelper.Info("推送货位状态失败,错误原因:MQTT客户端未初始化", LogName);
                    return;
                }

                var payload = new AreaLocStatusData
                {
                    areaCode = loc.S_AREA_CODE,
                    locCode = locCode,
                    status = status
                };
                mqttClientService.Publish("locationChange", JsonConvert.SerializeObject(payload));
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
                LogHelper.Info("推送货位状态失败,错误原因:" + e.Message, LogName);
            }
        }

        /// <summary>Payload of a location status message.</summary>
        public class AreaLocStatusData
        {
            /// <summary>Code of the area the location belongs to.</summary>
            public string areaCode { get; set; }

            /// <summary>Code of the location.</summary>
            public string locCode { get; set; }

            /// <summary>Status value of the location.</summary>
            public int status { get; set; }
        }
    }
}