qianxb 4 mesi fa
parent
commit
ab9b548fd2
22 ha cambiato i file con 847 aggiunte e 283 eliminazioni
  1. 5 0
      blade-service-api/blade-meter-api/pom.xml
  2. 1 1
      blade-service-api/blade-meter-api/src/main/java/org/springblade/meter/entity/TaskRepealMessage.java
  3. 5 4
      blade-service-api/blade-meter-api/src/main/java/org/springblade/meter/feign/MeterWebSocketClient.java
  4. 31 0
      blade-service-api/blade-websocket-api/pom.xml
  5. 15 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/entity/WebSocketClientInfo.java
  6. 18 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/feign/WebSocketClient.java
  7. 33 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/MsgVO.java
  8. 54 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/UserInfoVO.java
  9. 1 0
      blade-service-api/pom.xml
  10. 3 1
      blade-service/blade-meter/src/main/java/org/springblade/meter/controller/MessageBizController.java
  11. 0 51
      blade-service/blade-meter/src/main/java/org/springblade/meter/feign/MeterTaskClientImpl.java
  12. 19 0
      blade-service/blade-meter/src/main/java/org/springblade/meter/feign/MeterWebSocketClientImpl.java
  13. 5 5
      blade-service/blade-meter/src/main/java/org/springblade/meter/service/ITaskRepealMessageService.java
  14. 33 8
      blade-service/blade-meter/src/main/java/org/springblade/meter/service/impl/TaskRepealMessageServiceImpl.java
  15. 213 213
      blade-service/blade-rabbitmq-consumer/src/main/java/org/springblade/consumer/socket/WebSocketEndpoint.java
  16. 54 0
      blade-service/blade-websocket/pom.xml
  17. 20 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/WebsocketApplication.java
  18. 25 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/config/WebSocketConfig.java
  19. 39 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/controller/WebsocketController.java
  20. 22 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/feign/WebSocketClientImpl.java
  21. 250 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/service/WebSocketService.java
  22. 1 0
      blade-service/pom.xml

+ 5 - 0
blade-service-api/blade-meter-api/pom.xml

@@ -22,6 +22,11 @@
             <groupId>org.springblade</groupId>
             <artifactId>blade-starter-excel</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-websocket-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
     </dependencies>
     <packaging>jar</packaging>
 

+ 1 - 1
blade-service-api/blade-meter-api/src/main/java/org/springblade/meter/entity/TaskRepealMessage.java

@@ -9,7 +9,7 @@ import org.springblade.core.mp.base.BaseEntity;
 import java.time.LocalDate;
 
 /**
- * @Param
+ * @Param   任务废除信息
  * @Author wangwl
  * @Date 2024/6/25 16:46
  **/

+ 5 - 4
blade-service-api/blade-meter-api/src/main/java/org/springblade/meter/feign/MeterWebSocketClient.java

@@ -1,18 +1,19 @@
 package org.springblade.meter.feign;
 
 import org.springblade.common.constant.LauncherConstant;
+import org.springblade.websocket.vo.UserInfoVO;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.stereotype.Component;
 import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 
-import java.util.Map;
-
 @FeignClient(value = LauncherConstant.APPLICATION_METER_NAME)
 @Component
 public interface MeterWebSocketClient {
 
-    @GetMapping(value = "/meter/getWebsocketMsg")
-    Map<String, String> getWebsocketMsg(@RequestParam String projectId, @RequestParam String contractId, @RequestParam String userIdResult);
+    @PostMapping(value = "/meter/pushMsg")
+    void pushMsg(@RequestBody UserInfoVO vo);
 
 }

+ 31 - 0
blade-service-api/blade-websocket-api/pom.xml

@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.springblade</groupId>
+        <artifactId>blade-service-api</artifactId>
+        <version>2.9.1.RELEASE</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>blade-websocket-api</artifactId>
+    <name>${project.artifactId}</name>
+    <version>${bladex.project.version}</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-common</artifactId>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+</project>

+ 15 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/entity/WebSocketClientInfo.java

@@ -0,0 +1,15 @@
+package org.springblade.websocket.entity;
+
+import lombok.Data;
+
+import javax.websocket.Session;
+
+@Data
+public class WebSocketClientInfo {
+    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
+    private Session session;
+ 
+    //连接的uri
+    private String uri;
+
+}

+ 18 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/feign/WebSocketClient.java

@@ -0,0 +1,18 @@
+package org.springblade.websocket.feign;
+
+import org.springblade.common.constant.LauncherConstant;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/7/18 16:17
+ **/
+@FeignClient(value = LauncherConstant.APPLICATION_WEBSOCKET_NAME)
+public interface WebSocketClient {
+    @PostMapping(value = "/socket/sendMsg")
+    void sendMsg(@RequestBody UserInfoVO vo);
+}

+ 33 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/MsgVO.java

@@ -0,0 +1,33 @@
+package org.springblade.websocket.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @Param   推送到前端的消息格式
+ * @Author wangwl
+ * @Date 2024/7/17 14:08
+ **/
+@Data
+public class MsgVO<T> implements Serializable {
+
+    /**
+     * 每个系统独立拥有消息类型
+     *  计量:msgRemind(消息提醒)
+     *  档案:
+     */
+    @ApiModelProperty("消息类型")
+    private String type;
+    @ApiModelProperty("承载数据")
+    private T data;
+
+    public MsgVO() {
+    }
+
+    public MsgVO(String type, T data) {
+        this.type = type;
+        this.data = data;
+    }
+}

