基於NetMQ的數據對象傳輸方案
基於NetMQ的數據對象傳輸方案 1
一.總體框架 1
二.代碼示例 2
ref="https://zhuanlan.zhihu.com/write#__RefHeading___Toc465691301">2.1 傳輸消息封裝相關對象 2
2.2 消息處理 8
href="https://zhuanlan.zhihu.com/write#__RefHeading___Toc465691303">2.3客戶端示例代碼 11
數據傳輸系統主要是將大量井場數據通過一定的傳輸協議安全的傳輸到基地,並存儲到資料庫。
井場業務數據需要安全、快捷的傳到基地,井場一體化數據採集平台採集井場業務數據存儲到井場本地資料庫中(MySql),數據傳輸系統根據用戶定義策略,經過數據拆分後傳輸到基地的伺服器上,並存儲到Oracle資料庫中。因為涉及到多個客戶端,數據在接收時要識別是哪個客戶端傳來的數據。通過該系統工作人員可以對井場數據傳輸進行監控。
數據傳輸系統的主要功能是:將各個井場採集到的數據經過加密、壓縮後傳輸到伺服器,之後再進行解壓解密,整理後將數據存儲到資料庫中。
一.總體框架
圖1-1產品框圖
基於開源NetMQ請求回應模型(Request-Reply)實現輕量級對象傳輸解決方案。
NetMQ:ZeroMQ的.Net版本,ZeroMQ簡單來說就是區域網內的消息中間件(與MSMQ類似),包括了進程間通訊、點對點通訊、訂閱模式通訊等等,底層用更「完美」的Socket實現,ZeroMQ實現了多語言、跨平台、高效率等諸多優勢。
具體實現步驟如下:
把業務對象分別轉化為數據傳輸(DTO)對象、領域對象、數據訪問對象(DAO),並提供某種機制方便的實現對象之間的相互轉換。業務對象與資料庫脫離,通過職責庫與數據持久層進行交互。DTO對象只包括簡單的屬性用來進行數據傳輸,包括跨邏輯層傳輸和跨進程和網路的遠程傳輸。Domain對象用於實現業務邏輯,包含屬性和方案,DAO對象擁有與資料庫交互。
傳輸客戶端從客戶端依照特定的策略收集需要更新的業務對象,轉化成方便傳輸使用的DTO對象,傳輸消息管理組件把對象轉換成JSON格式的字元串,對字元串形式數據對象進行數據驗證、壓縮、加密後,按配置進行消息封裝、拆分、和打包,發現送給NETMQ Client對象。
NETMQ Client通過設定的網路協議和埠把基於消息形式的數據對象傳輸到伺服器上。
傳輸伺服器上把接收到的消息進行解壓、解密、數據完整性驗證等,把消息存儲到隊列中。
伺服器上的消息處理服務和數據存儲服務對接收到的數據進行處理,把字元串還原成DTO對象。
DTO對象使用伺服器上配置的職責庫服務(轉換DTO對象為DAO對象),調用伺服器上配置的數據持久層(http://IBatis.Net)進行入庫操作。
二.代碼示例
2.1 傳輸消息封裝相關對象
傳輸信封對象:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ServerApp.Common
{
public class MessageEnvelope
{
public string MetaDataClass { get; set; }
public string RepositoryMetaDataClass { get; set; }
public string Version { get; set; }
public MessageHeader Header { get; set; }
public MessageBody Body { get; set; }
public string CreatedTime { get;set;}
public string UniqueKey { get; set; }
public string MD5 { get; set; }
/// <summary>
/// 0,add;1,update;2,delete
/// </summary>
public string UpLoadType { get; set; }
public MessageEnvelope()
{
this.CreatedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
this.UniqueKey = System.Guid.NewGuid().ToString("N");
this.Body = new MessageBody();
this.Header = new MessageHeader();
}
}
}
傳輸標題對象
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ServerApp.Common
{
public class MessageHeader
{
public string EncodingStyle { get; set; }
public string Actor { get; set; }
public string NextActor { get; set; }
public string TockenKey { get; set; }
}
}
傳輸體對象
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ServerApp.Common
{
public class MessageBody
{
public List<object> Items { get; set; }
}
}
傳輸失敗對象
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ServerApp.Common
{
public class MessageFault
{
public string FaultCode {get;set;}
public string FaultString {get;set;}
public string FaultActor {get;set;}
}
}
傳輸客戶端發送對象
using NetMQ.Sockets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Newtonsoft.Json;
namespace ServerApp.Common
{
public class ClientSender
{
private IList<MessageEnvelope> envelopeList;
private int envelopeMaxCounts = 100;
private MessageHeader messageHeader = new MessageHeader();
private MessageEnvelope messageEnvelope = new MessageEnvelope();
private string serverURL = "";
public event EventHandler NotifyMessage;
public void Notify(string message)
{
if(this.NotifyMessage != null)
{
NotifyMessage(message, EventArgs.Empty);
}
}
public ClientSender(string serverURL)
{
this.serverURL = serverURL;
messageHeader = new MessageHeader();
messageEnvelope = new MessageEnvelope();
envelopeList = new List<MessageEnvelope>();
}
public ClientSender(string serverURL,MessageHeader header = null,MessageEnvelope envelope = null)
: this(serverURL)
{
this.messageHeader = header;
this.messageEnvelope = envelope;
}
public void AddNewEnvelope(List<object> addNewList)
{
List<List<object>> listGroup = SplitObjectList(addNewList);
foreach (var item in listGroup)
{
MessageEnvelope messageEnvelope = new MessageEnvelope();
messageEnvelope.Header = this.messageHeader;
messageEnvelope.UpLoadType = "0";
messageEnvelope.Body.Items = item;
envelopeList.Add(messageEnvelope);
}
}
public void AddUpdateEnvelope(List<object> updateList)
{
List<List<object>> listGroup = SplitObjectList(updateList);
foreach (var item in listGroup)
{
MessageEnvelope messageEnvelope = new MessageEnvelope();
messageEnvelope.Header = this.messageHeader;
messageEnvelope.UpLoadType = "1";
messageEnvelope.Body.Items = item;
envelopeList.Add(messageEnvelope);
}
}
private static List<List<object>> SplitObjectList(List<object> updateList)
{
List<List<object>> listGroup = new List<List<object>>();
int j = 10;
for (int i = 0; i < updateList.Count; i += 10)
{
List<object> cList = new List<object>();
cList = updateList.Take(j).Skip(i).ToList();
j += 10;
listGroup.Add(cList);
}
return listGroup;
}
public void Run()
{
//Task.Run(() =>
{
using (var sender = new RequestSocket())
{
sender.Connect("tcp://127.0.0.1:5555");
this.Notify("connect to " + this.serverURL);
Console.WriteLine("Sending tasks to workers");
//RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;
int taskNumber = 0;
foreach (var envelope in this.envelopeList)
{
taskNumber++;
Console.WriteLine("Workload : {0}", "dd");
var encoding = System.Text.Encoding.ASCII;
var message = GetEnvelopeContent(envelope);
var msg = new Msg();
byte[] byteArray = System.Text.Encoding.UTF8.GetBytes(message);
msg.InitGC(byteArray, byteArray.Length);
sender.Send(ref msg, false);
msg.Close();
var resultMessage = sender.ReceiveFrameString(System.Text.Encoding.UTF8);
this.Notify(string.Format("process : {0} finished.{1}", taskNumber, resultMessage));
}
}
// });
}
;
}
private string GetEnvelopeContent(MessageEnvelope envelope)
{
string output = JsonConvert.SerializeObject(envelope);
return output;
}
//private string GetContent(int taskNumber)
//{
// MessageEnvelope message = new MessageEnvelope();
// message.Body = new MessageBody();
// message.Header = new MessageHeader();
// AutUserDTO user;
// List<object> users = new List<object>();
// for (var i = 0; i < 200; i++)
// {
// user = new AutUserDTO();
// user.Password = "1233";
// user.UserId = System.Guid.NewGuid().ToString("N");
// user.UserName = "newlj" + i.ToString();
// user.PersonName = "lj1";
// user.TelNumber = "1";
// user.UserStatus = "1";
// user.ValMethod = "1";
// user.PositionId = "1";
// user.UserType = "1";
// user.OrgId = "1";
// user.ModifyUserId = "1";
// user.ModifyDate = DateTime.Now;
// user.EmployeeId = "1";
// user.CreateUserId = "1";
// user.AdminFlg = "1";
// user.Desp = "1";
// user.DeleteFlg = "1";
// user.Email = "testlj@mmmm.com";
// user.PsdErrCount = 0;
// user.Locked = "1";
// users.Add(user);
// }
// message.Body.Items = users;
// message.Version = "1.0";
// message.MetaDataClass = typeof(AutUserDTO).ToString();
// message.RepositoryMetaDataClass = typeof(IAutUserRepository).ToString();
// // message.CreatedTime = DateTime.Now.ToString();
// string output = JsonConvert.SerializeObject(message);
// //var message = "this is test" + taskNumber.ToString();
// return output;
//}
}
}
但是因為Web Service 的開放性和通用性,為了能夠保護信息系統的安全,對Web Service 的安全性提出了很高的要求。Web Service 迫切需要一個完整的安全服務框架,來為上層應用開發提供全面的安全服務。構建Web Service 的安全框架的困難在於:web service 是非常分散式的,並且關鍵的安全實現和演算法都是由不同提供商實現的。將各分散的業務部門和它們原先的異構的安全系統和架構統一集成到Web Service 安全和業務平台上,並且能夠以一種信任關係在各部門應用之間共享用戶信息、描述和許可權是一個擺在面前的巨大挑戰。
為什麼需要安全的可信的Web Services
與過去十年中客戶/伺服器和基於Web 的應用一樣,XML Web Services 給應用開發和信息系統的構建帶來了革命性的影響。通過使用標準協議,如XML、SOAP、WSDL 和UDDI,應用能夠更容易的相互通訊,並且更快、更便宜的進行應用集成,供應鏈集成,實現分散式的服務模型。
XML Web Service 介面是基於XML 和松耦合的。XML 和SOAP 允許任意系統間進行相互通訊,無論它是一個Office XP 桌面還是一個大型主機系統。隨著自動化業務流程集成的越來越普及,越來越多各式各樣的系統通過Web Service 加入到一個廣泛的Web Service 集成環境中去,因此出現了以下一些問題:
?非集中的架構
?非集中的管理
?用異構的技術實現
?多個部門間相互連接
?多個企業間相互連接
?天然的對等的架構
?有可能對Internet 開放
上面的每一個問題都是對系統安全的嚴峻挑戰。如何跨越多個異構系統在整個環境中實施一個安全策略?如何為一個不了解安全系統的外部提供商提供安全服務?如何監視和審計跨越多個異構系統的安全活動事件? 要解決上述問題,僅依賴於傳統的防火牆和入侵監測系統是不足夠的,即使加上了SSL 和VPN 也只是解決了數據在網路中安全傳輸的問題,並沒有解決跨系統的認證和訪問授權問題,也沒能解決面向Internet 的服務安全問題。要解決這些問題,需要提供一個完整的基於Web Service 的安全和企業應用集成架構。DCI 架構以及產品系列提供了對上述問題的完整解決方案(完整的架構說明請看另文)。
2.2 消息處理
傳輸消息處理對象介面:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ServerApp.Interface
{
public interface IMessageProcessService
{
bool Proccess(string message);
void Run();
}
}
using Newtonsoft.Json;
using RichFit.Drilling.Infrastructure.Interface.DTO;
using ServerApp.Common;
using ServerApp.Interface;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Practices.Unity;
using RichFit.Drilling.Infrastructure.Interface;
using RichFit.Drilling.RepositoryService.RepositoryImpl;
using RichFit.Foundation.Service.Repository;
namespace ServerApp.Service
{
public class MessageProcessService : IMessageProcessService
{
private readonly IMessageStorageService messageStorageService;
public MessageProcessService(IMessageStorageService messageStorageService)
{
this.messageStorageService = messageStorageService;
Run();
}
public void Run()
{
Task.Run(() =>
{
while (true)
{
string message = messageStorageService.TryDequeue();
if(message != null)
{
Task.Run(() =>
{
Proccess(message);
});
}
//Thread.Sleep(100);
}
});
}
public bool Proccess(string message)
{
MessageEnvelope deserializedMessage = JsonConvert.DeserializeObject<MessageEnvelope>(message);
var container = AppContainer.Container;
Type t = Assembly.GetAssembly(typeof(AutUserDTO)).GetType(deserializedMessage.MetaDataClass);
Type trep = Assembly.GetAssembly(typeof(AutUserDTO)).GetType(deserializedMessage.RepositoryMetaDataClass);
var userR = container.Resolve(trep, null); //container.Resolve<IAutUserRepository>();
var rep = userR as AutUserRepository;
var methodinfo = userR.GetType().GetMethod("Add");
(userR as RepositoryBase).UnitOfWork.Begin();
foreach (var item in deserializedMessage.Body.Items)
{
//Task.Run(() =>
//{
var user1 = JsonConvert.DeserializeObject(item.ToString(), t);
AutUserDTO u = user1 as AutUserDTO;
methodinfo.Invoke(userR, new object[] { u });
//rep.Add(u)
// });
}
(userR as RepositoryBase).UnitOfWork.Commit();
Console.WriteLine("processed------------------finished" + deserializedMessage.CreatedTime);
return true;
}
}
}
傳輸對象存儲介面:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ServerApp.Interface
{
public interface IMessageStorageService
{
void Save(string message);
string Remove();
string TryDequeue();
List<string> GetList();
}
}
using ServerApp.Interface;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using ServerApp.Common;
namespace ServerApp.Service
{
public class MessageStorageService : IMessageStorageService
{
internal static ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();
public void Save(string message)
{
messageQueue.Enqueue(message);
}
public string TryDequeue()
{
string result;
if (messageQueue.TryDequeue(out result))
{
return result;
}
return null;
}
public string Remove()
{
string result;
if(messageQueue.TryDequeue(out result))
{
return result;
}
return null;
}
public List<string> GetList()
{
return messageQueue.ToList<string>();
}
}
}
2.3客戶端示例代碼
using Newtonsoft.Json;
using RichFit.Drilling.Infrastructure.Interface.DTO;
using ServerApp.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
MessageEnvelope message = new MessageEnvelope();
message.Body = new MessageBody();
message.Header = new MessageHeader();
AutUserDTO user;
List<object> users = new List<object>();
for(var i=0;i<12;i++)
{
user = new AutUserDTO();
user.PersonName = "lj1";
users.Add(user);
}
message.Body.Items = users;
message.Version = "1.0";
message.MetaDataClass = typeof(AutUserDTO).ToString();
string output = JsonConvert.SerializeObject(message);
MessageEnvelope deserializedProduct = JsonConvert.DeserializeObject<MessageEnvelope>(output);
Type t = Assembly.GetAssembly(typeof(AutUserDTO)).GetType(message.MetaDataClass);
var user1 = JsonConvert.DeserializeObject(deserializedProduct.Body.Items[0].ToString(), t);
AutUserDTO u = user1 as AutUserDTO;
Console.WriteLine("====== VENTILATOR ======");
ClientSender client1 = new ClientSender("");
List<object> newl = new List<object>();
newl.Add("dddd");
newl.Add("dddd");
newl.Add("dddd");
newl.Add("dddd");
newl.Add("dddd");
client1.AddNewEnvelope(newl);
client1.Run();
Console.ReadLine();
Console.ReadLine();
}
}
}
?
這就需要各個系統能夠按照統一的系統管理標準進行遠程管理並提供系統的安全狀態信息。
推薦閱讀:
※歷史上有沒有什麼大家一直看好最後黃了的事情?
※開發一個 Windows 級別的操作系統難度有多大?
※我自己編寫了一個電腦小程序,如何讓它聯網呢?
※國內智能家居發展現狀如何?
※通信業的明日之星——網路虛擬化