using HH.WCS.Mobox3.HD.models;
|
using HH.WCS.Mobox3.HD.util;
|
using HH.WCS.Mobox3.HD.wms;
|
using MQTTnet;
|
using MQTTnet.Client;
|
using MQTTnet.Protocol;
|
using MQTTnet.Server;
|
using Newtonsoft.Json;
|
using NLog;
|
using System;
|
using System.Collections.Generic;
|
using System.Linq;
|
using System.Net;
|
using System.Reflection;
|
using System.Security.Authentication;
|
using System.Security.Cryptography.X509Certificates;
|
using System.Text;
|
using System.Threading;
|
using System.Threading.Tasks;
|
using uPLibrary.Networking.M2Mqtt;
|
using static HH.WCS.Mobox3.HD.core.MQTTCore;
|
|
|
namespace HH.WCS.Mobox3.HD.core
|
{
|
internal class MQTTCore
|
{
|
public static Dictionary<string, MqttClientService> mqqtClients = new Dictionary<string, MqttClientService>();
|
|
static MQTTCore()
|
{
|
LogHelper.Info("MQTTS开始加载.....", "MQTTS");
|
Init();
|
LogHelper.Info("MQTTS加载完成.....", "MQTTS");
|
}
|
public static void Init() {
|
try
|
{
|
MqttClientService mqttClientService = new MqttClientService();
|
LogHelper.Info("MQTT初始化参数:" + JsonConvert.SerializeObject(Settings.mqttServer), "MQTTS");
|
mqttClientService.MqttClientStart(Settings.mqttServer);
|
mqqtClients.Add("数字孪生", mqttClientService);
|
}
|
catch (Exception ex) {
|
LogHelper.Info("MQTT初始化错误....,错误原因:"+ex.Message, "MQTTS");
|
}
|
}
|
public class MqttClientService
|
{
|
public MqttClientOptionsBuilder optionsBuilder { get; set; }
|
public MqttClientOptions clientOptions { get; set; }
|
public IMqttClient _mqttClient { get; set; }
|
|
public void MqttClientStart(Settings.MQTTServer mqttServer) {
|
try
|
{
|
/*// 加载CA证书用于验证服务器证书
|
LogHelper.Info("加载CA证书.....", "MQTTS");
|
var caCert = X509Certificate.CreateFromCertFile(@"ca-cert.pem");
|
|
// 加载客户端证书和私钥
|
LogHelper.Info("加载客户端证书和私钥.....", "MQTTS");
|
var clientCert = new X509Certificate2(@"client.pfx", "yang04519999", X509KeyStorageFlags.Exportable);*/
|
|
LogHelper.Info("开始加载MQTTS.....", "MQTTS");
|
if (optionsBuilder == null)
|
{
|
optionsBuilder = new MqttClientOptionsBuilder().WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
|
.WithTcpServer(mqttServer.url, int.Parse(mqttServer.port)) // 要访问的mqtt服务端的 ip 和 端口号("romaiotbeta01.his.huawei.com", 8443)
|
// .WithCredentials(mqttServer.username, mqttServer.password) // 要访问的mqtt服务端的用户名和密码("52CD6E25623A43C385BD0152ABD1FA7C", "iXYeA/z3vTzcMJs1P8t6Eg==")
|
.WithClientId(mqttServer.clientId) // 设置客户端id("L002001A-0002")
|
.WithCleanSession()
|
.WithTls(new MqttClientOptionsBuilderTlsParameters
|
{
|
//UseTls = false // 是否使用 tls加密
|
UseTls = false,
|
AllowUntrustedCertificates = false, // 不允许不受信任的证书
|
IgnoreCertificateChainErrors = false, // 不忽略证书链错误
|
IgnoreCertificateRevocationErrors = false, // 不忽略证书吊销错误
|
SslProtocol = SslProtocols.Tls12, // 使用TLS 1.2协议
|
CertificateValidationHandler = (o) => {
|
return true;
|
},
|
/*Certificates = new List<X509Certificate>(){
|
caCert,
|
clientCert
|
//newCert,
|
},*/
|
});
|
}
|
LogHelper.Info("加载完成.....", "MQTTS");
|
|
if (clientOptions == null) {
|
clientOptions = optionsBuilder.Build();
|
|
}
|
if (_mqttClient == null) {
|
_mqttClient = new MqttFactory().CreateMqttClient();
|
_mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件
|
_mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件
|
_mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
|
}
|
if (!_mqttClient.IsConnected) {
|
_mqttClient.ConnectAsync(clientOptions);
|
}
|
}
|
catch (Exception e)
|
{
|
LogHelper.Info("启动MQTT客户端失败..... ,错误原因:" + e.Message, "MQTTS");
|
}
|
}
|
|
internal void Check() {
|
if (_mqttClient != null) {
|
if (!_mqttClient.IsConnected) {
|
_mqttClient.ConnectAsync(clientOptions);
|
}
|
}
|
else {
|
Init();
|
}
|
}
|
|
public void close() {
|
if (_mqttClient != null) {
|
if (_mqttClient.IsConnected) {
|
_mqttClient.DisconnectAsync(new MqttClientDisconnectOptions { ReasonString = "离线" });
|
}
|
}
|
}
|
/// <summary>
|
/// 客户端连接关闭事件
|
/// </summary>
|
/// <param name="arg"></param>
|
/// <returns></returns>
|
private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) {
|
Console.WriteLine($"客户端已断开与服务端的连接……");
|
return Task.CompletedTask;
|
}
|
|
/// <summary>
|
/// 客户端连接成功事件
|
/// </summary>
|
/// <param name="arg"></param>
|
/// <returns></returns>
|
private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) {
|
Console.WriteLine($"客户端已连接服务端……");
|
|
// 订阅消息主题
|
// MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
|
// 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
|
// 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
|
|
//8qti7j822065
|
//d84y8ppo1291
|
/*_mqttClient.SubscribeAsync($"iot/v2/device/{Settings.productId}/item/{deiviceId}/event/datachange/reply", MqttQualityOfServiceLevel.AtLeastOnce); //topic_02
|
_mqttClient.SubscribeAsync($"iot/v2/device/{Settings.productId}/item/{deiviceId}/event/alarm/reply", MqttQualityOfServiceLevel.AtLeastOnce); //topic_02
|
*/
|
/*1、设备心跳:
|
发布Topic: iot/v2/device/8qti7j822065/item/L002001A-0002/event/datachange/request
|
订阅Topic: iot/v2/device/8qti7j822065/item/L002001A-0002/event/datachange/reply
|
2、设备告警:
|
发布Topic: iot/v2/device/8qti7j822065/item/L002001A-0002/event/alarm/request
|
订阅Topic: iot/v2/device/8qti7j822065/item/L002001A-0002/event/alarm/reply
|
*/
|
|
return Task.CompletedTask;
|
}
|
|
/// <summary>
|
/// 收到消息事件
|
/// </summary>
|
/// <param name="arg"></param>
|
/// <returns></returns>
|
private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) {
|
Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
|
return Task.CompletedTask;
|
}
|
|
public bool Publish(string topic, string data) {
|
try
|
{
|
LogHelper.Info($"主题:{topic} ,数据:{data}","MQTTS");
|
var message = new MqttApplicationMessage
|
{
|
Topic = topic,
|
Payload = Encoding.Default.GetBytes(data),
|
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
|
Retain = false // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
|
};
|
if (_mqttClient.IsConnected)
|
{
|
var res = _mqttClient.PublishAsync(message).Result;
|
Console.WriteLine(data);
|
return res.IsSuccess;
|
}
|
else
|
{
|
_mqttClient.ConnectAsync(_mqttClient.Options);
|
}
|
}
|
catch (Exception ex) {
|
Console.WriteLine(ex.Message);
|
}
|
return false;
|
}
|
}
|
|
internal static MqttClientService GetClient(string agv) {
|
if (mqqtClients.Keys.Contains(agv)) {
|
return mqqtClients[agv];
|
}
|
return null;
|
}
|
|
public static long GetTimeStamp(bool isMillisecond = true) {
|
var ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0);
|
var timeStamp = isMillisecond ? Convert.ToInt64(ts.TotalMilliseconds) : Convert.ToInt64(ts.TotalSeconds);
|
return timeStamp;
|
}
|
|
/// <summary>
|
/// 推送货位状态
|
/// </summary>
|
/// <param name="locCode"></param>
|
/// <param name="status"></param>
|
public static void pushLocStatus(string locCode, int status)
|
{
|
try
|
{
|
Location loc = LocationHelper.GetLoc(locCode);
|
MqttClientService mqttClientService = null;
|
mqqtClients.TryGetValue("数字孪生", out mqttClientService);
|
mqttClientService.Publish("locationChange", JsonConvert.SerializeObject(new AreaLocStatusData()
|
{
|
areaCode = loc.S_AREA_CODE,
|
locCode = locCode,
|
status = status
|
}));
|
}
|
catch (Exception e) {
|
Console.WriteLine(e.Message);
|
}
|
}
|
|
|
public class AreaLocStatusData
|
{
|
public string areaCode { get; set; }
|
public string locCode { get; set; }
|
public int status { get; set; }
|
}
|
|
}
|
}
|