package websocketservers import ( "encoding/json" "flx/Common" "fmt" "net" "os" "strings" "sync" "time" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" ) var messageList [][]byte //储存因为不在线未发送的信息 var IsMaster bool = true var MasterSendID []string = []string{"meetman", "electron"} // ClientManager is a websocketservers manager // 客户端管理 type ClientManager struct { MuLock sync.RWMutex //客户端 map 储存并管理所有的长连接client,在线的为true,不在的为false Clients map[*Client]bool //web端发送来的的message我们用broadcast来接收,并最后分发给所有的client Broadcast chan []byte //新创建的长连接client Register chan *Client //新注销的长连接client Unregister chan *Client InvokeEventHandler func(uid string, ctype string, content string) //外部链接事件 InvokeEventHandlers func() } // Client is a websocketservers client // 客户端 Client type Client struct { //用户id Id string //连接的socket Socket *websocket.Conn //发送的消息 Send chan []byte } var MClientID map[string]string var FileData []map[string]interface{} // Message is an object for websocketservers message which is mapped to json type // 会把Message格式化成json type Message struct { //消息struct ContentType string `json:"contenttype,omitempty"` //文本类型 Sender string `json:"sender,omitempty"` //发送者 Recipient string `json:"recipient,omitempty"` //接收者 Content interface{} `json:"datarecord,omitempty"` //内容 ContentTime string `json:"contenttime,omitempty"` //时间 } // Manager define a ws server manager // 创建客户端管理者 var Manager = ClientManager{ Broadcast: make(chan []byte), Register: make(chan *Client), Unregister: make(chan *Client), Clients: make(map[*Client]bool), } func (manager *ClientManager) ManagerInit(invokeEventHandler func(uid string, ctype string, content string)) { defer func() { if r := recover(); r != nil { log.Error("ManagerInit:", r) } }() manager.InvokeEventHandler = invokeEventHandler } func (manager *ClientManager) ManagerInits(invokeEventHandler func()) { defer func() { if r := recover(); r != nil { log.Error("ManagerInits:", r) } }() manager.InvokeEventHandlers = invokeEventHandler } // Start is to start a ws server func (manager *ClientManager) Start() { defer func() { if r := recover(); r != nil { log.Error("websocketserversStart:", r) } }() for { select { //如果有新的连接接入,就通过channel把连接传递给conn case conn := <-manager.Register: func() { //把客户端的连接设置为true manager.MuLock.Lock() defer func() { if err := recover(); err != nil { fmt.Println("manager.MuLock:", err) // 打印错误信息 } manager.MuLock.Unlock() }() manager.Clients[conn] = true }() //把返回连接成功的消息json格式化 // jsonMessage, _ := json.Marshal(&Message{ContentType: Common.WebSocketOpen, Content: "id:" + conn.Id + " has connected.", ContentTime: time.Now().Format("2006-01-02 15:04:05")}) tcpConn, ok := conn.Socket.UnderlyingConn().(*net.TCPConn) if ok { // log.Fatal("Failed to cast net.Conn to *net.TCPConn") tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(3 * time.Second) } //调用客户端的send方法,发送消息 // manager.Send(jsonMessage, conn) if manager.InvokeEventHandler != nil { manager.InvokeEventHandler(conn.Id, "Online", "") // nethelper.ServerSendSeatDataChange(conn.Id, "Online") // 测试无纸化在线离线 } //如果连接断开了 case conn := <-manager.Unregister: //判断连接的状态,如果是true,就关闭send,删除连接client的值 if _, ok := manager.Clients[conn]; ok { if manager.InvokeEventHandler != nil { manager.InvokeEventHandler(conn.Id, "Offline", "") // nethelper.ServerSendSeatDataChange(conn.Id, "Offline") // 测试无纸化在线离线 } close(conn.Send) func() { //把客户端的连接设置为true manager.MuLock.Lock() defer func() { if err := recover(); err != nil { fmt.Println("manager.MuLock:", err) // 打印错误信息 } manager.MuLock.Unlock() }() delete(manager.Clients, conn) }() // jsonMessage, _ := json.Marshal(&Message{ContentType: Common.WebSocketClose, Content: "id:" + conn.Id + " has disconnected.", ContentTime: time.Now().Format("2006-01-02 15:04:05")}) // manager.Send(jsonMessage, conn) } //广播 case message := <-manager.Broadcast: //遍历已经连接的客户端,把消息发送给他们 fmt.Println("33333333333-------") func() { //把客户端的连接设置为true manager.MuLock.Lock() defer func() { if err := recover(); err != nil { fmt.Println("manager.MuLock:", err) // 打印错误信息 } manager.MuLock.Unlock() }() for conn := range manager.Clients { select { case conn.Send <- message: default: close(conn.Send) delete(manager.Clients, conn) } } }() fmt.Println("333333333333333333333") } } } // Send is to send ws message to ws client // 定义客户端管理的send方法,屏蔽不需要发送的客户 func (manager *ClientManager) Send(message []byte, ignore *Client) { defer func() { if r := recover(); r != nil { log.Error("WebSocketserverSend:", r) // 可以在这里处理错误,比如设置 tempresultmap 之类的 } }() // fmt.Println("444444444----------") manager.MuLock.RLock() defer func() { if err := recover(); err != nil { fmt.Println("manager.MuLock:", err) // 打印错误信息 } manager.MuLock.RUnlock() }() //只遍历键时,使用下面的形式: for conn := range manager.Clients { //不给屏蔽的连接发送消息 if conn != ignore { conn.Send <- message } } // fmt.Println("4444444444444444") } // 定义客户端管理的send方法,屏蔽不需要发送的客户 func (manager *ClientManager) SendMessageByClientId(message []byte, uid string) { // fmt.Println("555555555555555-----------") manager.MuLock.RLock() defer func() { if err := recover(); err != nil { fmt.Println("manager.MuLock:", err) // 打印错误信息 log.Error("SendMessageByClientId:", err) } manager.MuLock.RUnlock() // fmt.Println("55555555555555555") }() //只遍历键时,使用下面的形式: for conn := range manager.Clients { //发给指定用户的连接发送消息 if conn.Id == uid { // // 打印消息发送前的日志,记录目标 IP 和 UID // fmt.Printf("Sending message to client with UID: %s, IP: %s\n", uid, "打印消息发送前的日志") // // 发送消息 // select { // case conn.Send <- message: // // 成功发送 // fmt.Println("Message sent successfully成功发送.") // default: // // 发送失败,管道阻塞 // fmt.Println("Failed to send message: Channel is blocked or client is not responsive发送失败,管道阻塞.") // } // // 发送后,确认已发送的消息 // fmt.Printf("发送后,确认已发送的消息Message sent to %s: %s\n", uid, string(message)) conn.Send <- message } } } // 定义客户端管理的send方法,屏蔽不需要发送的客户 func (manager *ClientManager) SendMessageByClientType(message []byte, uid string) { defer func() { if err := recover(); err != nil { fmt.Println("manager.MuLock:", err) // 打印错误信息 log.Error("SendMessageByClientType:", err) } manager.MuLock.RUnlock() // fmt.Println("55555555555555555") }() // fmt.Println("6666666666----------") manager.MuLock.RLock() //只遍历键时,使用下面的形式: for conn := range manager.Clients { //发给指定用户的连接发送消息 if strings.Contains(conn.Id, uid) { conn.Send <- message } } // fmt.Println("666666666666666666666") } // 定义客户端结构体的read方法 func (c *Client) Read() { defer func() { //触发关闭 Manager.Unregister <- c c.Socket.Close() if err := recover(); err != nil { log.WithFields(log.Fields{ "function": "Read", "error": err, }).Error("Read") log.Error("websocketserversRead:", err) // log.Printf("OpenDevice: %v", err) // 记录错误信息到日志 fmt.Println("Read:", err) // 打印错误信息 } }() for { // c.Socket.SetReadDeadline(time.Now().Add(10 * time.Second)) // 设置读取超时为10秒 //读取消息 _, message, err := c.Socket.ReadMessage() //如果有错误信息,就注销这个连接然后关闭 if err != nil { Manager.Unregister <- c c.Socket.Close() break } //如果没有错误信息就把信息放入broadcast // var jsonMessage []byte // if message != nil && string(message) == Common.WebSocketHeartBeat { // jsonMessage, _ = json.Marshal(&Message{ContentType: Common.WebSocketHeartBeat, Sender: c.Id, Content: string(message), ContentTime: time.Now().Format("2006-01-02 15:04:05")}) // } else { // jsonMessage, _ = json.Marshal(&Message{ContentType: Common.WebSocketBroadCast, Sender: c.Id, Content: string(message), ContentTime: time.Now().Format("2006-01-02 15:04:05")}) // } tempMap := make(map[string]interface{}) json.Unmarshal(message, &tempMap) if tempMap["ContentType"] == Common.WebSocketChat { jsonMessage, _ := json.Marshal(&Message{ContentType: Common.WebSocketChat, Sender: c.Id, Recipient: tempMap["Recipient"].(string), Content: tempMap["Content"].(string), ContentTime: time.Now().Format("2006-01-02 15:04:05")}) //Content 有要发送给谁 //发给李四 fmt.Println("77777777777----------") func() { //把客户端的连接设置为true Manager.MuLock.RLock() defer func() { if err := recover(); err != nil { fmt.Println("Manager.MuLock:", err) // 打印错误信息 } Manager.MuLock.RUnlock() }() for k, v := range Manager.Clients { if k.Id == tempMap["Recipient"].(string) { for true { if v == true { messageList = append(messageList, jsonMessage) for _, m := range messageList { k.Send <- m } break } } } } }() fmt.Println("777777777777777777777") } else if tempMap["ContentType"] == Common.WebSocketRefresh { onlineList := make([]string, 0) fmt.Println("88888888888888----------") func() { //把客户端的连接设置为true Manager.MuLock.RLock() defer func() { if err := recover(); err != nil { fmt.Println("Manager.MuLock:", err) // 打印错误信息 } Manager.MuLock.RUnlock() }() for k, v := range Manager.Clients { cli := k if v { onlineList = append(onlineList, cli.Id) } jsonMessage, _ := json.Marshal(&Message{ContentType: Common.WebSocketRefresh, Sender: c.Id, Content: onlineList, ContentTime: time.Now().Format("2006-01-02 15:04:05")}) c.Send <- jsonMessage } }() fmt.Println("888888888888888888888888") } else if tempMap["ContentType"] == Common.WebSocketCloseExe { //浏览器关闭时调用退出会议 // localcache.DisposeContent() // ServerSendElectronScreenClose() if Manager.InvokeEventHandlers != nil { Manager.InvokeEventHandlers() } time.Sleep(time.Second * 2) os.Exit(0) } else if tempMap["ContentType"] == Common.WebSocketCloseElectron { //浏览器关闭时调用退出会议 // localcache.DisposeContent() ServerSendElectronScreenClose() if Manager.InvokeEventHandlers != nil { Manager.InvokeEventHandlers() } } else if tempMap["ContentType"] == Common.WebSocketFileRate { FileData = append(FileData, tempMap) } // log.Println(string(jsonMessage)) //触发消息发送 //Manager.Broadcast <- jsonMessage } } func (c *Client) Write() { defer func() { c.Socket.Close() if err := recover(); err != nil { log.WithFields(log.Fields{ "function": "Write", "error": err, }).Error("Write") log.Error("websocketserversWrite:", err) // log.Printf("OpenDevice: %v", err) // 记录错误信息到日志 fmt.Println("Write:", err) // 打印错误信息 } }() for { select { //从send里读消息 case message, ok := <-c.Send: //如果没有消息 if !ok { c.Socket.WriteMessage(websocket.CloseMessage, []byte{}) fmt.Println("发送信息错误") return } //有消息就写入,发送给web端 // log.Println(string(message)) if IsMaster { c.Socket.WriteMessage(websocket.TextMessage, message) //fmt.Println("发送信息111:", MasterSendID, string(message)) } else { for i := 0; i < len(MasterSendID); i++ { if MasterSendID[i] == c.Id { c.Socket.WriteMessage(websocket.TextMessage, message) //fmt.Println("发送信息222:", MasterSendID, string(message)) break } } } // c.Socket.WriteMessage(websocket.TextMessage, message) } } }