// Package FLXNetworkController contains the TCP client/server plumbing and the
// command senders used to exchange framed messages with the server.
package FLXNetworkController

import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"flx/Common"
	"flx/cite/holmes"
	"flx/cite/tao"
	"flx/localcache"

	log "github.com/sirupsen/logrus"
)

// frameOverhead is the number of non-payload bytes allocated for every command
// frame built in this file (head, command, sub-command, two length bytes and
// two trailing bytes that this file leaves zeroed).
const frameOverhead = 7

// FLXNetworkCmdSender sends command frames to the primary server and,
// optionally, to a backup server.
type FLXNetworkCmdSender struct {
	// CurrentSocket is the raw socket used for the primary server after a
	// successful TryConnect(0).
	CurrentSocket *net.TCPConn
	// BackUpCurrentSocket is the raw socket used for the backup server.
	BackUpCurrentSocket *net.TCPConn
	// IsConnect reports whether this sender considers the primary link up.
	IsConnect bool
	// IsBackUpConnect reports whether this sender considers the backup link up.
	IsBackUpConnect bool
	// IsSendToServer is not read anywhere in this file.
	IsSendToServer bool
	// IsSendToBackUpServer enables mirroring every frame to the backup server.
	IsSendToBackUpServer bool
	// ReConnectSocket re-establishes the socket with the given index
	// (0 = primary, 1 = backup, as used by SendToServer) and reports success.
	ReConnectSocket func(int) bool
}

// recoverAndLog logs a recovered panic prefixed with the name of the function
// it was deferred from. It must be deferred directly so that recover works.
func recoverAndLog(name string) {
	if r := recover(); r != nil {
		log.Error(name+":", r)
	}
}

// InitCoon starts the TCP client (see StartTCP) and stores socket as the
// primary raw socket.
func (s *FLXNetworkCmdSender) InitCoon(socket *net.TCPConn) {
	defer recoverAndLog("InitCoon")

	StartTCP()
	s.CurrentSocket = socket
}

// InitPara resets the sender to its initial, disconnected state.
func (s *FLXNetworkCmdSender) InitPara() {
	defer recoverAndLog("InitPara")

	s.IsConnect = false
}

// Message is the only message type exchanged over the tao connection; it
// carries the raw frame bytes unchanged.
type Message struct {
	Content []byte
}

// Serialize implements tao.Message by returning the raw bytes as-is.
func (m Message) Serialize() ([]byte, error) {
	return m.Content, nil
}

// MessageNumber implements tao.Message. The value must match the number the
// peer uses for this message type.
func (m Message) MessageNumber() int32 {
	return ChatMessage
}

// TCPServer wraps a tao server.
type TCPServer struct {
	*tao.Server
}

var (
	// ServerPort is the TCP port used both for listening and for dialing.
	ServerPort int = 9667
	// MPerIDNetID maps a person ID to the NetID of its connection.
	MPerIDNetID map[string]int64
	// MIPPerID maps a remote IP to a person ID. It is only read in this file.
	MIPPerID map[string]string
	// MIPNetID maps a remote IP to the NetID of its connection.
	MIPNetID map[string]int64
)

// remoteIP returns the part of a connection name before the first ':'.
func remoteIP(connName string) string {
	return strings.Split(connName, ":")[0]
}

// registerConn records netID under the remote IP of connName and, when that IP
// has a person ID in MIPPerID, under that person ID as well. It returns the IP.
//
// NOTE(review): these maps are written from connection callbacks without a
// lock; confirm callbacks cannot run concurrently.
func registerConn(connName string, netID int64) string {
	ip := remoteIP(connName)
	MIPNetID[ip] = netID
	if perID, ok := MIPPerID[ip]; ok {
		MPerIDNetID[perID] = netID
	}
	return ip
}

