You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

700 lines
18 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package FLXNetworkController
import (
"flx/Common"
FLXDevice "flx/Device"
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// FLXNetConnect is the package-level controller instance used for network
// communication management.
var FLXNetConnect FlxNetworkComController
// FlxNetworkComController holds the state used for network communication:
// server addresses, TCP sockets and listener, the reader/sender workers, the
// packet channel and the externally supplied device-event callback.
type FlxNetworkComController struct {
	mu sync.Mutex

	// ServerIP is the primary server IP.
	ServerIP string
	// ServerPort is the TCP port used when dialing the servers.
	ServerPort int
	// LocalIP is the local address, also used as the UDP broadcast source.
	LocalIP net.IP
	// BackUpServerIP is the backup server IP.
	BackUpServerIP string

	// Reader is the receive worker.
	Reader FlxNetworkCmdReader
	// ClietsTCP holds the receive workers of all connected clients.
	ClietsTCP []*FlxNetworkCmdTCPLisen
	// Sender is the send worker.
	Sender FLXNetworkCmdSender

	// NetSocket is the connection to the primary server.
	NetSocket *net.TCPConn
	// BackUpNetSocket is the connection to the backup server.
	BackUpNetSocket *net.TCPConn
	// TCPListerner points at the listener created by StartTCPListen.
	TCPListerner *net.Listener

	// TCPKeeping keeps the accept loop in StartTCPListen running while true.
	TCPKeeping bool
	// HandlerBack keeps the loop in TimerCallbackHandler running while true.
	HandlerBack bool
	// IsOpened reports whether the device/connection has been opened.
	IsOpened bool
	// IsStartAgenda reports whether the meeting has been started.
	IsStartAgenda bool

	// ComPackets is the channel carrying received packets.
	ComPackets chan Compacket
	// Event_SeatInfos is the collection of events handed to the callback.
	Event_SeatInfos []FLXDevice.SeatChangeEventArg
	// SpeakFirstCloseClient tells whether pressing the priority-speak key
	// closes the other speaking devices.
	SpeakFirstCloseClient bool
	// InvokeDeviceEventHandler is the external hook that receives events.
	InvokeDeviceEventHandler func(FLXDevice.EventArgSource, FLXDevice.EventArgType, []FLXDevice.SeatChangeEventArg)
}
// InitFlxNetControl fills in the local IP, the fixed port 9667 and the server
// addresses from the loaded configuration, creates the packet channel, wires
// up the sender and reader, and finally calls StartUDP. Any panic raised on
// the way is recovered and logged.
func (para *FlxNetworkComController) InitFlxNetControl() {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("InitFlxNetControl:", rec)
		}
	}()

	// The second result of ExternalIP is discarded.
	localIP, _ := Common.ExternalIP()
	para.LocalIP, para.ServerPort = localIP, 9667

	cfg := Common.LoadConfig()
	para.ServerIP = cfg.SeverData.ServerIP
	para.BackUpServerIP = cfg.SeverData.BackUpServerIP

	para.ComPackets = make(chan Compacket, 600)

	// Sender: note that NetSocket has not been dialed by this function.
	para.Sender.InitPara()
	para.Sender.InitCoon(para.NetSocket)
	para.Sender.ReConnectSocket = para.Sender_ReConnectSocket

	// Reader: received packets are forwarded into ComPackets.
	para.Reader.InitPara()
	para.Reader.InitCoon(para.NetSocket, para.BackUpNetSocket)
	para.Reader.InvokeOnReviceData = para.Reader_OnDataReciveEventHandler

	para.StartUDP()
}
// Reader_OnDataReciveEventHandler is the receive callback: it pushes the
// received packet into the ComPackets channel. The send blocks while the
// channel has no free slot. Panics are recovered and logged.
func (para *FlxNetworkComController) Reader_OnDataReciveEventHandler(comPacket Compacket) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("Reader_OnDataReciveEventHandler:", rec)
	}()

	para.ComPackets <- comPacket
}
// Sender_ReConnectSocket is the sender's reconnect callback. ind == 0
// reconnects the primary server socket, any other value reconnects the backup
// server socket.
//
// NOTE(review): the outcome of the reconnect attempt is ignored and true is
// always returned (false only after a recovered panic) - confirm the caller
// expects that.
func (para *FlxNetworkComController) Sender_ReConnectSocket(ind int) bool {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("Sender_ReConnectSocket:", rec)
		}
	}()

	fmt.Println("连接中断,重新连接。。。")
	switch ind {
	case 0:
		para.SocketConnected()
	default:
		para.SocketBackUpConnected()
	}
	return true
}
// StartUDP marks the controller as opened, enables the event loop flag,
// launches TimerCallbackHandler in its own goroutine, starts the reader and
// flags the sender as connected. It returns true unless a panic was recovered.
func (para *FlxNetworkComController) StartUDP() bool {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("StartUDP:", rec)
		}
	}()

	para.IsOpened = true

	// HandlerBack must be set before the goroutine starts, otherwise its loop
	// condition is false on entry.
	para.CreatEventHandleThread()
	go para.TimerCallbackHandler()

	para.Reader.Start()
	para.Sender.IsConnect = true

	return true
}
// StartTCPListen binds a TCP listener on port 9667 and accepts client
// connections while TCPKeeping is true. Every accepted connection gets its own
// FlxNetworkCmdTCPLisen worker, which is recorded in ClietsTCP and run in a
// separate goroutine; data received by a worker is forwarded through
// Reader_OnDataReciveEventHandler. The call blocks until the loop ends, and
// the listener is closed on return.
func (para *FlxNetworkComController) StartTCPListen() {
	defer func() {
		if r := recover(); r != nil {
			log.Error("StartTCPListen:", r)
		}
	}()

	// Bind the listening address.
	listener, err := net.Listen("tcp", ":9667")
	if err != nil {
		// Without a listener there is nothing to accept on or to close;
		// continuing would only panic on the nil listener.
		fmt.Println("listen err: ", err)
		return
	}
	defer listener.Close()

	fmt.Println("bind:start listening...")
	para.TCPListerner = &listener
	para.TCPKeeping = true

	for para.TCPKeeping {
		// Accept blocks until a new connection arrives or the listener is
		// interrupted.
		conn, err := listener.Accept()
		if err != nil {
			// Usually caused by the listener having been closed.
			fmt.Println("accept err: ", err)
			continue
		}

		// Configure the worker before publishing it in ClietsTCP.
		client := &FlxNetworkCmdTCPLisen{}
		client.InvokeOnReviceData = para.Reader_OnDataReciveEventHandler
		para.ClietsTCP = append(para.ClietsTCP, client)
		fmt.Println("客户端链接", conn.RemoteAddr().String())

		// Handle the connection in its own goroutine.
		go client.Handle(conn)
	}
}
// func (para *FlxNetworkComController) Handle(conn net.Conn) {
// defer conn.Close()
// var clientReader FlxNetworkCmdReader
// clientReader.InvokeOnReviceData = para.Reader_OnDataReciveEventHandler
// para.ClietsReader = append(para.ClietsReader, clientReader)
// for {
// var buf [65535]byte
// n, err := conn.Read(buf[:])
// if err != nil {
// fmt.Println("Read from tcp server failed,err:", err)
// break
// }
// data := string(buf[:n])
// fmt.Printf("Recived from client,data:%s\n", data)
// }
// }
// StartTCP opens the TCP connection(s) selected by Contype:
//
//	0 - primary server only
//	1 - backup server only
//	2 - primary and backup (the backup is attempted only if the primary
//	    connected)
//
// For these three values IsOpened is set to the outcome, which is also
// returned. Any other Contype returns false and leaves IsOpened untouched.
func (para *FlxNetworkComController) StartTCP(Contype int) bool {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("StartTCP:", rec)
		}
	}()

	var connected bool
	switch Contype {
	case 0:
		if connected = para.SocketConnected(); connected {
			para.Sender.CurrentSocket = para.NetSocket
			para.Reader.CurrentSocket = para.NetSocket
		}
	case 1:
		if connected = para.SocketBackUpConnected(); connected {
			para.Sender.BackUpCurrentSocket = para.BackUpNetSocket
			para.Reader.CurrentSocket = para.BackUpNetSocket
		}
	case 2:
		if connected = para.SocketConnected() && para.SocketBackUpConnected(); connected {
			para.Sender.CurrentSocket = para.NetSocket
			para.Sender.BackUpCurrentSocket = para.BackUpNetSocket
			para.Reader.CurrentSocket = para.NetSocket
		}
	default:
		return false
	}

	para.IsOpened = connected
	return connected
}
// Close shuts the controller down: it stops the event loop via
// KillEventHandleThread, clears IsOpened and stops the reader. It returns true
// unless a panic was recovered.
func (para *FlxNetworkComController) Close() bool {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("Close:", rec)
		}
	}()

	// KillEventHandleThread reads IsOpened, so it runs before the flag is
	// cleared.
	para.KillEventHandleThread()
	para.IsOpened = false

	para.Reader.Stop()
	return true
}
// TimerCallbackHandler is the event loop. While HandlerBack is true it takes
// one packet at a time from ComPackets, converts it with
// CopySeatChangeEventArg and passes it to InvokeDeviceEventHandler as a
// one-element slice. Packets whose IPAddress equals ServerIP or BackUpServerIP
// are reported as server-to-client events, all others as client-to-server
// events.
//
// The receive blocks, so a change of HandlerBack is only noticed after the
// next packet arrives. A panic (for example from the handler) is recovered
// and logged, and ends the loop.
func (para *FlxNetworkComController) TimerCallbackHandler() {
	defer func() {
		if r := recover(); r != nil {
			log.Error("TimerCallbackHandler:", r)
		}
	}()

	for para.HandlerBack {
		fmt.Println("循环执行中")

		// The backing array of Event_SeatInfos is reused for every event.
		para.Event_SeatInfos = para.Event_SeatInfos[0:0]
		comPacket := <-para.ComPackets
		seat := CopySeatChangeEventArg(comPacket)
		para.Event_SeatInfos = append(para.Event_SeatInfos, seat)

		// The handler is assigned from outside and may not be set yet.
		// Calling a nil func would panic and terminate this loop for good,
		// so such packets are dropped instead.
		handler := para.InvokeDeviceEventHandler
		if handler == nil {
			log.Warn("TimerCallbackHandler: InvokeDeviceEventHandler is nil, packet dropped")
			continue
		}

		// The local-IP filter of earlier versions is disabled (local testing).
		var source FLXDevice.EventArgSource = FLXDevice.EventArgSource_ClientDevice
		var kind FLXDevice.EventArgType = FLXDevice.EventArgType_ClientToServer
		if seat.IPAddress == para.ServerIP || seat.IPAddress == para.BackUpServerIP {
			source = FLXDevice.EventArgSource_SeverDevice
			kind = FLXDevice.EventArgType_ServerToClient
		}
		handler(source, kind, para.Event_SeatInfos)
	}
}
// CopySeatChangeEventArg converts a received packet into a
// FLXDevice.SeatChangeEventArg: Cmd -> CMD, Const -> Value_ExtraInt (as int),
// IPAddress -> IPAddress, Data -> Data, plus Data converted to a string in
// Value_ExtraStr. If a panic is recovered the zero value is returned.
func CopySeatChangeEventArg(comPacket Compacket) FLXDevice.SeatChangeEventArg {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("CopySeatChangeEventArg:", rec)
		}
	}()

	return FLXDevice.SeatChangeEventArg{
		CMD:            comPacket.Cmd,
		Value_ExtraInt: int(comPacket.Const),
		IPAddress:      comPacket.IPAddress,
		Value_ExtraStr: string(comPacket.Data),
		Data:           comPacket.Data,
	}
}
// SocketConnected (re)connects to the primary server at ServerIP:ServerPort.
// An existing NetSocket is closed first. On success the new connection is
// stored in NetSocket and handed to the sender and the reader, the
// opened/connected/send-to-server flags are set and the reader's TCP receive
// is started. It returns false when the address cannot be resolved or the
// dial fails.
func (para *FlxNetworkComController) SocketConnected() bool {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("SocketConnected:", rec)
		}
	}()

	if old := para.NetSocket; old != nil {
		fmt.Println("关闭原有tcp连接")
		old.Close()
	}

	target := para.ServerIP + ":" + strconv.Itoa(para.ServerPort)
	addr, resolveErr := net.ResolveTCPAddr("tcp4", target)
	if resolveErr != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", resolveErr.Error())
		return false
	}

	conn, dialErr := net.DialTCP("tcp", nil, addr)
	if dialErr != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", dialErr.Error())
		return false
	}

	// Fixed pause after the dial, kept from the original implementation.
	time.Sleep(3 * time.Second)

	// Debug output of the socket that is about to be replaced.
	if para.Sender.CurrentSocket != nil {
		fmt.Println("22222222222222222222222222222222222", para.Sender.CurrentSocket)
	}

	para.IsOpened = true
	para.Sender.IsConnect = true
	para.NetSocket = conn
	para.Sender.CurrentSocket = conn
	para.Reader.CurrentSocket = conn
	para.Sender.IsSendToServer = true

	// Short pause before the reader's TCP receive is started.
	time.Sleep(10 * time.Millisecond)
	para.Reader.StartTCP()

	return true
}
// SocketBackUpConnected (re)connects to the backup server at
// BackUpServerIP:ServerPort. A previous backup connection is closed first; the
// primary connection (NetSocket) is left alone so that both links can be used
// together (StartTCP with Contype 2). After dialing, the bytes "OK" are
// written to the new connection; a write failure is only reported. On success
// the connection is stored in BackUpNetSocket and handed to the sender, the
// opened/connected/send-to-backup flags are set and the reader's backup TCP
// receive is started. It returns false when the address cannot be resolved or
// the dial fails.
func (para *FlxNetworkComController) SocketBackUpConnected() bool {
	defer func() {
		if r := recover(); r != nil {
			log.Error("SocketBackUpConnected:", r)
		}
	}()

	// Close the old backup socket, not the primary one.
	if para.BackUpNetSocket != nil {
		fmt.Println("关闭原有tcp连接")
		para.BackUpNetSocket.Close()
	}

	serveradd := para.BackUpServerIP + ":" + strconv.Itoa(para.ServerPort)
	tcpAddr, err := net.ResolveTCPAddr("tcp4", serveradd)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
		return false
	}
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
		return false
	}

	// Report the write error itself (errR), not the already-nil dial error.
	if _, errR := conn.Write([]byte("OK")); errR != nil {
		fmt.Println(time.Now().Format("2006/1/2 15:04:05"), "TCP发送错误", errR)
	}

	// Debug output of the socket that is about to be replaced.
	if para.Sender.BackUpCurrentSocket != nil {
		fmt.Println(para.Sender.BackUpCurrentSocket)
	}

	para.IsOpened = true
	para.Sender.IsConnect = true
	para.BackUpNetSocket = conn
	para.Sender.BackUpCurrentSocket = conn
	para.Sender.IsSendToBackUpServer = true

	// Short pause before the reader's backup TCP receive is started.
	time.Sleep(10 * time.Millisecond)
	para.Reader.StartBackUpTCP()
	return true
}
// CreatEventHandleThread enables the event loop by setting HandlerBack to
// true; the loop goroutine itself is started by the caller. No lock is taken.
// It always returns true.
func (para *FlxNetworkComController) CreatEventHandleThread() bool {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("CreatEventHandleThread:", rec)
	}()

	para.HandlerBack = true
	return true
}
// KillEventHandleThread asks the event loop to stop by clearing HandlerBack
// and, when the controller is opened and a primary connection exists, closes
// that connection. It returns true unless a panic was recovered.
func (para *FlxNetworkComController) KillEventHandleThread() bool {
	defer func() {
		if r := recover(); r != nil {
			log.Error("KillEventHandleThread:", r)
		}
	}()

	para.HandlerBack = false

	// IsOpened can be true without a TCP connection (StartUDP sets it), so
	// NetSocket has to be checked before it is closed.
	if para.IsOpened && para.NetSocket != nil {
		para.NetSocket.Close()
	}
	return true
}
// TCPListernerClose closes all TCP connections: it clears TCPKeeping and, if
// a listener has been recorded, closes every client connection that is set,
// empties ClietsTCP and closes the listener. It returns true unless a panic
// was recovered.
func (para *FlxNetworkComController) TCPListernerClose() bool {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("TCPListernerClose:", rec)
		}
	}()

	para.TCPKeeping = false
	if para.TCPListerner == nil {
		return true
	}

	for _, client := range para.ClietsTCP {
		if client.TCPConn != nil {
			client.TCPConn.Close()
		}
	}
	para.ClietsTCP = para.ClietsTCP[:0]

	(*para.TCPListerner).Close()
	return true
}
// SendCongressIDByUDP broadcasts the "meeting start" command for the server.
// The frame is sent over UDP from LocalIP:9000 to 255.255.255.255:9668 and is
// repeated every 5 seconds until IsStartAgenda is set to false (see
// StopAgenda), so the call blocks for that long.
//
// Frame layout: head byte, FlxNetworkComPara_Meeting,
// FlxNetworkComPara_Meeting_Start, payload length as two bytes (high byte
// first, base 256 - matching the int(b[3])*256+int(b[4]) decoding used by
// AsClicent_DatagramReceived), then the payload (Ag_ID converted by UTF8GBK).
// The buffer is finished by insertSlice and FullCheckSum.
//
// NOTE(review): payloads longer than 65535 bytes cannot be represented in the
// two length bytes.
func (para *FlxNetworkComController) SendCongressIDByUDP(Ag_ID string) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("SendCongressIDByUDP:", r)
		}
	}()

	// Sender address: the local IP of this machine.
	laddr := net.UDPAddr{
		IP:   para.LocalIP,
		Port: 9000,
	}
	// Receiver address: the broadcast address.
	raddr := net.UDPAddr{
		IP:   net.IPv4(255, 255, 255, 255),
		Port: 9668,
	}
	conn, err := net.DialUDP("udp4", &laddr, &raddr)
	if err != nil {
		println(err.Error())
		return
	}
	// Original note: before opening a TCP connection, make sure the TCP
	// connection is in the closed state.
	defer conn.Close()

	// The second result of UTF8GBK is discarded.
	sendValue, _ := UTF8GBK([]byte(Ag_ID))
	payloadLen := len(sendValue)

	buff := make([]byte, payloadLen+7)
	buff[0] = FlxNetworkComPara_CMD_HEAD
	buff[1] = FlxNetworkComPara_Meeting
	buff[2] = FlxNetworkComPara_Meeting_Start
	// Big-endian 16-bit length. The previous /255 and %255 split disagreed
	// with the *256 decoding for every payload longer than 255 bytes.
	buff[3] = byte(payloadLen >> 8)
	buff[4] = byte(payloadLen)
	buff = insertSlice(5, sendValue, buff)
	buff = FullCheckSum(buff)

	para.IsStartAgenda = true
	for para.IsStartAgenda {
		if _, err := conn.Write(buff); err != nil {
			fmt.Println(err)
		}
		time.Sleep(time.Second * 5)
	}
}
// StopAgenda ends the meeting on the server side: clearing IsStartAgenda stops
// the broadcast loop of SendCongressIDByUDP, and TCPListernerClose makes sure
// the TCP connections are closed.
func (para *FlxNetworkComController) StopAgenda() {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("StopAgenda:", rec)
	}()

	para.IsStartAgenda = false
	para.TCPListernerClose()
}
// StartCheckInStart sends the "start check-in" command from the server by
// calling StartCheckInStart on every client worker in ClietsTCP.
func (para *FlxNetworkComController) StartCheckInStart() {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("StartCheckInStart:", rec)
		}
	}()

	for _, client := range para.ClietsTCP {
		client.StartCheckInStart()
	}
}
// ServerSendToApointClientCheckInStart sends the "start check-in" command only
// to the client(s) whose remote IP (the part of the remote address before the
// first ':') equals ClientIP. Workers without a connection are skipped.
func (para *FlxNetworkComController) ServerSendToApointClientCheckInStart(ClientIP string) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("ServerSendToApointClientCheckInStart:", r)
		}
	}()

	for i := 0; i < len(para.ClietsTCP); i++ {
		client := para.ClietsTCP[i]
		// A worker may have no connection (TCPListernerClose checks for this
		// too); dereferencing it would panic and abort the whole loop.
		if client.TCPConn == nil {
			continue
		}
		if strings.Split(client.TCPConn.RemoteAddr().String(), ":")[0] == ClientIP {
			client.StartCheckInStart()
		}
	}
}
// StartCheckInStop sends the "stop check-in" command from the server by
// calling StartCheckInStop on every client worker in ClietsTCP.
func (para *FlxNetworkComController) StartCheckInStop() {
	defer func() {
		if rec := recover(); rec != nil {
			log.Error("StartCheckInStop:", rec)
		}
	}()

	for _, client := range para.ClietsTCP {
		client.StartCheckInStop()
	}
}
// SendToWithOutClientPersonCheckIn sends the person check-in command for AP_ID
// from the server to every connected client EXCEPT those whose remote IP (the
// part of the remote address before the first ':') equals ClientIP. Workers
// without a connection are skipped.
func (para *FlxNetworkComController) SendToWithOutClientPersonCheckIn(AP_ID string, ClientIP string) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("SendToWithOutClientPersonCheckIn:", r)
		}
	}()

	for i := 0; i < len(para.ClietsTCP); i++ {
		client := para.ClietsTCP[i]
		// A worker may have no connection (TCPListernerClose checks for this
		// too); dereferencing it would panic and abort the whole loop.
		if client.TCPConn == nil {
			continue
		}
		if strings.Split(client.TCPConn.RemoteAddr().String(), ":")[0] != ClientIP {
			client.SendToClientPersonCheckIn(AP_ID)
		}
	}
}
// SendToClientPersonCheckIn sends the person check-in command for AP_ID from
// the server only to the client(s) whose remote IP (the part of the remote
// address before the first ':') equals ClientIP. Workers without a connection
// are skipped.
func (para *FlxNetworkComController) SendToClientPersonCheckIn(AP_ID string, ClientIP string) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("SendToClientPersonCheckIn:", r)
		}
	}()

	for i := 0; i < len(para.ClietsTCP); i++ {
		client := para.ClietsTCP[i]
		// A worker may have no connection (TCPListernerClose checks for this
		// too); dereferencing it would panic and abort the whole loop.
		if client.TCPConn == nil {
			continue
		}
		if strings.Split(client.TCPConn.RemoteAddr().String(), ":")[0] == ClientIP {
			client.SendToClientPersonCheckIn(AP_ID)
		}
	}
}
// SendToWithOutClientPersonCheckOut sends the person check-out (cancel
// check-in) command for AP_ID from the server to every connected client EXCEPT
// those whose remote IP (the part of the remote address before the first ':')
// equals ClientIP. Workers without a connection are skipped.
func (para *FlxNetworkComController) SendToWithOutClientPersonCheckOut(AP_ID string, ClientIP string) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("SendToWithOutClientPersonCheckOut:", r)
		}
	}()

	for i := 0; i < len(para.ClietsTCP); i++ {
		client := para.ClietsTCP[i]
		// A worker may have no connection (TCPListernerClose checks for this
		// too); dereferencing it would panic and abort the whole loop.
		if client.TCPConn == nil {
			continue
		}
		if strings.Split(client.TCPConn.RemoteAddr().String(), ":")[0] != ClientIP {
			client.SendToClientPersonCheckOut(AP_ID)
		}
	}
}
// SendToServerPersonCheckIn sends the check-in command for AP_ID from the
// client side by delegating to Sender.CkeckInSendRegister. Panics are
// recovered and logged.
func (para *FlxNetworkComController) SendToServerPersonCheckIn(AP_ID string) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("SendToServerPersonCheckIn:", rec)
	}()

	para.Sender.CkeckInSendRegister(AP_ID)
}
// SendToServerPersonCheckOut sends the check-out (cancel check-in) command for
// AP_ID from the client side by delegating to Sender.CkeckInSendUnRegister.
// Panics are recovered and logged.
func (para *FlxNetworkComController) SendToServerPersonCheckOut(AP_ID string) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("SendToServerPersonCheckOut:", rec)
	}()

	para.Sender.CkeckInSendUnRegister(AP_ID)
}
// SendToServerGetCongressStatus sends the "get meeting status" command from
// the client side by delegating to Sender.CkeckInSendGetCongressStatus.
// Panics are recovered and logged.
func (para *FlxNetworkComController) SendToServerGetCongressStatus() {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("SendToServerGetCongressStatus:", rec)
	}()

	para.Sender.CkeckInSendGetCongressStatus()
}
// SendToServerMeetingFeedback sends the client's acknowledgement of a received
// meeting notification: the textual form of LocalIP is passed to
// Sender.CkeckInSendMeetingFeedback. Panics are recovered and logged.
func (para *FlxNetworkComController) SendToServerMeetingFeedback() {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Error("SendToServerMeetingFeedback:", rec)
	}()

	localAddr := para.LocalIP.String()
	para.Sender.CkeckInSendMeetingFeedback(localAddr)
}
// AsClicent_DatagramReceived turns a received datagram into command packets
// and queues them on ComPackets.
//
// Datagrams that are empty or do not start with 0x7E are ignored. A frame's
// total length is int(b[3])*256 + int(b[4]) + 7. When the datagram is longer
// than its first frame it is treated as several concatenated frames: complete
// frames are split off and queued one by one, and a trailing incomplete
// remainder is discarded. Otherwise the whole datagram is queued (and printed)
// as a single packet.
//
// endPoint is passed to CreatCMDPack for every packet; SoNetID is not used.
// The queued packets are built from sub-slices of datagram, not copies.
func (para *FlxNetworkComController) AsClicent_DatagramReceived(endPoint string, SoNetID int64, datagram []byte) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("AsClicent_DatagramReceived:", r)
		}
	}()

	// Guard the index below: an empty datagram carries no frame.
	if len(datagram) == 0 || datagram[0] != 0x7E {
		return
	}

	// frameLen needs at least 5 bytes; callers check len(b) > 4 first.
	frameLen := func(b []byte) int {
		return int(b[3])*256 + int(b[4]) + 7
	}

	if len(datagram) > 4 && frameLen(datagram) < len(datagram) {
		// Several frames in one datagram: peel them off front to back.
		for len(datagram) > 4 && frameLen(datagram) <= len(datagram) {
			n := frameLen(datagram)
			para.ComPackets <- CreatCMDPack(endPoint, datagram[:n])
			datagram = datagram[n:]
		}
		return
	}

	comP := CreatCMDPack(endPoint, datagram)
	fmt.Println(comP)
	para.ComPackets <- comP
}