+ 54 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/UserInfoVO.java

@@ -0,0 +1,54 @@
+package org.springblade.websocket.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * @Param   封装用户的系统-项目-合同-id信息
+ * @Author wangwl
+ * @Date 2024/7/18 16:26
+ **/
+@Data
+public class UserInfoVO implements Serializable {
+
+    @ApiModelProperty("system")
+    private String system;
+
+    @ApiModelProperty("projectId")
+    private Long projectId;
+
+    @ApiModelProperty("contractId")
+    private Long contractId;
+
+    @ApiModelProperty("userId")
+    private Long userId;
+
+    @ApiModelProperty("msg")
+    private String msg;
+
+    public UserInfoVO() {
+    }
+
+    public UserInfoVO(String system, Long projectId, Long contractId, Long userId) {
+        this.system = system;
+        this.projectId = projectId;
+        this.contractId = contractId;
+        this.userId = userId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        UserInfoVO that = (UserInfoVO) o;
+        return Objects.equals(system, that.system) && Objects.equals(projectId, that.projectId) && Objects.equals(contractId, that.contractId) && Objects.equals(userId, that.userId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(system, projectId, contractId, userId);
+    }
+}

+ 1 - 0
blade-service-api/pom.xml

@@ -29,6 +29,7 @@
         <module>blade-land-api</module>
         <module>blade-meter-api</module>
         <module>blade-rabbitmq-producer-api</module>
+        <module>blade-websocket-api</module>
     </modules>
 
     <dependencies>

+ 3 - 1
blade-service/blade-meter/src/main/java/org/springblade/meter/controller/MessageBizController.java

@@ -6,6 +6,7 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.AllArgsConstructor;
 import org.springblade.core.boot.ctrl.BladeController;
+import org.springblade.core.secure.utils.AuthUtil;
 import org.springblade.core.tool.api.R;
 import org.springblade.meter.dto.IdsDTO;
 import org.springblade.meter.dto.TaskRepealMessageDTO;
@@ -31,7 +32,8 @@ public class MessageBizController extends BladeController {
     @ApiOperationSupport(order = 1)
     @ApiOperation(value = "前端轮询获取未读消息数量", notes = "返回数字,0为没有未读消息")
     public R<Integer> getUnreadMessage(Long contractId) {
-        Integer total = taskRepealMessageService.getUnreadMessage(contractId);
+        Long userId = AuthUtil.getUserId();
+        Integer total = taskRepealMessageService.getUnreadMessage(contractId,userId);
         return R.data(total);
     }
 

+ 0 - 51
blade-service/blade-meter/src/main/java/org/springblade/meter/feign/MeterTaskClientImpl.java

@@ -1,51 +0,0 @@
-package org.springblade.meter.feign;
-
-import lombok.AllArgsConstructor;
-import org.apache.commons.lang.StringUtils;
-import org.springblade.business.entity.MessageWarning;
-import org.springblade.business.entity.Task;
-import org.springblade.business.entity.TaskParallel;
-import org.springframework.jdbc.core.BeanPropertyRowMapper;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-
-@AllArgsConstructor
-public class MeterTaskClientImpl implements MeterWebSocketClient {
-
-    private final JdbcTemplate jdbcTemplate;
-
-    @Override
-    public Map<String, String> getWebsocketMsg(String projectId, String contractId, String userIdResult) {
-        List<Task> tasks = jdbcTemplate.query("SELECT process_instance_id FROM u_task WHERE project_id = " + projectId + " AND contract_id = " + contractId + " AND status = 1 AND approval_type = 5 AND is_deleted = 0", new BeanPropertyRowMapper<>(Task.class));
-        List<String> taskCountIds = tasks.stream().map(Task::getProcessInstanceId).collect(Collectors.toList());
-        long taskAllCount = 0;
-        if (taskCountIds.size() > 0) {
-            List<TaskParallel> query = jdbcTemplate.query("SELECT id FROM u_task_parallel WHERE process_instance_id IN (" + StringUtils.join(taskCountIds, ",") + ") AND task_user = " + userIdResult + " AND status = 1", new BeanPropertyRowMapper<>(TaskParallel.class));
-            taskAllCount = query.size();
-        }
-        List<MessageWarning> messageWarnings = jdbcTemplate.query("SELECT type FROM u_message_warning WHERE project_id = " + projectId + " AND contract_id = " + contractId + " AND push_user = " + userIdResult + " AND is_read = 0 AND is_deleted = 0", new BeanPropertyRowMapper<>(MessageWarning.class));
-        long messageCount1 = messageWarnings.stream().filter(f -> f.getType().equals(1)).count();
-        long messageCount2 = messageWarnings.stream().filter(f -> f.getType().equals(2)).count();
-        long messageCount3 = messageWarnings.stream().filter(f -> f.getType().equals(3)).count();
-        long messageCount4 = messageWarnings.stream().filter(f -> f.getType().equals(4)).count();
-        long messageCount5 = messageWarnings.stream().filter(f -> f.getType().equals(5)).count();
-        Map<String, String> map = new HashMap<>();
-        map.put("allCount", String.valueOf(taskAllCount + messageWarnings.size()));
-        map.put("taskCount", String.valueOf(taskAllCount));
-        map.put("messageCount", String.valueOf(messageWarnings.size()));
-        map.put("messageCount_1", String.valueOf(messageCount1));
-        map.put("messageCount_2", String.valueOf(messageCount2));
-        map.put("messageCount_3", String.valueOf(messageCount3));
-        map.put("messageCount_4", String.valueOf(messageCount4));
-        map.put("messageCount_5", String.valueOf(messageCount5));
-        map.put("userId", userIdResult);
-        return map;
-    }
-
-
-}

+ 19 - 0
blade-service/blade-meter/src/main/java/org/springblade/meter/feign/MeterWebSocketClientImpl.java

@@ -0,0 +1,19 @@
+package org.springblade.meter.feign;
+
+import lombok.AllArgsConstructor;
+import org.springblade.meter.service.ITaskRepealMessageService;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.web.bind.annotation.RestController;
+
+
+@RestController
+@AllArgsConstructor
+public class MeterWebSocketClientImpl implements MeterWebSocketClient {
+
+    private final ITaskRepealMessageService repealMessageService;
+    @Override
+    public void pushMsg(UserInfoVO vo) {
+        //推送废除通知
+        repealMessageService.pushUnreadMessage(vo);
+    }
+}

+ 5 - 5
blade-service/blade-meter/src/main/java/org/springblade/meter/service/ITaskRepealMessageService.java

@@ -20,25 +20,25 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import org.springblade.core.mp.base.BaseService;
 import org.springblade.meter.dto.IdsDTO;
 import org.springblade.meter.dto.TaskRepealMessageDTO;
-import org.springblade.meter.entity.AttachmentForm;
 import org.springblade.meter.entity.TaskRepealMessage;
 import org.springblade.meter.vo.TaskRepealMessageVO;
-
-import java.util.Set;
+import org.springblade.websocket.vo.UserInfoVO;
 
 /**
- * 附件表 服务类
+ * 任务废除信息 服务类
  *
  * @author BladeX
  * @since 2023-11-29
  */
 public interface ITaskRepealMessageService extends BaseService<TaskRepealMessage> {
 
-    Integer getUnreadMessage(Long contractId);
+    Integer getUnreadMessage(Long contractId,Long userId);
 
     IPage<TaskRepealMessageVO> repealPage(TaskRepealMessageDTO dto);
 
     void batchRead(IdsDTO dto);
 
     void batchDelete(IdsDTO dto);
+
+    void pushUnreadMessage(UserInfoVO vo);
 }

+ 33 - 8
blade-service/blade-meter/src/main/java/org/springblade/meter/service/impl/TaskRepealMessageServiceImpl.java

@@ -19,39 +19,43 @@ package org.springblade.meter.service.impl;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.AllArgsConstructor;
 import org.apache.commons.lang.StringUtils;
 import org.springblade.core.log.exception.ServiceException;
 import org.springblade.core.mp.base.BaseServiceImpl;
 import org.springblade.core.secure.utils.AuthUtil;
+import org.springblade.core.tool.api.R;
+import org.springblade.core.tool.jackson.JsonUtil;
 import org.springblade.core.tool.utils.Func;
 import org.springblade.meter.dto.IdsDTO;
 import org.springblade.meter.dto.TaskRepealMessageDTO;
-import org.springblade.meter.entity.AttachmentForm;
 import org.springblade.meter.entity.TaskRepealMessage;
-import org.springblade.meter.mapper.AttachmentFormMapper;
 import org.springblade.meter.mapper.TaskRepealMessageMapper;
-import org.springblade.meter.service.IAttachmentFormService;
 import org.springblade.meter.service.ITaskRepealMessageService;
-import org.springblade.meter.vo.MeterPeriodVO;
 import org.springblade.meter.vo.TaskRepealMessageVO;
+import org.springblade.websocket.feign.WebSocketClient;
+import org.springblade.websocket.vo.MsgVO;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
-import java.util.Set;
 
 /**
- * 附件表 服务实现类
+ * 任务废除信息 服务实现类
  *
  * @author BladeX
  * @since 2023-11-29
  */
 @Service
+@AllArgsConstructor
 public class TaskRepealMessageServiceImpl extends BaseServiceImpl<TaskRepealMessageMapper, TaskRepealMessage> implements ITaskRepealMessageService {
 
+    private final WebSocketClient webSocketClient;
+
 
     @Override
-    public Integer getUnreadMessage(Long contractId) {
-        Long userId = AuthUtil.getUserId();
+    public Integer getUnreadMessage(Long contractId,Long userId) {
         long count = this.count(new LambdaQueryWrapper<TaskRepealMessage>()
                 .eq(TaskRepealMessage::getContractId, contractId)
                 .eq(TaskRepealMessage::getUserId, userId)
@@ -94,4 +98,25 @@ public class TaskRepealMessageServiceImpl extends BaseServiceImpl<TaskRepealMess
         }
         this.removeBatchByIds(list);
     }
+
+    @Override
+    @Async
+    public void pushUnreadMessage(UserInfoVO vo) {
+        try {
+            Thread.sleep(10L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        long count = this.count(new LambdaQueryWrapper<TaskRepealMessage>()
+                .eq(TaskRepealMessage::getContractId, vo.getContractId())
+                .eq(TaskRepealMessage::getUserId, vo.getUserId())
+                //1为未读,2为已读
+                .eq(TaskRepealMessage::getStatus, 1));
+        MsgVO<Integer> msgVO = new MsgVO<>();
+        msgVO.setType("msgRemind");
+        msgVO.setData(Math.toIntExact(count));
+        String msg = JsonUtil.toJson(R.data(msgVO));
+        vo.setMsg(msg);
+        webSocketClient.sendMsg(vo);
+    }
 }

+ 213 - 213
blade-service/blade-rabbitmq-consumer/src/main/java/org/springblade/consumer/socket/WebSocketEndpoint.java

@@ -1,213 +1,213 @@
-package org.springblade.consumer.socket;
-
-import cn.hutool.core.util.ObjectUtil;
-import cn.hutool.core.util.StrUtil;
-import com.alibaba.fastjson.JSON;
-import org.apache.commons.lang.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.springblade.business.feign.BusinessWebSocketClient;
-import org.springblade.common.constant.ClientIdConstant;
-import org.springblade.feign.ArchiveWebSocketClient;
-import org.springblade.manager.feign.ManagerWebSocketClient;
-import org.springblade.meter.feign.MeterWebSocketClient;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.CrossOrigin;
-
-import javax.websocket.*;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-@Component
-@CrossOrigin
-@ServerEndpoint(value = "/websocket/{userId}")
-public class WebSocketEndpoint implements ApplicationContextAware {
-
-    private static final ConcurrentHashMap<String, WebSocketEndpoint> webSocketMap = new ConcurrentHashMap<>();
-    private static final ConcurrentHashMap<String, String> webSocketMsgMap = new ConcurrentHashMap<>();
-    private static final Logger logger = LogManager.getLogger(WebSocketEndpoint.class);
-    private static int onlineCount = 0;
-    private Session session;
-    private String userId;
-
-    private static ArchiveWebSocketClient archiveWebSocketClient;
-    private static BusinessWebSocketClient businessWebSocketClient;
-    private static ManagerWebSocketClient managerWebSocketClient;
-    private static MeterWebSocketClient meterWebSocketClient;
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        WebSocketEndpoint.archiveWebSocketClient = applicationContext.getBean(ArchiveWebSocketClient.class);
-        WebSocketEndpoint.businessWebSocketClient = applicationContext.getBean(BusinessWebSocketClient.class);
-        WebSocketEndpoint.managerWebSocketClient = applicationContext.getBean(ManagerWebSocketClient.class);
-        WebSocketEndpoint.meterWebSocketClient = applicationContext.getBean(MeterWebSocketClient.class);
-    }
-
-    @OnOpen
-    public void onOpen(Session session, @PathParam("userId") String userId) {
-        this.session = session;
-        this.userId = userId;
-
-        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-        executorService.schedule(() -> {
-
-            webSocketMap.put(userId, this);
-
-            addOnlineCount();
-            logger.info("用户:{}连接成功,当前在线人数为{}人", userId, getOnlineCount());
-            if (ObjectUtil.isNotEmpty(this.session.getQueryString())) {
-                try {
-                    sendMessageByUserId(userId, this.session.getQueryString());
-                } catch (IOException e) {
-                    logger.error("IO异常");
-                } finally {
-                    executorService.shutdown();
-                }
-            }
-        }, 1, TimeUnit.SECONDS);
-    }
-
-
-    @OnClose
-    public void onClose() {
-        webSocketMap.remove(userId);
-        webSocketMsgMap.remove(userId);
-        subOnlineCount();
-        logger.info("用户:{}关闭了连接!当前在线人数为{}人", userId, getOnlineCount());
-    }
-
-    @OnMessage
-    public void onMessage(String message, Session session) {
-        if (StringUtils.isNotEmpty(userId)) {
-            String projectId, contractId, clientId;
-            if (!message.contains(",")) {
-                clientId = ClientIdConstant.MANAGER_CLIENT_ID;
-                projectId = "1";
-                contractId = "1";
-            } else {
-                projectId = message.split(",")[0];
-                contractId = message.split(",")[1];
-                clientId = message.split(",")[2];
-            }
-
-            if (StringUtils.isNotEmpty(clientId) && StringUtils.isNotEmpty(projectId) && StringUtils.isNotEmpty(contractId)) {
-
-                logger.info("来自用户:{} 消息:{}", userId, message);
-                webSocketMsgMap.put(userId, message);
-
-                Map<String, String> stringMap = new HashMap<>();
-                switch (clientId) {
-                    case ClientIdConstant.ARCHIVE_CLIENT_ID:
-                        stringMap = archiveWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                        break;
-                    case ClientIdConstant.MANAGER_CLIENT_ID:
-                        stringMap = managerWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                        break;
-                    case ClientIdConstant.BUSINESS_CLIENT_ID:
-                        stringMap = businessWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                        break;
-                    case ClientIdConstant.METER_CLIENT_ID:
-                        stringMap = meterWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                        break;
-                }
-                try {
-                    sendMessageByUserId(userId, JSON.toJSONString(stringMap));
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        } else {
-            logger.info("未获取到用户信息,接收消息失败");
-        }
-    }
-
-    @OnError
-    public void onError(Session session, Throwable error) {
-        logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
-        error.printStackTrace();
-    }
-
-    public void sendMessageByUserId(String userId, String message) throws IOException {
-        logger.info("服务端发送消息到用户:{},消息:{}", userId, message);
-        if (StrUtil.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
-            webSocketMap.get(userId).sendMessage(message);
-        } else {
-            logger.error("用户{}不在线", userId);
-        }
-    }
-
-    public void sendMessage(String message) {
-        this.session.getAsyncRemote().sendText(message);
-    }
-
-    public static synchronized int getOnlineCount() {
-        return onlineCount;
-    }
-
-    public static synchronized void addOnlineCount() {
-        WebSocketEndpoint.onlineCount++;
-    }
-
-    public static synchronized void subOnlineCount() {
-        WebSocketEndpoint.onlineCount--;
-    }
-
-    @Scheduled(cron = "0 0/5 * * * ?")
-    public void reSendMessage() {
-        if (webSocketMap.isEmpty() || webSocketMsgMap.isEmpty()) {
-            return;
-        }
-
-        logger.info("************************ 定时重发消息,reSendMessage()方法执行开始 ************************");
-
-        Set<Map.Entry<String, String>> messageMaps = webSocketMsgMap.entrySet();
-        for (Map.Entry<String, String> message : messageMaps) {
-            String userId = message.getKey();
-            String values = message.getValue();
-            if (values.contains(",") && StringUtils.isNotEmpty(userId)) {
-                String[] splitValues = values.split(",");
-                if (splitValues.length == 3) {
-                    String projectId = splitValues[0];
-                    String contractId = splitValues[1];
-                    String clientId = splitValues[2];
-                    if (StringUtils.isNotEmpty(clientId) && StringUtils.isNotEmpty(contractId) && StringUtils.isNotEmpty(projectId)) {
-                        Map<String, String> stringMap = new HashMap<>();
-                        switch (clientId) {
-                            case ClientIdConstant.ARCHIVE_CLIENT_ID:
-                                stringMap = archiveWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                                break;
-                            case ClientIdConstant.MANAGER_CLIENT_ID:
-                                stringMap = managerWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                                break;
-                            case ClientIdConstant.BUSINESS_CLIENT_ID:
-                                stringMap = businessWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                                break;
-                            case ClientIdConstant.METER_CLIENT_ID:
-                                stringMap = meterWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
-                                break;
-                        }
-                        if (stringMap.size() > 0) {
-                            try {
-                                sendMessageByUserId(userId, JSON.toJSONString(stringMap));
-                            } catch (IOException e) {
-                                e.printStackTrace();
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        logger.info("************************ 定时重发消息,reSendMessage()方法执行结束 ************************");
-    }
-
-}
+//package org.springblade.consumer.socket;
+//
+//import cn.hutool.core.util.ObjectUtil;
+//import cn.hutool.core.util.StrUtil;
+//import com.alibaba.fastjson.JSON;
+//import org.apache.commons.lang.StringUtils;
+//import org.apache.logging.log4j.LogManager;
+//import org.apache.logging.log4j.Logger;
+//import org.springblade.business.feign.BusinessWebSocketClient;
+//import org.springblade.common.constant.ClientIdConstant;
+//import org.springblade.feign.ArchiveWebSocketClient;
+//import org.springblade.manager.feign.ManagerWebSocketClient;
+//import org.springblade.meter.feign.MeterWebSocketClient;
+//import org.springframework.beans.BeansException;
+//import org.springframework.context.ApplicationContext;
+//import org.springframework.context.ApplicationContextAware;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//import org.springframework.web.bind.annotation.CrossOrigin;
+//
+//import javax.websocket.*;
+//import javax.websocket.server.PathParam;
+//import javax.websocket.server.ServerEndpoint;
+//import java.io.IOException;
+//import java.util.*;
+//import java.util.concurrent.ConcurrentHashMap;
+//import java.util.concurrent.Executors;
+//import java.util.concurrent.ScheduledExecutorService;
+//import java.util.concurrent.TimeUnit;
+//
+//@Component
+//@CrossOrigin
+//@ServerEndpoint(value = "/websocket/{userId}")
+//public class WebSocketEndpoint implements ApplicationContextAware {
+//
+//    private static final ConcurrentHashMap<String, WebSocketEndpoint> webSocketMap = new ConcurrentHashMap<>();
+//    private static final ConcurrentHashMap<String, String> webSocketMsgMap = new ConcurrentHashMap<>();
+//    private static final Logger logger = LogManager.getLogger(WebSocketEndpoint.class);
+//    private static int onlineCount = 0;
+//    private Session session;
+//    private String userId;
+//
+//    private static ArchiveWebSocketClient archiveWebSocketClient;
+//    private static BusinessWebSocketClient businessWebSocketClient;
+//    private static ManagerWebSocketClient managerWebSocketClient;
+//    private static MeterWebSocketClient meterWebSocketClient;
+//
+//    @Override
+//    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+//        WebSocketEndpoint.archiveWebSocketClient = applicationContext.getBean(ArchiveWebSocketClient.class);
+//        WebSocketEndpoint.businessWebSocketClient = applicationContext.getBean(BusinessWebSocketClient.class);
+//        WebSocketEndpoint.managerWebSocketClient = applicationContext.getBean(ManagerWebSocketClient.class);
+//        WebSocketEndpoint.meterWebSocketClient = applicationContext.getBean(MeterWebSocketClient.class);
+//    }
+//
+//    @OnOpen
+//    public void onOpen(Session session, @PathParam("userId") String userId) {
+//        this.session = session;
+//        this.userId = userId;
+//
+//        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+//        executorService.schedule(() -> {
+//
+//            webSocketMap.put(userId, this);
+//
+//            addOnlineCount();
+//            logger.info("用户:{}连接成功,当前在线人数为{}人", userId, getOnlineCount());
+//            if (ObjectUtil.isNotEmpty(this.session.getQueryString())) {
+//                try {
+//                    sendMessageByUserId(userId, this.session.getQueryString());
+//                } catch (IOException e) {
+//                    logger.error("IO异常");
+//                } finally {
+//                    executorService.shutdown();
+//                }
+//            }
+//        }, 1, TimeUnit.SECONDS);
+//    }
+//
+//
+//    @OnClose
+//    public void onClose() {
+//        webSocketMap.remove(userId);
+//        webSocketMsgMap.remove(userId);
+//        subOnlineCount();
+//        logger.info("用户:{}关闭了连接!当前在线人数为{}人", userId, getOnlineCount());
+//    }
+//
+//    @OnMessage
+//    public void onMessage(String message, Session session) {
+//        if (StringUtils.isNotEmpty(userId)) {
+//            String projectId, contractId, clientId;
+//            if (!message.contains(",")) {
+//                clientId = ClientIdConstant.MANAGER_CLIENT_ID;
+//                projectId = "1";
+//                contractId = "1";
+//            } else {
+//                projectId = message.split(",")[0];
+//                contractId = message.split(",")[1];
+//                clientId = message.split(",")[2];
+//            }
+//
+//            if (StringUtils.isNotEmpty(clientId) && StringUtils.isNotEmpty(projectId) && StringUtils.isNotEmpty(contractId)) {
+//
+//                logger.info("来自用户:{} 消息:{}", userId, message);
+//                webSocketMsgMap.put(userId, message);
+//
+//                Map<String, String> stringMap = new HashMap<>();
+//                switch (clientId) {
+//                    case ClientIdConstant.ARCHIVE_CLIENT_ID:
+//                        stringMap = archiveWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                        break;
+//                    case ClientIdConstant.MANAGER_CLIENT_ID:
+//                        stringMap = managerWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                        break;
+//                    case ClientIdConstant.BUSINESS_CLIENT_ID:
+//                        stringMap = businessWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                        break;
+//                    case ClientIdConstant.METER_CLIENT_ID:
+//                        stringMap = meterWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                        break;
+//                }
+//                try {
+//                    sendMessageByUserId(userId, JSON.toJSONString(stringMap));
+//                } catch (IOException e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        } else {
+//            logger.info("未获取到用户信息,接收消息失败");
+//        }
+//    }
+//
+//    @OnError
+//    public void onError(Session session, Throwable error) {
+//        logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
+//        error.printStackTrace();
+//    }
+//
+//    public void sendMessageByUserId(String userId, String message) throws IOException {
+//        logger.info("服务端发送消息到用户:{},消息:{}", userId, message);
+//        if (StrUtil.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
+//            webSocketMap.get(userId).sendMessage(message);
+//        } else {
+//            logger.error("用户{}不在线", userId);
+//        }
+//    }
+//
+//    public void sendMessage(String message) {
+//        this.session.getAsyncRemote().sendText(message);
+//    }
+//
+//    public static synchronized int getOnlineCount() {
+//        return onlineCount;
+//    }
+//
+//    public static synchronized void addOnlineCount() {
+//        WebSocketEndpoint.onlineCount++;
+//    }
+//
+//    public static synchronized void subOnlineCount() {
+//        WebSocketEndpoint.onlineCount--;
+//    }
+//
+//    @Scheduled(cron = "0 0/5 * * * ?")
+//    public void reSendMessage() {
+//        if (webSocketMap.isEmpty() || webSocketMsgMap.isEmpty()) {
+//            return;
+//        }
+//
+//        logger.info("************************ 定时重发消息,reSendMessage()方法执行开始 ************************");
+//
+//        Set<Map.Entry<String, String>> messageMaps = webSocketMsgMap.entrySet();
+//        for (Map.Entry<String, String> message : messageMaps) {
+//            String userId = message.getKey();
+//            String values = message.getValue();
+//            if (values.contains(",") && StringUtils.isNotEmpty(userId)) {
+//                String[] splitValues = values.split(",");
+//                if (splitValues.length == 3) {
+//                    String projectId = splitValues[0];
+//                    String contractId = splitValues[1];
+//                    String clientId = splitValues[2];
+//                    if (StringUtils.isNotEmpty(clientId) && StringUtils.isNotEmpty(contractId) && StringUtils.isNotEmpty(projectId)) {
+//                        Map<String, String> stringMap = new HashMap<>();
+//                        switch (clientId) {
+//                            case ClientIdConstant.ARCHIVE_CLIENT_ID:
+//                                stringMap = archiveWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                                break;
+//                            case ClientIdConstant.MANAGER_CLIENT_ID:
+//                                stringMap = managerWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                                break;
+//                            case ClientIdConstant.BUSINESS_CLIENT_ID:
+//                                stringMap = businessWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                                break;
+//                            case ClientIdConstant.METER_CLIENT_ID:
+//                                stringMap = meterWebSocketClient.getWebsocketMsg(projectId, contractId, userId);
+//                                break;
+//                        }
+//                        if (stringMap.size() > 0) {
+//                            try {
+//                                sendMessageByUserId(userId, JSON.toJSONString(stringMap));
+//                            } catch (IOException e) {
+//                                e.printStackTrace();
+//                            }
+//                        }
+//                    }
+//                }
+//            }
+//        }
+//        logger.info("************************ 定时重发消息,reSendMessage()方法执行结束 ************************");
+//    }
+//
+//}

+ 54 - 0
blade-service/blade-websocket/pom.xml

@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springblade</groupId>
+        <artifactId>blade-service</artifactId>
+        <version>2.9.1.RELEASE</version>
+    </parent>
+
+    <artifactId>blade-websocket</artifactId>
+    <name>${project.artifactId}</name>
+    <version>${bladex.project.version}</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-websocket-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-meter-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-core-boot</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-tomcat</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+</project>

+ 20 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/WebsocketApplication.java

@@ -0,0 +1,20 @@
+package org.springblade.websocket;
+
+import org.springblade.common.constant.LauncherConstant;
+import org.springblade.core.cloud.feign.EnableBladeFeign;
+import org.springblade.core.launch.BladeApplication;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.cloud.client.SpringCloudApplication;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+@EnableBladeFeign
+@SpringCloudApplication
+@EnableAsync
+@EnableCaching
+public class WebsocketApplication {
+
+    public static void main(String[] args) {
+        BladeApplication.run(LauncherConstant.APPLICATION_WEBSOCKET_NAME, WebsocketApplication.class, args);
+    }
+
+}

+ 25 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/config/WebSocketConfig.java

@@ -0,0 +1,25 @@
+package org.springblade.websocket.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
+import org.springframework.messaging.simp.config.ChannelRegistration;
+import org.springframework.messaging.simp.config.MessageBrokerRegistry;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+import java.util.List;
+
+@Configuration
+public class WebSocketConfig {
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter(){
+        return  new ServerEndpointExporter();
+    }
+
+}

+ 39 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/controller/WebsocketController.java

@@ -0,0 +1,39 @@
+package org.springblade.websocket.controller;
+
+import lombok.AllArgsConstructor;
+//import org.springblade.websocket.service.WebSocketService;
+import org.springblade.core.tool.api.R;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.websocket.service.WebSocketService;
+import org.springblade.websocket.vo.MsgVO;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/7/11 10:47
+ **/
+@RestController
+@AllArgsConstructor
+@RequestMapping("/socket")
+public class WebsocketController {
+//
+//    private final WebSocketService socketService;
+//
+//    /**
+//     * 发送消息
+//     */
+//    @GetMapping("/sendMsg")
+//    public void sendMsg(Long userId){
+//        MsgVO vo = new MsgVO();
+//        vo.setType("msgRemind");
+//        Integer total = 5;
+//        vo.setData(5);
+////        socketService.sendMessage(userId, JsonUtil.toJson(R.data(vo)));
+//    }
+}

+ 22 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/feign/WebSocketClientImpl.java

@@ -0,0 +1,22 @@
+package org.springblade.websocket.feign;
+
+import lombok.AllArgsConstructor;
+import org.springblade.websocket.service.WebSocketService;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/7/18 16:51
+ **/
+@AllArgsConstructor
+@RestController
+public class WebSocketClientImpl implements WebSocketClient{
+
+    private final WebSocketService socketService;
+    @Override
+    public void sendMsg(UserInfoVO vo) {
+        socketService.sendMessage(vo);
+    }
+}

+ 250 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/service/WebSocketService.java

@@ -0,0 +1,250 @@
+package org.springblade.websocket.service;
+
+import lombok.Data;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springblade.common.constant.ClientIdConstant;
+import org.springblade.core.tool.api.R;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.core.tool.utils.SpringUtil;
+import org.springblade.meter.feign.MeterWebSocketClient;
+import org.springblade.websocket.entity.WebSocketClientInfo;
+import org.springblade.websocket.vo.MsgVO;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+@ServerEndpoint(value = "/websocket/{system}/{projectId}/{contractId}/{userId}")
+@Component
+@Data
+public class WebSocketService {
+
+    private static MeterWebSocketClient meterWebSocketClient = null;
+
+    private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);
+
+    /**静态变量,用来记录当前在线用户数。应该把它设计成线程安全的。*/
+    private static int onlineUserCount = 0;
+    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
+    private static int onlineLinkCount = 0;
+    /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象*/
+    private static ConcurrentHashMap<Long, Map<String, WebSocketClientInfo>> webSocketMap = new ConcurrentHashMap<>();
+    /**concurrent包的线程安全Set,用来存放每个用户所在合同对应的session对象*/
+    private static ConcurrentHashMap<UserInfoVO, List<Session>> userInfoMap = new ConcurrentHashMap<>();
+
+    /**锁的映射表*/
+    private static final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();
+
+    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
+    private Session session;
+    /**接收userId*/
+    private Long userId;
+    /**用户系统项目合同信息*/
+    private UserInfoVO vo;
+
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("system") String system, @PathParam("projectId") Long projectId,
+                       @PathParam("contractId") Long contractId,@PathParam("userId") Long userId) {
+        if (meterWebSocketClient == null){
+            meterWebSocketClient = SpringUtil.getBean(MeterWebSocketClient.class);
+        }
+        ReentrantLock lock = getLock(userId);
+        lock.lock();
+        this.session = session;
+        this.userId= userId;
+        //用户的连接信息
+        WebSocketClientInfo client = new WebSocketClientInfo();
+        client.setSession(session);
+        client.setUri(session.getRequestURI().toString());
+        Map<String, WebSocketClientInfo> userMap = new HashMap<>();
+        if(webSocketMap.containsKey(userId))
+        {
+            //当前用户在线,则先获取到当前用户所有页面
+            userMap = webSocketMap.get(userId);
+        }else {
+            //当前用户不在线
+            addOnlineUserCount(); // 在线数 +1
+        }
+        userMap.put(session.getId(),client);
+        webSocketMap.put(userId, userMap);
+        addOnlineLinkCount();
+        log.info("--------------------------------建立连接-----------------------------------------");
+        log.info("用户连接:"+userId+",当前用户总连接数:"+userMap.size());
+        log.info("当前在线人数为:" + getOnlineUserCount()+",当前总连接数:"+getOnlineLinkCount());
+        try {
+            sendMessage(JsonUtil.toJson(R.data(new MsgVO("msgLink","来自后台的反馈:连接成功"))));
+            this.vo = new UserInfoVO(system,projectId,contractId,userId);
+//            int randomNumber = (int)(Math.random() * 11);
+//            Thread.sleep(randomNumber);
+            List<Session> sessions = userInfoMap.get(vo);
+            if (sessions == null){
+                sessions = new ArrayList<>();
+            }
+            sessions.add(session);
+            userInfoMap.put(vo,sessions);
+            /** 向指定系统发送通知,如果当前用户有需要推送的消息则推送 */
+            switch (system) {
+                case ClientIdConstant.METER_ID:
+                    meterWebSocketClient.pushMsg(vo);
+                    break;
+            }
+        } catch (Exception e) {
+            log.error("用户:"+userId+",网络异常:"+e.getMessage());
+        }finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        //同一个userId,只能有一个线程操作
+//        try {
+//            Thread.sleep(10);
+//        } catch (InterruptedException e) {
+//            throw new RuntimeException(e);
+//        }
+        ReentrantLock lock = getLock(userId);
+        lock.lock();
+        try {
+            Map<String, WebSocketClientInfo> userMap = webSocketMap.get(userId);
+            if (userMap != null && userMap.size() > 0) {
+                log.info("--------------------------------断开连接---------------------------------------");
+                List<Session> sessions = userInfoMap.get(vo);
+                if (sessions != null && sessions.size() > 0){
+                    sessions.remove(session);
+                }
+                //先移除用户的连接
+                userMap.remove(session.getId());
+                subOnlineLinkCount();
+                //当前用户所有的连接都关闭时,移除当前用户
+                if (userMap.size() == 0) {
+                    webSocketMap.remove(userId);
+                    subOnlineUserCount();
+                    log.info(userId + "用户退出,当前在线人数为:" + getOnlineUserCount());
+                } else {
+                    log.info(userId + "用户断开了一个连接,用户当前剩余连接:" + userMap.size()+",当前在线人数为:" + getOnlineUserCount());
+                }
+            }
+        }catch (Exception e){
+            log.info(userId + "断开连接时发生异常");
+        }finally {
+            lock.unlock();
+        }
+
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     * @param message 客户端发送过来的消息*/
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        log.info("收到用户消息:"+userId+",报文:"+message);
+        //可以群发消息
+        //消息保存到数据库、redis
+        if(StringUtils.isNotBlank(message)){
+
+        }
+    }
+
+    /**
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
+        error.printStackTrace();
+    }
+
+    /**
+     * 连接服务器成功后主动推送
+     */
+    public void sendMessage(String msg) throws Exception {
+        synchronized (session){
+            this.session.getBasicRemote().sendText(msg);
+        }
+    }
+
+    /**
+     * 向指定客户端推送指定用户消息
+     */
+    public void sendMessage(UserInfoVO vo2){
+            List<Session> sessions = userInfoMap.get(vo2);
+            if(sessions != null && sessions.size() > 0){
+                /** 此处不能使用迭代器,否则在推送消息时,当前用户又开启一个页面,集合发生变化迭代器会报错ConcurrentModificationException*/
+                /** 如果数组越界,则复制一份出来循环发送*/
+                for (int i = 0; i < sessions.size(); i++) {
+                    Session s = sessions.get(i);
+                    if (s == null){
+                        continue;
+                    }
+                    RemoteEndpoint.Basic basicRemote = s.getBasicRemote();
+                    /** 推送消息时可能因为nginx配置的连接时间导致通道关闭,此时跳过推送给这个窗口*/
+                    if (basicRemote != null){
+                        try {
+                            basicRemote.sendText(vo2.getMsg());
+                        } catch (IOException e) {
+                            log.error(vo2.getUserId()+"推送消息失败,消息内容["+ vo2.getMsg()+"],原因:"+e.getMessage());
+                        }
+                    }
+                }
+            }
+    }
+
+    /**
+     *      获取锁的方法,如果锁不存在,则创建一个新的锁
+      */
+    public static ReentrantLock getLock(Long value) {
+        ReentrantLock lock = locks.get(value);
+        if (lock == null) {
+            lock = new ReentrantLock();
+            locks.put(value, lock);
+        }
+        return lock;
+    }
+
+    public static synchronized int getOnlineUserCount() {
+        return onlineUserCount;
+    }
+
+    public static synchronized void addOnlineUserCount() {
+        WebSocketService.onlineUserCount++;
+    }
+
+    public static synchronized void subOnlineUserCount() {
+        WebSocketService.onlineUserCount--;
+    }
+
+    public static synchronized int getOnlineLinkCount() {
+        return onlineLinkCount;
+    }
+
+    public static synchronized void addOnlineLinkCount() {
+        WebSocketService.onlineLinkCount++;
+    }
+
+    public static synchronized void subOnlineLinkCount() {
+        WebSocketService.onlineLinkCount--;
+    }
+
+
+
+}

+ 1 - 0
blade-service/pom.xml

@@ -30,6 +30,7 @@
         <module>blade-meter</module>
         <module>blade-rabbitmq-producer</module>
         <module>blade-rabbitmq-consumer</module>
+        <module>blade-websocket</module>
     </modules>
 
     <dependencies>