// NewTcpServer resets the IP/person lookup maps and builds a TCPServer whose
// callbacks register each incoming connection and log received messages.
func NewTcpServer() *TCPServer {
	defer recoverAndLog("NewTcpServer")

	MIPNetID = make(map[string]int64)
	MPerIDNetID = make(map[string]int64)

	onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool {
		holmes.Infoln("on connect")
		sc := conn.(*tao.ServerConn)
		fmt.Println(sc.Name())
		fmt.Println(sc.NetID())
		registerConn(sc.Name(), sc.NetID())
		return true
	})
	onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) {
		holmes.Infoln("on error")
	})
	onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) {
		holmes.Infoln("close client")
	})
	// Incoming messages are only logged here; they are not dispatched further.
	onMsgOption := tao.OnMessageOption(func(msg tao.Message, conn tao.WriteCloser) {
		fmt.Println("receive msg")
		fmt.Println(remoteIP(conn.(*tao.ServerConn).Name()))
	})

	return &TCPServer{
		Server: tao.NewServer(onConnectOption, onCloseOption, onErrorOption, onMsgOption),
	}
}

var (
	// CTcpServer is the server instance created by CTcpServerStart.
	CTcpServer *TCPServer
	// AsClicent is the current client connection to the server.
	AsClicent *tao.ClientConn
	// reconnectInterval is the pause after both dial attempts have failed.
	reconnectInterval = 5 * time.Second
	// IsConnect reports whether the client connection to the server is up.
	IsConnect bool = false
)

// flushUnsent resends the backlog in UnSendToServerData over cc. Messages that
// still fail to write stay queued instead of being dropped.
func flushUnsent(cc *tao.ClientConn) {
	SendLock.Lock()
	defer SendLock.Unlock()

	remaining := UnSendToServerData[:0]
	for _, msg := range UnSendToServerData {
		if err := cc.Write(msg); err != nil {
			log.Error("flushUnsent: resend failed, keeping message queued: ", err)
			remaining = append(remaining, msg)
		}
	}
	UnSendToServerData = remaining
}

// onClientConnect registers the new client connection, updates the line
// status according to which configured server it reached, and flushes the
// backlog of unsent messages.
func onClientConnect(conn tao.WriteCloser) bool {
	holmes.Infoln("on connect")
	cc := conn.(*tao.ClientConn)
	ip := registerConn(cc.Name(), cc.NetID())
	IsConnect = true

	if ip == Common.LoadConfig().SeverData.ServerIP {
		localcache.LineStatus = "在线:连接主机"
	}
	if ip == Common.LoadConfig().SeverData.BackUpServerIP {
		localcache.LineStatus = "在线:连接备机"
	}

	flushUnsent(cc)
	return true
}

// onClientError marks the link as offline.
func onClientError(conn tao.WriteCloser) {
	localcache.LineStatus = "离线"
	holmes.Infoln("on error")
	IsConnect = false
}

// onClientClose marks the link as offline and starts reconnecting in the
// background.
func onClientClose(conn tao.WriteCloser) {
	localcache.LineStatus = "离线"
	holmes.Infoln("连接关闭,尝试重连...")
	IsConnect = false
	go ConnectToServer()
}

// onClientMessage forwards a received message to FLXNetConnect.
func onClientMessage(msg tao.Message, conn tao.WriteCloser) {
	cc := conn.(*tao.ClientConn)
	ip := remoteIP(cc.Name())
	IsConnect = true
	content := msg.(Message).Content
	fmt.Println(content, ip)
	FLXNetConnect.AsClicent_DatagramReceived(ip, cc.NetID(), content)
}

// ConnectToServer dials the primary server and falls back to the backup
// server when that fails. If both fail it sleeps for reconnectInterval and
// tries again. Once a dial succeeds it creates and starts the client
// connection and returns; later reconnects are triggered by onClientClose.
func ConnectToServer() {
	defer recoverAndLog("ConnectToServer")

	port := strconv.Itoa(ServerPort)
	secondaryAddr := Common.LoadConfig().SeverData.BackUpServerIP + ":" + port

	onConnect := tao.OnConnectOption(onClientConnect)
	onError := tao.OnErrorOption(onClientError)
	onClose := tao.OnCloseOption(onClientClose)
	onMessage := tao.OnMessageOption(onClientMessage)

	for {
		// The primary address is re-read from the configuration on every attempt.
		c, err := net.Dial("tcp", Common.LoadConfig().SeverData.ServerIP+":"+port)
		if err != nil {
			holmes.Errorf("连接失败,%v 后重试... 错误: %v\n", reconnectInterval, err)
			// Primary failed: try the backup address before sleeping.
			c, err = net.Dial("tcp", secondaryAddr)
			if err != nil {
				holmes.Errorf("备选IP连接失败,%v 后重试主IP... 错误: %v\n", reconnectInterval, err)
				time.Sleep(reconnectInterval)
				continue
			}
		}

		AsClicent = tao.NewClientConn(0, c, onConnect, onError, onClose, onMessage)
		AsClicent.Start()
		return
	}
}

const (
	// ChatMessage is the message number of chat message.
	ChatMessage int32 = 1
)

// DeserializeMessage deserializes bytes into Message. It returns
// tao.ErrNilData when data is nil; otherwise the bytes are wrapped unchanged.
func DeserializeMessage(data []byte) (message tao.Message, err error) {
	defer recoverAndLog("DeserializeMessage")

	if data == nil {
		return nil, tao.ErrNilData
	}
	return Message{Content: data}, nil
}

// StartTCP resets the lookup maps, registers the message deserializer with
// tao and starts connecting to the server in the background.
//
// NOTE(review): this presumably must run only once per process; verify that
// tao.Register tolerates a repeated registration before calling it again.
func StartTCP() {
	defer recoverAndLog("StartTCP")

	MIPNetID = make(map[string]int64)
	MPerIDNetID = make(map[string]int64)
	tao.Register(ChatMessage, DeserializeMessage, nil)
	go ConnectToServer()
}

// CTcpServerStart listens on ServerPort and runs a new TCPServer on that
// listener.
func CTcpServerStart() {
	defer recoverAndLog("CTcpServerStart")

	l, err := net.Listen("tcp", ":"+strconv.Itoa(ServerPort))
	if err != nil {
		holmes.Fatalln("listen error", err)
	}
	CTcpServer = NewTcpServer()
	if err = CTcpServer.Start(l); err != nil {
		holmes.Fatalln("start error", err)
	}
}

var (
	// SendLock guards UnSendToServerData and writes through AsClicent.
	SendLock sync.RWMutex
	// UnSendToServerData holds messages that could not be delivered yet.
	UnSendToServerData = make([]Message, 0)
)

// sendOrQueue writes msg through the client connection when it is up and
// queues it in UnSendToServerData otherwise, or when the write fails.
// The lock is released with defer so a panic cannot leave it held.
func sendOrQueue(msg Message) {
	SendLock.Lock()
	defer SendLock.Unlock()

	if IsConnect && AsClicent != nil {
		if err := AsClicent.Write(msg); err == nil {
			return
		}
	}
	if !Common.Contain(UnSendToServerData, msg) {
		UnSendToServerData = append(UnSendToServerData, msg)
	}
	log.Error("SendToServer发送卡号失败:", msg)
}

// writeSocket writes data to a raw socket, logging instead of panicking when
// the socket is nil and logging any write error.
func writeSocket(target string, conn *net.TCPConn, data []byte) {
	if conn == nil {
		log.Error("SendToServer: ", target, " socket is nil, frame not sent")
		return
	}
	if _, err := conn.Write(data); err != nil {
		log.Error("SendToServer: write to ", target, " socket failed: ", err)
	}
}

// SendToServer sends one command frame.
//
// Frames that are nil or shorter than 5 bytes are rejected. When the sender
// is marked connected the frame goes through the shared client connection
// (and is queued for later if that fails); otherwise a reconnect of the
// primary socket is attempted and the frame is written to it directly.
// If IsSendToBackUpServer is set the frame is also written to the backup
// socket, reconnecting it first when needed.
func (s *FLXNetworkCmdSender) SendToServer(dataMsg []byte) {
	defer recoverAndLog("SendToServer")

	if dataMsg == nil {
		fmt.Printf("不能向远程端口发送空数据!")
		return
	}
	if len(dataMsg) < 5 {
		fmt.Printf("不能向远程端口发送不完整的数据5!")
		return
	}

	if s.IsConnect {
		sendOrQueue(Message{Content: dataMsg})
	} else {
		fmt.Printf("服务器没有连接!")
		if s.TryConnect(0) {
			writeSocket("primary", s.CurrentSocket, dataMsg)
		}
	}

	if !s.IsSendToBackUpServer {
		return
	}
	if !s.IsBackUpConnect {
		fmt.Printf("备份服务器没有连接!")
		if !s.TryConnect(1) {
			return
		}
	}
	writeSocket("backup", s.BackUpCurrentSocket, dataMsg)
}

// TryConnect asks ReConnectSocket to re-establish the socket with the given
// index (0 = primary, 1 = backup). On success it marks the matching link as
// connected, waits 150ms and returns true. It returns false when the
// reconnect fails or no ReConnectSocket callback is set.
func (s *FLXNetworkCmdSender) TryConnect(SocketIndex int) bool {
	defer recoverAndLog("TryConnect")

	if s.ReConnectSocket == nil || !s.ReConnectSocket(SocketIndex) {
		return false
	}
	// Flag the link that was actually reconnected; flagging the primary for a
	// backup reconnect would leave IsBackUpConnect false forever.
	if SocketIndex == 1 {
		s.IsBackUpConnect = true
	} else {
		s.IsConnect = true
	}
	time.Sleep(150 * time.Millisecond)
	return true
}

// newCommandFrame allocates a frame of len(payload)+frameOverhead bytes and
// fills in the head byte, command, sub-command and the two length bytes.
// The payload itself is not copied in; callers insert it with insertSlice.
//
// Lengths above 255 are split as quotient and remainder of a division by 255,
// not as high and low byte.
// NOTE(review): presumably the server decodes it the same way — confirm
// before changing the encoding.
func newCommandFrame(cmd, sub byte, payload []byte) []byte {
	frame := make([]byte, len(payload)+frameOverhead)
	frame[0] = FlxNetworkComPara_CMD_HEAD
	frame[1] = cmd
	frame[2] = sub
	if n := len(payload); n > 255 {
		frame[3] = byte(n / 255)
		frame[4] = byte(n % 255)
	} else {
		frame[4] = byte(n)
	}
	return frame
}

// CkeckInSendMeetingFeedback sends the check-in door's reply to the server's
// "meeting start" command, with IPAddress as the payload.
func (s *FLXNetworkCmdSender) CkeckInSendMeetingFeedback(IPAddress string) {
	defer recoverAndLog("CkeckInSendMeetingFeedback")

	payload := []byte(IPAddress)
	frame := newCommandFrame(FlxNetworkComPara_Meeting, FlxNetworkComPara_Meeting_Start_CheckInDoorReturn, payload)
	s.SendToServer(insertSlice(5, payload, frame))
}

// CkeckInSendGetCongressStatus asks the server for the current meeting status.
func (s *FLXNetworkCmdSender) CkeckInSendGetCongressStatus() {
	defer recoverAndLog("CkeckInSendGetCongressStatus")

	s.SendToServer(newCommandFrame(FlxNetworkComPara_Server_Congress_Status, FlxNetworkComPara_Client_ReqStatus, nil))
}

// CkeckInSendGetRegister sends the check-in door refresh request.
func (s *FLXNetworkCmdSender) CkeckInSendGetRegister() {
	defer recoverAndLog("CkeckInSendGetRegister")

	s.SendToServer(newCommandFrame(FlxNetworkComPara_Client_State, FlxNetworkComPara_CheckInDoor_Refrush, nil))
}

// sendRegisterRequest sends a register request with the given sub-command and
// the converted PerID as payload. Nothing is sent if the conversion fails.
func (s *FLXNetworkCmdSender) sendRegisterRequest(caller string, sub byte, PerID string) {
	payload, err := UTF8GBK([]byte(PerID))
	if err != nil {
		log.Error(caller+": converting PerID failed, frame not sent: ", err)
		return
	}
	frame := newCommandFrame(FlxNetworkComPara_Client_Request_Register, sub, payload)
	s.SendToServer(insertSlice(5, payload, frame))
}

// CkeckInSendRegister reports a check-in for PerID to the server.
func (s *FLXNetworkCmdSender) CkeckInSendRegister(PerID string) {
	defer recoverAndLog("CkeckInSendRegister")

	s.sendRegisterRequest("CkeckInSendRegister", FlxNetworkComPara_DoorCheckIn_Sub, PerID)
}

// CkeckInSendUnRegister reports a check-out for PerID to the server.
func (s *FLXNetworkCmdSender) CkeckInSendUnRegister(PerID string) {
	defer recoverAndLog("CkeckInSendUnRegister")

	s.sendRegisterRequest("CkeckInSendUnRegister", FlxNetworkComPara_DoorCheckOut_Sub, PerID)
}