Commit 73a08e4b authored by 潘栩锋's avatar 潘栩锋 🚴

1.修复 Dev7E 线程安全优化

2.修复 flyad2021B2,连接断开恢复优化
parent 8593d013
...@@ -43,9 +43,9 @@ namespace GeneralGommunication ...@@ -43,9 +43,9 @@ namespace GeneralGommunication
/// <summary> /// <summary>
/// 有数据需要发送 /// 有数据需要发送
/// </summary> /// </summary>
public event SendDataEventHander SendMsgEvent; public event SendDataEventHandler SendMsgEvent;
public event TimeOutEventHandler TimeOutEvent;
...@@ -125,6 +125,8 @@ namespace GeneralGommunication ...@@ -125,6 +125,8 @@ namespace GeneralGommunication
{ {
//已经重试了3次,放弃 //已经重试了3次,放弃
ResetMsg(); ResetMsg();
//连接断开
TimeOutEvent?.Invoke(this);
return; return;
} }
else else
...@@ -140,25 +142,30 @@ namespace GeneralGommunication ...@@ -140,25 +142,30 @@ namespace GeneralGommunication
//IsConnected = true; //IsConnected = true;
csm.IncRec(recBuf.Count()); csm.IncRec(recBuf.Count());
for (int i = 0; i < recBuf.Count(); i++) List<byte[]> packs = new List<byte[]>();
lock (currPack)
{ {
if (recBuf[i] == 0x7e) for (int i = 0; i < recBuf.Count(); i++)
{ {
//找到头了 if (recBuf[i] == 0x7e)
//结束之前的包
if (currPack.Count > 0)
{ {
var pack = currPack.ToArray(); //找到头了
ParsePack(pack); //结束之前的包
currPack.Clear(); if (currPack.Count > 0)
csm.IncPack(1); {
var pack = currPack.ToArray();
packs.Add(pack);
currPack.Clear();
csm.IncPack(1);
}
} }
currPack.Add(recBuf[i]);
} }
currPack.Add(recBuf[i]);
} }
for (int i = 0; i < packs.Count(); i++) {
//OnPoll_TimeOut(); ParsePack(packs[i]);
}
} }
......
...@@ -10,354 +10,6 @@ using System.Threading.Tasks; ...@@ -10,354 +10,6 @@ using System.Threading.Tasks;
namespace GeneralGommunication namespace GeneralGommunication
{ {
//public class GComm_TcpClient : IGeneralComm
//{
// public Logger logger = NLog.LogManager.GetCurrentClassLogger();
// /// <summary>
// /// IP 地址 and 端口号
// /// </summary>
// public string Addr { get; set; }
// IPEndPoint RemoteEp => Misc.StringConverter.ToIPEndPoint(Addr);
// /// <summary>
// /// 是否异常
// /// </summary>
// public bool IsError => !string.IsNullOrEmpty(ErrMsg);
// /// <summary>
// /// 异常信息
// /// </summary>
// public string ErrMsg { get; private set; }
// /// <summary>
// /// 运行中
// /// </summary>
// public bool IsRunning { get; private set; }
// /// <summary>
// /// 连接成功
// /// </summary>
// public bool IsConnected { get; private set; }
// /// <summary>
// /// 接收task调用
// /// </summary>
// public event IGeneralCommDataReceivedHandler DataReceived;
// public event PropertyChangedEventHandler PropertyChanged;
// Socket sock;
// CancellationTokenSource cts_readTask;
// CancellationTokenSource cts_waitForSend;
// CancellationTokenSource cts_sendTask;
// public GComm_TcpClient()
// {
// }
// /// <summary>
// /// 开始
// /// </summary>
// public void Start()
// {
// if (IsRunning)
// return;
// IsRunning = true;
// cts_readTask = new CancellationTokenSource();
// Task.Factory.StartNew(OnTask, cts_readTask.Token);
// }
// /// <summary>
// /// 接收
// /// </summary>
// public void Stop()
// {
// if (!IsRunning)
// return;
// IsRunning = false;
// IsConnected = false;
// cts_readTask.Cancel();
// if (sock != null && sock.Connected)
// {
// sock.Close();
// sock = null;
// }
// }
// void OnTask()
// {
// while (!cts_readTask.IsCancellationRequested)
// {
// ConnectTask();
// if (IsConnected)
// {
// sendBuf.Clear();
// //启动发送task
// cts_sendTask = new CancellationTokenSource();
// var sendtask = Task.Factory.StartNew(SendTask, cts_sendTask.Token);
// //进入接收task
// ReceiveTask();
// //退出了,肯定连接断开了
// //需要等待 SendTask 也退出了
// cts_sendTask.Cancel();
// sendtask.Wait();
// if (sock != null)
// {
// sock.Close();
// }
// }
// //休息一会儿,在重连
// try
// {
// Task.Delay(2000, cts_readTask.Token).Wait();
// }
// catch (Exception e)
// {
// //被打断了
// break;
// }
// }
// IsConnected = false;
// }
// /// <summary>
// /// 连接任务
// /// </summary>
// void ConnectTask()
// {
// sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// sock.Blocking = true;
// //1秒内,需要收到信息
// sock.ReceiveTimeout = 1000;
// //1秒内,必须发送完
// sock.SendTimeout = 1000;
// while (!cts_readTask.IsCancellationRequested)
// {
// try
// {
// sock.Connect(RemoteEp);
// IsConnected = true;
// ErrMsg = null;
// return;
// }
// catch (SocketException e)
// {
// //连接出错
// //等待重试
// ErrMsg = e.Message;
// }
// try
// {
// Task.Delay(2000, cts_readTask.Token).Wait();
// }
// catch (Exception e)
// {
// //被打断
// return;
// }
// }
// }
// /// <summary>
// /// 接收任务
// /// </summary>
// void ReceiveTask()
// {
// byte[] buf = new byte[sock.ReceiveBufferSize];
// while (!cts_readTask.IsCancellationRequested)
// {
// int len;
// try
// {
// len = sock.Receive(buf);
// if (len == 0)
// {
// continue;//没收到数据,继续等
// }
// }
// catch (SocketException e)
// {
// if (e.SocketErrorCode == SocketError.TimedOut)
// continue;//超时而已
// else if ((e.SocketErrorCode == SocketError.ConnectionAborted)
// || (e.SocketErrorCode == SocketError.ConnectionRefused)
// || (e.SocketErrorCode == SocketError.ConnectionReset))
// {
// IsConnected = false;
// return;
// }
// //Console.WriteLine($"ReceiveTask() {e}");
// //肯定断开连接了
// IsConnected = false;
// break;
// }
// logger.Debug($"ReceiveTask len={len}");
// DataReceived?.Invoke(this, buf.Take(len).ToArray());
// }
// IsConnected = false;
// }
// /// <summary>
// /// 发送任务
// /// </summary>
// void SendTask()
// {
// while (!cts_sendTask.IsCancellationRequested)
// {
// CancellationTokenSource cts3;
// while (true)
// {
// byte[] buffer;
// lock (sendBuf)
// {
// if (sendBuf.Count <= 0)
// break;
// buffer = sendBuf.ToArray();
// }
// int slen;
// try
// {
// slen = sock.Send(buffer);
// logger.Debug($"SendTask len={slen}");
// }
// catch (Exception e)
// {
// //连接断开了
// IsConnected = false;
// break;
// }
// if (slen <= 0)
// {
// //连接断开了
// IsConnected = false;
// break;
// }
// else
// {
// lock (sendBuf)
// {
// if (slen == sendBuf.Count())
// {
// sendBuf.Clear();
// break;
// }
// else
// {
// sendBuf.RemoveRange(0, slen);
// }
// }
// }
// }
// //重新等 下次被呼醒
// lock (sendBuf)
// {
// // 呼醒 发送task
// if (cts_waitForSend == null)
// {
// cts_waitForSend = new CancellationTokenSource();
// }
// //cts_waitForSend 与 cts_sendTask 合体, 任意一个都会呼醒 delay
// cts3 = CancellationTokenSource.CreateLinkedTokenSource(cts_waitForSend.Token, cts_sendTask.Token);
// }
// try
// {
// Task.Delay(-1, cts3.Token).Wait();
// }
// catch (Exception e)
// {
// //被打断, 有数据需要发送
// }
// cts_waitForSend = null;
// }
// }
// /// <summary>
// /// 无限大,发送缓存
// /// </summary>
// List<byte> sendBuf = new List<byte>();
// ///// <summary>
// ///// 发送数据
// ///// </summary>
// ///// <param name="buf"></param>
// //public void Write(IEnumerable<byte> buf)
// //{
// // if (!IsConnected)
// // return;
// // lock (sendBuf)
// // {
// // //放入,发送缓存区
// // sendBuf.AddRange(buf);
// // //呼醒 发送task
// // if (cts_waitForSend == null)
// // {
// // cts_waitForSend = new CancellationTokenSource();
// // }
// // cts_waitForSend.Cancel();
// // }
// //}
// /// <summary>
// /// 发送数据
// /// </summary>
// /// <param name="buf"></param>
// public void Write(IEnumerable<byte> buf)
// {
// if (!IsConnected)
// return;
// int slen;
// try
// {
// slen = sock.Send(buf.ToArray());
// logger.Debug($"SendTask len={slen}");
// }
// catch (Exception e)
// {
// //连接断开了
// IsConnected = false;
// return;
// }
// if (slen <= 0)
// {
// //连接断开了
// IsConnected = false;
// return;
// }
// }
// /// <summary>
// /// 清空输入缓存
// /// </summary>
// public void DiscardInBuffer()
// {
// //TODO
// }
//}
public class GComm_TcpClient : IGeneralComm public class GComm_TcpClient : IGeneralComm
{ {
public Logger logger;// = NLog.LogManager.GetCurrentClassLogger(); public Logger logger;// = NLog.LogManager.GetCurrentClassLogger();
...@@ -399,6 +51,11 @@ namespace GeneralGommunication ...@@ -399,6 +51,11 @@ namespace GeneralGommunication
Socket sock; Socket sock;
CancellationTokenSource cts_readTask; CancellationTokenSource cts_readTask;
/// <summary>
/// sock 的线程锁
/// </summary>
object sock_lock = new object();
Task task = null;
public GComm_TcpClient() public GComm_TcpClient()
{ {
...@@ -412,9 +69,21 @@ namespace GeneralGommunication ...@@ -412,9 +69,21 @@ namespace GeneralGommunication
if (IsRunning) if (IsRunning)
return; return;
IsRunning = true; IsRunning = true;
cts_readTask = new CancellationTokenSource();
Task.Factory.StartNew(OnTask, cts_readTask.Token); if (task == null || task.IsCompleted)
{
//线程已经完成, 创建个新的
cts_readTask = new CancellationTokenSource();
task = Task.Factory.StartNew(OnTask, cts_readTask.Token);
}
else
{
//线程还没结束,等待结束后,再创建新的
task.Wait();
cts_readTask = new CancellationTokenSource();
task = Task.Factory.StartNew(OnTask, cts_readTask.Token);
}
} }
/// <summary> /// <summary>
...@@ -427,13 +96,37 @@ namespace GeneralGommunication ...@@ -427,13 +96,37 @@ namespace GeneralGommunication
IsRunning = false; IsRunning = false;
IsConnected = false; IsConnected = false;
cts_readTask.Cancel(); cts_readTask.Cancel();
if (sock != null && sock.Connected) closeSock();
}
/// <summary>
/// 关闭sock,打断接收,发送中的等待,非OnTask调用
/// </summary>
void closeSock()
{
lock (sock_lock)
{ {
sock.Close(); if (sock != null && sock.Connected)
sock = null; {
sock.Close();
}
} }
Console.WriteLine($"{DateTime.Now} closeSock");
}
/// <summary>
/// 释放sock, 只能在OnTask中调用
/// </summary>
void disposeSock()
{
lock (sock_lock)//lock住,防止外部线程在 sock==null 时,调用sock.Close()
{
if (sock != null)
{
sock.Close();
sock = null;
}
}
Console.WriteLine($"{DateTime.Now} disposeSock");
} }
void OnTask() void OnTask()
{ {
while (!cts_readTask.IsCancellationRequested) while (!cts_readTask.IsCancellationRequested)
...@@ -446,14 +139,10 @@ namespace GeneralGommunication ...@@ -446,14 +139,10 @@ namespace GeneralGommunication
ReceiveTask(); ReceiveTask();
//退出了,肯定连接断开了 //退出了,肯定连接断开了
disposeSock();
if (sock != null)
{
sock.Close();
}
} }
//休息一会儿,重连 //休息一会儿,重连
try try
{ {
Task.Delay(2000, cts_readTask.Token).Wait(); Task.Delay(2000, cts_readTask.Token).Wait();
...@@ -464,7 +153,7 @@ namespace GeneralGommunication ...@@ -464,7 +153,7 @@ namespace GeneralGommunication
break; break;
} }
} }
disposeSock();
IsConnected = false; IsConnected = false;
} }
...@@ -473,12 +162,11 @@ namespace GeneralGommunication ...@@ -473,12 +162,11 @@ namespace GeneralGommunication
/// </summary> /// </summary>
void ConnectTask() void ConnectTask()
{ {
while (!cts_readTask.IsCancellationRequested) while (!cts_readTask.IsCancellationRequested)
{ {
try try
{ {
//sock 的 创建,销毁,只能在 这个线程操作
sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sock.Blocking = true; sock.Blocking = true;
...@@ -494,22 +182,26 @@ namespace GeneralGommunication ...@@ -494,22 +182,26 @@ namespace GeneralGommunication
if (sucess) if (sucess)
{ {
sock.EndConnect(result); sock.EndConnect(result);
Console.WriteLine($"{DateTime.Now} connect success");
IsConnected = true; IsConnected = true;
ErrMsg = null; ErrMsg = null;
return; return;
} }
else else
{ {
sock.Close(); disposeSock();
Console.WriteLine($"{DateTime.Now} connect timeout");
ErrMsg = "连接超时"; ErrMsg = "连接超时";
} }
} }
catch (Exception e) catch (Exception e)
{ {
disposeSock();
//连接出错 //连接出错
//等待重试 //等待重试
ErrMsg = e.Message; ErrMsg = e.Message;
} }
try try
{ {
Task.Delay(2000, cts_readTask.Token).Wait(); Task.Delay(2000, cts_readTask.Token).Wait();
...@@ -543,24 +235,26 @@ namespace GeneralGommunication ...@@ -543,24 +235,26 @@ namespace GeneralGommunication
{ {
if (e.SocketErrorCode == SocketError.TimedOut) if (e.SocketErrorCode == SocketError.TimedOut)
continue;//超时而已 continue;//超时而已
else if ((e.SocketErrorCode == SocketError.ConnectionAborted) //else if ((e.SocketErrorCode == SocketError.ConnectionAborted)
|| (e.SocketErrorCode == SocketError.ConnectionRefused) // || (e.SocketErrorCode == SocketError.ConnectionRefused)
|| (e.SocketErrorCode == SocketError.ConnectionReset)) // || (e.SocketErrorCode == SocketError.ConnectionReset))
{ //{
IsConnected = false; // IsConnected = false;
ErrMsg = e.Message; // ErrMsg = e.Message;
return; // return;
} //}
//Console.WriteLine($"ReceiveTask() {e}"); //Console.WriteLine($"ReceiveTask() {e}");
//肯定断开连接了 //肯定断开连接了
IsConnected = false; IsConnected = false;
ErrMsg = e.Message; ErrMsg = e.Message;
Console.WriteLine($"{DateTime.Now} ReceiveTask end for (Exception {e.SocketErrorCode})");
return; return;
} }
logger?.Debug($"tcp R len={len}"); logger?.Debug($"tcp R len={len}");
DataReceived?.Invoke(this, buf.Take(len).ToArray()); DataReceived?.Invoke(this, buf.Take(len).ToArray());
} }
Console.WriteLine($"{DateTime.Now} ReceiveTask end for (cts_readTask.IsCancellationRequested == true)");
IsConnected = false; IsConnected = false;
} }
......
...@@ -28,7 +28,7 @@ namespace GeneralGommunication ...@@ -28,7 +28,7 @@ namespace GeneralGommunication
/// <summary> /// <summary>
/// 有数据需要发送 /// 有数据需要发送
/// </summary> /// </summary>
event SendDataEventHander SendMsgEvent; event SendDataEventHandler SendMsgEvent;
#region 模块运行接口 #region 模块运行接口
......
...@@ -21,7 +21,7 @@ namespace GeneralGommunication ...@@ -21,7 +21,7 @@ namespace GeneralGommunication
/// <summary> /// <summary>
/// 有数据需要发送 /// 有数据需要发送
/// </summary> /// </summary>
event SendDataEventHander SendMsgEvent; event SendDataEventHandler SendMsgEvent;
/// <summary> /// <summary>
/// 设备连接状态改变 /// 设备连接状态改变
......
...@@ -37,7 +37,7 @@ namespace GeneralGommunication ...@@ -37,7 +37,7 @@ namespace GeneralGommunication
/// <summary> /// <summary>
/// 有数据需要发送 /// 有数据需要发送
/// </summary> /// </summary>
public event SendDataEventHander SendMsgEvent; public event SendDataEventHandler SendMsgEvent;
public event DeviceConnectEventHander DeviceConnectEvent; public event DeviceConnectEventHander DeviceConnectEvent;
......
...@@ -36,7 +36,7 @@ namespace GeneralGommunication ...@@ -36,7 +36,7 @@ namespace GeneralGommunication
/// <summary> /// <summary>
/// 有数据需要发送 /// 有数据需要发送
/// </summary> /// </summary>
public event SendDataEventHander SendMsgEvent; public event SendDataEventHandler SendMsgEvent;
/// <summary> /// <summary>
/// 对于全部 有返回的函数调用,都使用Dispatcher,使线程同步 /// 对于全部 有返回的函数调用,都使用Dispatcher,使线程同步
......
...@@ -101,5 +101,8 @@ namespace GeneralGommunication ...@@ -101,5 +101,8 @@ namespace GeneralGommunication
public string errMsg; public string errMsg;
} }
public delegate void SendDataEventHander(object sender, byte[] data); public delegate void SendDataEventHandler(object sender, byte[] data);
public delegate void TimeOutEventHandler(object sender);
} }
...@@ -10,6 +10,7 @@ using System; ...@@ -10,6 +10,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.ObjectModel; using System.Collections.ObjectModel;
using System.ComponentModel; using System.ComponentModel;
using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
...@@ -276,6 +277,10 @@ namespace FlyADBase ...@@ -276,6 +277,10 @@ namespace FlyADBase
IsReadyContext isReadyContext = new IsReadyContext(); IsReadyContext isReadyContext = new IsReadyContext();
SysTickContext sysTickContext = new SysTickContext(); SysTickContext sysTickContext = new SysTickContext();
CalSpeed calSpeed = new CalSpeed(); CalSpeed calSpeed = new CalSpeed();
/// <summary>
/// 检查推送周期 10ms 有一个。 1秒内都没收到,应该断开了, 重连
/// </summary>
Stopwatch stopwatch_pushIntervalCheck = new Stopwatch();
public FlyAd2021B2Core Core => core; public FlyAd2021B2Core Core => core;
...@@ -334,6 +339,9 @@ namespace FlyADBase ...@@ -334,6 +339,9 @@ namespace FlyADBase
core.PushDataEvent += (sender, e) => core.PushDataEvent += (sender, e) =>
{ {
//喂狗
stopwatch_pushIntervalCheck.Restart();
//这个线程非主线程, 数据接收完,应该快速返回。 //这个线程非主线程, 数据接收完,应该快速返回。
//事件触发,都放在主线程操作。 //事件触发,都放在主线程操作。
//要有线程锁!!!!! //要有线程锁!!!!!
...@@ -434,6 +442,34 @@ namespace FlyADBase ...@@ -434,6 +442,34 @@ namespace FlyADBase
} }
}, TimeSpan.FromSeconds(1)); }, TimeSpan.FromSeconds(1));
PollModule.Current.Poll_Config(onPoll_CheckPush);
}
/// <summary>
/// 推送检查 ,没推送,就是连接断开
/// </summary>
private void onPoll_CheckPush()
{
if (!IsConnected)
{
if (stopwatch_pushIntervalCheck.IsRunning)
stopwatch_pushIntervalCheck.Stop();
return;
}
if (!stopwatch_pushIntervalCheck.IsRunning)
{
stopwatch_pushIntervalCheck.Restart();
}
else
{
if (stopwatch_pushIntervalCheck.ElapsedMilliseconds > 1000)
{
//1000ms 都没收到推送,连接断开
ReConnect();
}
}
} }
void GetRunResult() void GetRunResult()
...@@ -608,6 +644,8 @@ namespace FlyADBase ...@@ -608,6 +644,8 @@ namespace FlyADBase
} }
void _core_PushDataEvent(PushDataEventArgs e) { void _core_PushDataEvent(PushDataEventArgs e) {
Now = sysTickContext.ToDateTime(e.SysTick); Now = sysTickContext.ToDateTime(e.SysTick);
if (e.ENC1 != null) if (e.ENC1 != null)
...@@ -649,16 +687,6 @@ namespace FlyADBase ...@@ -649,16 +687,6 @@ namespace FlyADBase
advPushData( advPushData(
Now, e.AD, e.AD2); Now, e.AD, e.AD2);
} }
private void Core_PushDataEvent(object sender, PushDataEventArgs _e)
{
//这个线程非主线程, 数据接收完,应该快速返回。
//事件触发,都放在主线程操作。
//要有线程锁!!!!!
lock (pushEventArgs) {
pushEventArgs.Add(_e);
}
}
/// <summary> /// <summary>
/// 连接后初始化 /// 连接后初始化
...@@ -826,12 +854,20 @@ namespace FlyADBase ...@@ -826,12 +854,20 @@ namespace FlyADBase
} }
comm.DataReceived += Comm_DataReceived; comm.DataReceived += Comm_DataReceived;
comm.PropertyChanged += Comm_PropertyChanged; comm.PropertyChanged += Comm_PropertyChanged;
core.TimeOutEvent += Core_TimeOutEvent;
core.ResetMsg(); core.ResetMsg();
} }
private void Core_TimeOutEvent(object sender)
{
//回复超时,可能连接断开, 重新连接
ReConnect();
}
void disposeComm() void disposeComm()
{ {
comm.DataReceived -= Comm_DataReceived; comm.DataReceived -= Comm_DataReceived;
comm.PropertyChanged -= Comm_PropertyChanged; comm.PropertyChanged -= Comm_PropertyChanged;
core.TimeOutEvent -= Core_TimeOutEvent;
core.ResetMsg(); core.ResetMsg();
} }
private void Comm_DataReceived(IGeneralComm sender, byte[] msg) private void Comm_DataReceived(IGeneralComm sender, byte[] msg)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment