Commit 4e5817fa authored by 潘栩锋's avatar 潘栩锋 🚴

添加 7E协议支持批量包发送,不等回复已经发送

parent 59014699
......@@ -11,13 +11,24 @@ using System.Windows.Threading;
namespace GeneralGommunication
{
/// <summary>
/// 独立线程运行。 固需要做线程同步。
/// 支持Push, Call
/// 其中Call, 一“问”,必须一“答”。 长时间没答,重发“问”,连续3次都没 "答" 触发通讯断开。
/// 正常模式,多个"问" 会储存在缓存区, 等上一条 "答" ,才能发下一条"问"。
/// 打包模式, 多个"问" 没有先后关系,把它们打包, 不等 "答" 就同时发送"问"。 但 还是必须等 全部 "答" 都回复,才能执行一个动作。
/// </summary>
public abstract class Dev7E:IDev7E
{
protected Logger logger = NLog.LogManager.GetCurrentClassLogger();
public Logger logger = NLog.LogManager.GetCurrentClassLogger();
public event PropertyChangedEventHandler PropertyChanged;
#region 测量速度
#region 测量通讯速度
/// <summary>
/// 通讯速度 测量中
/// </summary>
public bool IsMeasuring { get; private set; }
/// <summary>
......@@ -26,11 +37,10 @@ namespace GeneralGommunication
public double CommSpeed { get; private set; }
/// <summary>
/// 传输速度 单位 pack/s
/// 通讯速度 单位 pack/s
/// </summary>
public double PackSpeed { get; private set; }
#endregion
/// <summary>
......@@ -64,12 +74,12 @@ namespace GeneralGommunication
/// <summary>
/// 指令队列,必须等上1条指令回复了,才能发下条指令
/// </summary>
List<COMMREQ_Transaction> Transactions;
List<COMMREQ_TransactionBase> Transactions;
/// <summary>
/// 当前正在等待回复的指令
/// </summary>
COMMREQ_Transaction currTran;
COMMREQ_TransactionBase currTran;
/// <summary>
/// currTran 发送后,开始计时
/// </summary>
......@@ -79,8 +89,6 @@ namespace GeneralGommunication
/// </summary>
int retryCnt = 0;
/// <summary>
/// 指令前序 序号
/// </summary>
......@@ -88,7 +96,7 @@ namespace GeneralGommunication
public Dev7E()
{
Transactions = new List<COMMREQ_Transaction>();
Transactions = new List<COMMREQ_TransactionBase>();
stopwatch_timeOut = new Stopwatch();
}
......@@ -225,24 +233,84 @@ namespace GeneralGommunication
return null;//已经发送了,计时器都启动了
}
//找出 COMMREQ
var commReq = currTran.commReq;
List<byte> pack = new List<byte>();
pack.AddRange(commReq.Prefix);
if (currTran.datas != null)
pack.AddRange(currTran.datas);
if (currTran.datasObj == null)
logger.Debug($"REQ {commReq.PrefixString}");
if (currTran is COMMREQ_Transaction)
{
var tran = currTran as COMMREQ_Transaction;
//找出 COMMREQ
var commReq = tran.commReq;
List<byte> pack = new List<byte>();
//添加前序
pack.AddRange(commReq.Prefix);
//添加数据
if (tran.datas != null)
pack.AddRange(tran.datas);
//调试: 打印发送信息
if (tran.datasObj == null)
logger.Debug($"REQ {commReq.PrefixString}");
else
logger.Debug($"REQ {commReq.PrefixString} {Newtonsoft.Json.JsonConvert.SerializeObject(tran.datasObj)}");
//byte[] prefix = COMMREQ.ToPrefix("RCD", 0);
//if (prefix.SequenceEqual(commReq.Prefix))
//{
//}
//转为 7E格式
var buf = GetSendPack(pack);
//开始计时
stopwatch_timeOut.Restart();
return buf.ToArray();
}
else
logger.Debug($"REQ {commReq.PrefixString} {Newtonsoft.Json.JsonConvert.SerializeObject(currTran.datasObj)}");
{
//集群交易
List<byte> bufs = new List<byte>();
var _tran = currTran as COMMREQ_TransactionMulti;
//获取还没接收到数据的交易,不能没有交易
var trans = _tran.transactions.FindAll(t => !t.hasRet);
if (trans.Count() == 0)
throw new Exception("没有 [没接收到数据的交易],程序写错");
//获取需要发送出去的数据
var buf = GetSendPack(pack).ToArray();
foreach (var tran in trans)
{
//找出 COMMREQ
var commReq = tran.commReq;
//开始计时
stopwatch_timeOut.Restart();
List<byte> pack = new List<byte>();
return buf;
//添加前序
pack.AddRange(commReq.Prefix);
//添加数据
if (tran.datas != null)
pack.AddRange(tran.datas);
//转为 7E格式
var buf = GetSendPack(pack);
bufs.AddRange(buf);
//调试: 打印发送信息
if (tran.datasObj == null)
logger.Debug($"REQ multi {commReq.PrefixString}");
else
logger.Debug($"REQ multi {commReq.PrefixString} {Newtonsoft.Json.JsonConvert.SerializeObject(tran.datasObj)}");
}
//开始计时
stopwatch_timeOut.Restart();
return bufs.ToArray();
}
}
/// <summary>
......@@ -296,6 +364,15 @@ namespace GeneralGommunication
}
protected abstract void ParsePackAfterCheckCRC8(byte[] buf);
string bytes2hex(byte[] pack)
{
StringBuilder sb = new StringBuilder();
for (int i = 0; i < pack.Count(); i++)
{
sb.Append($"{pack[i]:X2} ");
}
return sb.ToString();
}
/// <summary>
/// 功能包解析
/// </summary>
......@@ -308,30 +385,58 @@ namespace GeneralGommunication
return;
}
var commReq = currTran.commReq;
if (currTran is COMMREQ_Transaction)
{
ParseFuncPack_Single(pack);
}
else
{
ParseFuncPack_Multi(pack);
}
}
protected bool ParseFuncPack_Single(byte[] pack)
{
object retData = null;
var tran = currTran as COMMREQ_Transaction;
var commReq = tran.commReq;
//还有B0
if (commReq.ReponseTotalLen + PrefixIndex > pack.Count())
{
//失败,指令长度不对!!
logger.Error($"ACK expect:{commReq.PrefixString} len={commReq.ReponseTotalLen + 1}, but reponse len ={pack.Count()} reponse: {Newtonsoft.Json.JsonConvert.SerializeObject(pack)}");
logger.Error($"ACK expect:{commReq.PrefixString} len={commReq.ReponseTotalLen + 1}, but reponse len ={pack.Count()} reponse: {bytes2hex(pack)}");
ErrCnt++;
return;
return false;
}
if (!IsMatch(commReq, pack))
if (commReq.IsMatch != null)
{
//回复对不上请求
logger.Error($"ACK expect:{commReq.PrefixString} ,but reponse: {Newtonsoft.Json.JsonConvert.SerializeObject(pack)}");
return;
if (!commReq.IsMatch(commReq, pack, PrefixIndex, commReq.IsMatchContext, ref retData))
{
//回复对不上请求
logger.Error($"ACK expect:{commReq.PrefixString} len={commReq.ReponseTotalLen + 1}, but reponse len ={pack.Count()} reponse: {bytes2hex(pack)}");
ErrCnt++;
return false;
}
}
else
{
if (!IsMatch(commReq, pack))
{
//回复对不上请求
logger.Error($"ACK expect:{commReq.PrefixString} len={commReq.ReponseTotalLen + 1}, but reponse len ={pack.Count()} reponse: {bytes2hex(pack)}");
ErrCnt++;
return false;
}
//处理指令
//还有B0
object retData = null;
if (commReq.ParseFuncPack != null)//解析回复数据
retData = commReq.ParseFuncPack(pack, commReq.Prefix.Count() + PrefixIndex);
//处理指令
//还有B0
if (commReq.ParseFuncPack != null)//解析回复数据
retData = commReq.ParseFuncPack(pack, commReq.Prefix.Count() + PrefixIndex);
}
tran.hasRet = true;
tran.retData = retData;
if (retData == null)
logger.Debug($"ACK {commReq.PrefixString}");
......@@ -340,28 +445,154 @@ namespace GeneralGommunication
//有很多指令是没有回复数据的, 回调只是通知 指令已经执行了而已
//调用回调
if (currTran.asyncDelegate != null)
if (tran.asyncDelegate != null)
{
if (Dispatcher != null)//线程同步执行
{
Dispatcher.BeginInvoke(currTran.asyncDelegate, currTran.asyncContext, retData);
Dispatcher.BeginInvoke(tran.asyncDelegate, tran.asyncContext, retData);
}
else
{
currTran.asyncDelegate.Invoke(currTran.asyncContext, retData);
tran.asyncDelegate.Invoke(tran.asyncContext, retData);
}
}
//停止超时检测
stopwatch_timeOut.Stop();
//空出当前交易位置
currTran = null;
if (Transactions.Count() > 0)
{
//队列还有需要发送的指令
//队列还有需要发送的指令,通知外部获取指令发送
SendMsgEvent?.Invoke(this);
}
return true;
}
bool IsMatch2(COMMREQ commReq, byte[] pack, ref object retData)
{
//还有B0
if (commReq.ReponseTotalLen + PrefixIndex > pack.Count())
{
//失败,指令长度不对!!
//logger.Error($"ACK expect:{commReq.PrefixString} len={commReq.ReponseTotalLen + 1}, but reponse len ={pack.Count()} reponse: {bytes2hex(pack)}");
//ErrCnt++;
return false;
}
if (commReq.IsMatch != null)
{
//自定义IsMatch规则
if (!commReq.IsMatch(commReq, pack, PrefixIndex, commReq.IsMatchContext, ref retData))
{
//回复对不上请求
//logger.Error($"ACK expect:{commReq.PrefixString} ,but reponse: {bytes2hex(pack)}");
return false;
}
}
else
{
//通用IsMatch规则
if (!IsMatch(commReq, pack))
{
//回复对不上请求
//logger.Error($"ACK expect:{commReq.PrefixString} ,but reponse: {bytes2hex(pack)}");
return false;
}
//处理指令
//还有B0
if (commReq.ParseFuncPack != null)//解析回复数据
retData = commReq.ParseFuncPack(pack, commReq.Prefix.Count() + PrefixIndex);
}
return true;
}
protected bool ParseFuncPack_Multi(byte[] pack)
{
object retData = null;
var _tran = currTran as COMMREQ_TransactionMulti;
//寻找合适的commReq
var trans = _tran.transactions.FindAll(t => !t.hasRet);
COMMREQ commReq = null;
COMMREQ_Transaction tran = null;
foreach (var t in trans)
{
tran = t;
commReq = tran.commReq;
if (IsMatch2(commReq, pack, ref retData))//匹配正确,就是这个tran的回复
break;
commReq = null;
tran = null;
}
if (commReq == null) {
//不能解析
logger.Error($"ACK multi reponse len ={pack.Count()} reponse: {bytes2hex(pack)}");
ErrCnt++;
return false;
}
tran.hasRet = true;
tran.retData = retData;
//解析成功
if (retData == null)
logger.Debug($"ACK multi {commReq.PrefixString}");
else
logger.Debug($"ACK multi {commReq.PrefixString} {Newtonsoft.Json.JsonConvert.SerializeObject(retData)}");
//有很多指令是没有回复数据的, 回调只是通知 指令已经执行了而已
//调用回调
if (tran.asyncDelegate != null)
{
if (Dispatcher != null)//线程同步执行
{
Dispatcher.BeginInvoke(tran.asyncDelegate, tran.asyncContext, retData);
}
else
{
tran.asyncDelegate.Invoke(tran.asyncContext, retData);
}
}
if (_tran.transactions.All(t => t.hasRet))
{
//完成, 全部回复
if (_tran.asyncDelegate != null)
{
if (Dispatcher != null)//线程同步执行
{
Dispatcher.BeginInvoke(_tran.asyncDelegate, _tran.asyncContext, _tran);
}
else
{
tran.asyncDelegate.Invoke(_tran.asyncContext, _tran);
}
}
//停止超时检测
stopwatch_timeOut.Stop();
//空出当前交易位置
currTran = null;
if (Transactions.Count() > 0)
{
//队列还有需要发送的指令,通知外部获取指令发送
SendMsgEvent?.Invoke(this);
}
return true;
}
return false;
}
/// <summary>
/// 与指令前序比较,一定要完全一样
......@@ -405,8 +636,55 @@ namespace GeneralGommunication
}
protected void AddTran(COMMREQ_Transaction tran)
{
if (transactionMulti!=null)
{
transactionMulti.transactions.Add(tran);
}
else
{
//放入 交易队列
Transactions.Add(tran);
if (currTran == null)
{
//当前没有指令正在发送
SendMsgEvent?.Invoke(this);
}
}
}
COMMREQ_TransactionMulti transactionMulti;
void BuildMultiTransAsync()
{
if (transactionMulti!=null) {
throw new Exception("已经是 BuildMultiTrans, 不能再次执行, 必须调用 AddMultiTrans");
}
transactionMulti = new COMMREQ_TransactionMulti();
}
public void BuildMultiTrans(Action action)
{
BuildMultiTrans(action, null, null);
}
public void BuildMultiTrans(Action action, CallBackHandler asyncDelegate, object asyncContext)
{
BuildMultiTransAsync();
action();
AddMultiTransAsync(asyncDelegate, asyncContext);
}
void AddMultiTransAsync(CallBackHandler asyncDelegate, object asyncContext)
{
if (transactionMulti.transactions.Count() == 0) {
throw new Exception("MultiTrans 为空");
}
transactionMulti.asyncDelegate = asyncDelegate;
transactionMulti.asyncContext = asyncContext;
//放入 交易队列
Transactions.Add(tran);
Transactions.Add(transactionMulti);
transactionMulti = null;
if (currTran == null)
{
//当前没有指令正在发送
......
......@@ -190,7 +190,7 @@ namespace GeneralGommunication
if (!IsRunning)
return;
IsRunning = false;
IsConnected = false;
cancellation.Cancel();
sp.Close();
}
......
......@@ -48,9 +48,9 @@ namespace GeneralGommunication
public event PropertyChangedEventHandler PropertyChanged;
Socket sock;
CancellationTokenSource cancellation;
CancellationTokenSource cancellation_waitforSend;
CancellationTokenSource cts_readTask;
CancellationTokenSource cts_waitForSend;
CancellationTokenSource cts_sendTask;
public GComm_TcpClient()
{
......@@ -64,9 +64,9 @@ namespace GeneralGommunication
if (IsRunning)
return;
IsRunning = true;
cancellation = new CancellationTokenSource();
cts_readTask = new CancellationTokenSource();
Task.Factory.StartNew(OnTask, cancellation.Token);
Task.Factory.StartNew(OnTask, cts_readTask.Token);
}
/// <summary>
......@@ -77,8 +77,8 @@ namespace GeneralGommunication
if (!IsRunning)
return;
IsRunning = false;
cancellation.Cancel();
IsConnected = false;
cts_readTask.Cancel();
if (sock != null && sock.Connected)
{
sock.Close();
......@@ -88,7 +88,7 @@ namespace GeneralGommunication
void OnTask()
{
while (!cancellation.IsCancellationRequested)
while (!cts_readTask.IsCancellationRequested)
{
ConnectTask();
......@@ -96,17 +96,27 @@ namespace GeneralGommunication
{
sendBuf.Clear();
//启动发送task
Task.Factory.StartNew(SendTask, cancellation.Token);
cts_sendTask = new CancellationTokenSource();
var sendtask = Task.Factory.StartNew(SendTask, cts_sendTask.Token);
//进入接收task
ReceiveTask();
//退出了,肯定连接断开了
//需要等待 SendTask 也退出了
cts_sendTask.Cancel();
sendtask.Wait();
if (sock != null)
{
sock.Close();
}
}
//休息一会儿,在重连
try
{
Task.Delay(2000, cancellation.Token).Wait();
Task.Delay(2000, cts_readTask.Token).Wait();
}
catch (Exception e)
{
......@@ -126,14 +136,12 @@ namespace GeneralGommunication
sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sock.Blocking = true;
//1秒内,需要收到信息
sock.ReceiveTimeout = 1000;
//1秒内,必须发送完
sock.SendTimeout = 1000;
while (!cancellation.IsCancellationRequested)
while (!cts_readTask.IsCancellationRequested)
{
try
{
......@@ -150,7 +158,7 @@ namespace GeneralGommunication
}
try
{
Task.Delay(2000, cancellation.Token).Wait();
Task.Delay(2000, cts_readTask.Token).Wait();
}
catch (Exception e)
{
......@@ -166,7 +174,7 @@ namespace GeneralGommunication
void ReceiveTask()
{
byte[] buf = new byte[0x10000];
while (!cancellation.IsCancellationRequested)
while (!cts_readTask.IsCancellationRequested)
{
int len;
try
......@@ -196,10 +204,7 @@ namespace GeneralGommunication
DataReceived?.Invoke(this, buf.Take(len).ToArray());
}
if (sock != null)
{
sock.Close();
}
IsConnected = false;
}
......@@ -208,14 +213,24 @@ namespace GeneralGommunication
/// </summary>
void SendTask()
{
while (!cancellation.IsCancellationRequested)
while (!cts_sendTask.IsCancellationRequested)
{
while (sendBuf.Count > 0)
CancellationTokenSource cts3;
while (true)
{
byte[] buffer;
lock (sendBuf)
{
if (sendBuf.Count <= 0)
break;
buffer = sendBuf.ToArray();
}
int slen;
try
{
slen = sock.Send(sendBuf.ToArray());
slen = sock.Send(buffer);
}
catch (Exception e)
{
......@@ -223,26 +238,42 @@ namespace GeneralGommunication
IsConnected = false;
break;
}
if (slen > 0)
sendBuf.RemoveRange(0, slen);
else if (slen <= 0)
if (slen <= 0)
{
//连接断开了
IsConnected = false;
break;
}
else
{
lock (sendBuf)
{
sendBuf.RemoveRange(0, slen);
}
}
}
//重新等 下次被呼醒
cancellation_waitforSend = new CancellationTokenSource();
lock (sendBuf)
{
// 呼醒 发送task
if (cts_waitForSend == null)
{
cts_waitForSend = new CancellationTokenSource();
}
//cts_waitForSend 与 cts_sendTask 合体, 任意一个都会呼醒 delay
cts3 = CancellationTokenSource.CreateLinkedTokenSource(cts_waitForSend.Token, cts_sendTask.Token);
}
try
{
Task.Delay(-1, cancellation_waitforSend.Token).Wait();
Task.Delay(-1, cts3.Token).Wait();
}
catch (Exception e)
{
//被打断, 有数据需要发送
}
cts_waitForSend = null;
}
}
/// <summary>
......@@ -259,13 +290,19 @@ namespace GeneralGommunication
if (!IsConnected)
return;
lock (sendBuf)
{
//放入,发送缓存区
sendBuf.AddRange(buf);
//放入,发送缓存区
sendBuf.AddRange(buf);
//呼醒 发送task
cancellation_waitforSend.Cancel();
//呼醒 发送task
if (cts_waitForSend == null)
{
cts_waitForSend = new CancellationTokenSource();
}
cts_waitForSend.Cancel();
}
}
/// <summary>
......
......@@ -9,7 +9,10 @@ namespace GeneralGommunication
{
public interface IDev7E : INotifyPropertyChanged
{
#region 测量速度
#region 测量通讯速度
/// <summary>
/// 通讯速度 测量中
/// </summary>
bool IsMeasuring { get; }
/// <summary>
......@@ -18,7 +21,7 @@ namespace GeneralGommunication
double CommSpeed { get; }
/// <summary>
/// 传输速度 单位 pack/s
/// 通讯速度 单位 pack/s
/// </summary>
double PackSpeed { get; }
......@@ -100,26 +103,31 @@ namespace GeneralGommunication
/// </summary>
public class COMMREQ
{
private byte[] prefix;
/// <summary>
/// 指令 前缀
/// </summary>
public byte[] Prefix;
public string PrefixString
public byte[] Prefix
{
get { return prefix; }
set {
prefix = value;
updatePrefixString();
}
}
public string PrefixString { get; private set; }
void updatePrefixString()
{
get
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < Prefix.Count(); i++)
{
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < Prefix.Count(); i++)
{
if (Prefix[i] >= 33 || Prefix[i] <= 126)
stringBuilder.Append((char)Prefix[i]);
else
stringBuilder.Append($" 0x{Prefix[i]:X2} ");
}
return stringBuilder.ToString();
if (Prefix[i] >= 33 && Prefix[i] <= 126)
stringBuilder.Append((char)Prefix[i]);
else
stringBuilder.Append($" 0x{Prefix[i]:X2} ");
}
PrefixString = stringBuilder.ToString();
}
/// <summary>
/// 回复数据 长度, 不含前序
/// </summary>
......@@ -133,9 +141,9 @@ namespace GeneralGommunication
public delegate object ParseFuncPackHandler(byte[] pack, int dataIdx);
public ParseFuncPackHandler ParseFuncPack;
public delegate bool IsMatchHandler(COMMREQ commReq, byte[] pack, int dataIdx, object context, ref object retData);
public IsMatchHandler IsMatch;
public object IsMatchContext;
public static byte[] ToPrefix(params object[] commands)
{
List<byte> bytes = new List<byte>();
......@@ -176,13 +184,9 @@ namespace GeneralGommunication
}
public class COMMREQ_Transaction
{
/// <summary>
/// 请求
/// </summary>
public COMMREQ commReq;
public abstract class COMMREQ_TransactionBase
{
/// <summary>
/// 回复 callback
/// </summary>
......@@ -192,6 +196,25 @@ namespace GeneralGommunication
/// 上下文
/// </summary>
public object asyncContext;
}
/// <summary>
/// 交易集合包
/// </summary>
public class COMMREQ_TransactionMulti : COMMREQ_TransactionBase
{
/// <summary>
/// 交易集合
/// </summary>
public List<COMMREQ_Transaction> transactions = new List<COMMREQ_Transaction>();
}
public class COMMREQ_Transaction: COMMREQ_TransactionBase
{
/// <summary>
/// 请求
/// </summary>
public COMMREQ commReq;
/// <summary>
/// 数据
......@@ -202,5 +225,15 @@ namespace GeneralGommunication
/// 数据的object 表达,只用于调试而已
/// </summary>
public object datasObj;
/// <summary>
/// 数据已经回复
/// </summary>
public bool hasRet;
/// <summary>
/// 回复的数据,只用于调试而已
/// </summary>
public object retData;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment