소스 검색

回复消息 bug修改

zjs 4 일 전
부모
커밋
b68e515225

+ 8 - 1
pom.xml

@@ -204,7 +204,14 @@
204 204
 			<artifactId>xmlbeans</artifactId>
205 205
 			<version>5.1.1</version>
206 206
 		</dependency>
207
-    </dependencies>
207
+		<!-- sharding-jdbc依赖 -->
208
+		<dependency>
209
+			<groupId>org.apache.shardingsphere</groupId>
210
+			<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
211
+			<version>4.0.0-RC1</version>
212
+		</dependency>
213
+
214
+	</dependencies>
208 215
 
209 216
 
210 217
 	<dependencyManagement>

+ 21 - 0
src/main/java/com/cn/esermis/config/RedisQueueService.java

@@ -2,13 +2,18 @@ package com.cn.esermis.config;
2 2
 
3 3
 import org.springframework.beans.factory.annotation.Autowired;
4 4
 import org.springframework.data.redis.core.ListOperations;
5
+import org.springframework.data.redis.core.RedisTemplate;
5 6
 import org.springframework.stereotype.Service;
6 7
 
8
+import java.util.concurrent.TimeUnit;
9
+
7 10
 @Service
8 11
 public class RedisQueueService {
9 12
 
10 13
     @Autowired
11 14
     private ListOperations<String, Object> listOps;
15
+    @Autowired
16
+    private RedisTemplate<String, String> redisTemplate;
12 17
 
13 18
     public void sendMessage(String queueKey, Object message) {
14 19
         listOps.rightPush(queueKey, message);
@@ -18,4 +23,20 @@ public class RedisQueueService {
18 23
         return listOps.leftPop(queueKey);
19 24
     }
20 25
 
26
+    public void addElementWithExpire(String key, String element, long expireTime, TimeUnit timeUnit) {
27
+        // 将元素添加到 Hash 中
28
+        redisTemplate.opsForHash().put("dataKey:"+key, element, element);
29
+        // 设置过期时间
30
+        redisTemplate.expire("dataKey:"+key, expireTime, timeUnit);
31
+    }
32
+
33
+    public boolean isElementExist(String key, String element) {
34
+        // 检查元素是否存在
35
+        return redisTemplate.opsForHash().hasKey("dataKey:"+key, element);
36
+    }
37
+
38
+    public void removeElement(String key, String element) {
39
+        // 删除元素
40
+        redisTemplate.opsForHash().delete("dataKey:"+key, element);
41
+    }
21 42
 }

+ 15 - 2
src/main/java/com/cn/esermis/netty/BootNettyChannelInboundHandlerAdapter.java

@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
23 23
 import java.util.Date;
24 24
 import java.util.HashMap;
25 25
 import java.util.Map;
26
+import java.util.concurrent.TimeUnit;
26 27
 
27 28
 @Log4j2
28 29
 @Component
@@ -91,7 +92,13 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
91 92
                             ||(jsonObject!=null && jsonObject.getString(Constant.CN).equals(Constant.HOUR))){
92 93
 //                        adapter.publisherService.pubMsg(jsonObject);
93 94
                         if (ObjectUtils.notEqual(jsonObject.get("CN"),Constant.MINUTE_STR)){
94
-                            adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
95
+                            String CN = jsonObject.getString("CN");
96
+                            String MN = jsonObject.getString("MN");
97
+                            String dataTime = jsonObject.getString("DataTime");
98
+                            if (!adapter.redisQueueService.isElementExist(CN + MN + dataTime, dataTime)){
99
+                                adapter.redisQueueService.addElementWithExpire(CN+MN+dataTime, dataTime, 1, TimeUnit.HOURS);
100
+                                adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
101
+                            }
95 102
                         }
96 103
                     }
97 104
                 } else {
@@ -102,7 +109,13 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
102 109
                         if ((jsonObject!=null &&  jsonObject.getString(Constant.CN).equals(Constant.MINUTE))
103 110
                                 || (jsonObject!=null && jsonObject.getString(Constant.CN).equals(Constant.HOUR))){
104 111
                             if (ObjectUtils.notEqual(jsonObject.get("CN"),Constant.MINUTE_STR)) {
105
-                                adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
112
+                                String CN = jsonObject.getString("CN");
113
+                                String MN = jsonObject.getString("MN");
114
+                                String dataTime = jsonObject.getString("DataTime");
115
+                                if (!adapter.redisQueueService.isElementExist(CN + MN + dataTime, dataTime)){
116
+                                    adapter.redisQueueService.addElementWithExpire(CN+MN+dataTime, dataTime, 1, TimeUnit.HOURS);
117
+                                    adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
118
+                                }
106 119
                             }
107 120
 //                            adapter.publisherService.pubMsg(jsonObject);
108 121
                         }

+ 9 - 2
src/main/java/com/cn/esermis/server/Consumer.java

@@ -8,7 +8,9 @@ import com.cn.esermis.dpld.business.MonitorBusiness;
8 8
 import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.scheduling.annotation.Async;
10 10
 import org.springframework.stereotype.Component;
11
+import redis.clients.jedis.Jedis;
11 12
 
13
+import java.util.UUID;
12 14
 import java.util.concurrent.ExecutorService;
13 15
 import java.util.concurrent.Executors;
14 16
 import java.util.concurrent.TimeUnit;
@@ -23,7 +25,10 @@ public class Consumer {
23 25
     private MonitorBusiness monitorBusiness;
24 26
     @Autowired
25 27
     private RedisQueueService redisQueueService;
26
-
28
+//    @Autowired
29
+//    private Jedis jedis;
30
+//    private String lockKey = "serverlockey";
31
+//    private int lockTimeout = 10;
27 32
 
28 33
     @Async
29 34
     public void receive() throws InterruptedException {
@@ -32,7 +37,9 @@ public class Consumer {
32 37
         int numberOfThreads = 8;
33 38
         int tolot = 0;
34 39
         ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
35
-
40
+//        String identifier = UUID.randomUUID().toString();
41
+//        String lockValue = identifier;
42
+//        long end = System.currentTimeMillis() + lockTimeout;
36 43
         while (true){
37 44
             //取出消息
38 45
             Object message = redisQueueService.receiveMessage(Constant.SERVER);

+ 56 - 83
src/main/resources/application.yml

@@ -14,19 +14,62 @@ spring:
14 14
 #    url: ${DB_URL:jdbc:oracle:thin:@120.77.183.31:1521:xe}
15 15
 #    username: ${DB_USERNAME:CNOOC_JNJP}
16 16
 #    password: ${DB_PASSWORD:cnooc}
17
-  datasource:
18
-    dynamic:
19
-      primary: master
20
-      strict: false
21
-      datasource:
22
-        master:
23
-          driver-class-name: com.mysql.cj.jdbc.Driver
24
-          url: jdbc:mysql://39.105.121.97:13306/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
25
-#          url: jdbc:mysql://192.168.3.19:13306/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
26
-          username: root
17
+#  datasource:
18
+#    dynamic:
19
+#      primary: master
20
+#      strict: false
21
+#      datasource:
22
+#        master:
23
+#          driver-class-name: com.mysql.cj.jdbc.Driver
24
+#          url: jdbc:mysql://39.105.121.97:13306/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
25
+##          url: jdbc:mysql://192.168.3.19:13306/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
26
+#          username: root
27
+##          password: Eitc@gnhz702
27 28
 #          password: Eitc@gnhz702
28
-          password: Eitc@gnhz702
29
-
29
+  shardingsphere:
30
+    props:
31
+      #d打印Sql语句
32
+      sql-show: true
33
+    datasource:
34
+      #创建我们的ds0数据源
35
+      scy:
36
+        #下边这些都是老套路了
37
+        driver-class-name: com.mysql.cj.jdbc.Driver
38
+        jdbc-url: jdbc:mysql://192.168.3.19:3366/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
39
+#        jdbc-url: jdbc:mysql://39.105.121.97:13306/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
40
+        password: Eitc@gnhz702
41
+        type: com.zaxxer.hikari.HikariDataSource
42
+        username: root
43
+      #创建我们的ds1数据源
44
+#      ds1:
45
+#        #一样的老套路
46
+#        driver-class-name: com.mysql.cj.jdbc.Driver
47
+#        jdbc-url: jdbc:mysql://39.105.121.97:13306/scy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
48
+#        password: Eitc@gnhz702
49
+#        type: com.zaxxer.hikari.HikariDataSource
50
+#        username: root
51
+      names: scy
52
+#指定 course 表分布情况,配置表在哪个数据库里面,表名称都是什么 m1.course_1 ,m1.course_2
53
+    sharding:
54
+      tables:
55
+        #scs_product可以任意命名,sql中一致,为了方便理解,这里一般写分表共有的
56
+        # 比如我的scs_product_1  和 scs_product_2  那就写scs_product好区分
57
+        # 当执行sql中出现scs_product,sharding-jdbc会将其操作到对应的表
58
+        air_monitor_minute_record:
59
+          actual-data-nodes: scy.air_monitor_minute_record_$->{1..2}
60
+          # 指定主键生成策略为雪花id,全局主键
61
+          key-generator:
62
+            column: id
63
+            type: SNOWFLAKE
64
+          #指定scs_product表的分片策略,  分片键和分片算法 用于计算真正的表名
65
+          table-strategy:
66
+            inline:
67
+              # 偶数进到scs_product_1  奇数进到scs_product_2
68
+              # 对于根据分片字段为条件的,会先判断是否涉及两张表,
69
+              # 如何是两张表则会两个表都查,如果只涉及单表,则只查询一张表。
70
+              algorithm-expression: air_monitor_minute_record_$->{id % 2 + 1}
71
+              # 指定分片键为PRODUCT_ID
72
+              sharding-column: id
30 73
   # Redis 配置
31 74
   redis:
32 75
     timeout: 30000 # 连接超时时间(毫秒)
@@ -47,74 +90,4 @@ spring:
47 90
 mybatis-plus:
48 91
   mapper-locations: classpath:static/**/*.xml
49 92
   configuration:
50
-    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl # 指定 MyBatis 使用 SLF4J 日志实现
51
-#logging:
52
-#  level:
53
-#    cnooc.esermis: DEBUG
54
-#    com.baomidou.mybatisplus.core.mapper: debug # 设置 Mapper 接口的日志级别为 DEBUG
55
-  ## 工程设置
56
-#project:
57
-#  debug: false
58
-#  # 上传文件目录
59
-#  uploadDir: ${UPLOAD_DIR:./uploads}
60
-#  userspaceDir: ${USERSPACE_DIR:./userspace}
61
-#
62
-## 全局常量
63
-#constants:
64
-#  defaultPassword: P@ssw0rd
65
-#mybatis
66
-#mybatis-plus:
67
-#  mapper-locations: classpath:/static/mybatis/*.xml,classpath:/static/mybatis/gf/*.xml,classpath:/static/mybatis/leg/*.xml
68
-#  #实体扫描,多个package用逗号或者分号分隔
69
-#  typeAliasesPackage: com.cnooc.esermis.entity,com.cnooc.esermis.gf.entity,com.cnooc.esermis.gf.sewage.entity,com.cnooc.esermis.leg.entity
70
-#  typeEnumsPackage: com.cnooc.esermis.entity.enums,com.cnooc.esermis.gf.state
71
-#  typeHandlersPackage: com.cnooc.esermis.gf.handler
72
-#  global-config:
73
-#    # 数据库相关配置
74
-#    db-config:
75
-#      #主键类型  AUTO:"数据库ID自增", INPUT:"用户输入ID",ID_WORKER:"全局唯一ID (数字类型唯一ID)", ASSIGN_UUID:"全局唯一ID ASSIGN_UUID";
76
-#      id-type: INPUT # 李原:将ID_WORKER修改为INPUT 以配合每个表自己配置ID生成策略
77
-#      #字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断"
78
-#      field-strategy: NOT_NULL
79
-#      #驼峰下划线转换
80
-#      column-underline: false
81
-#      #数据库大写下划线转换
82
-#      capital-mode: false
83
-#      #逻辑删除配置
84
-#      logic-delete-value: 0
85
-#      logic-not-delete-value: 1
86
-#      db-type: oracle
87
-#    #刷新mapper 调试神器
88
-#    refresh: true
89
-#  # 原生配置
90
-#  configuration:
91
-#    map-underscore-to-camel-case: true
92
-#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
93
-#    cache-enabled: false
94
-
95
-  ##日志配置
96
-
97
-
98
-#线程池
99
-##线程池维护线程的最少数量 -
100
-#consumer:
101
-#  corePoolSize: 5
102
-#  #线程池维护线程的最大数量
103
-#  maxPoolSize: 150
104
-#  #线程池维护线程所允许的空闲时间
105
-#  keepAliveSeconds: 500
106
-#  #线程池所使用的缓冲队列
107
-#  queueCapacity: 80
108
-## 邮件通知
109
-#emailsettings:
110
-#  # 设置为debug模式, 可以查看详细的发送 log
111
-#  debug: true
112
-#  #发送人账户服务器信息
113
-#  host: cmail.cnooc.com.cn #10.68.81.34
114
-#  port: 25
115
-#  senderAccount: sy_huanbao
116
-#  senderPassword: swu5!DHW11
117
-#  from: 环保系统自动邮件<sy_huanbao@cnooc.com.cn>
118
-
119
-
120
-
93
+    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl # 指定 MyBatis 使用 SLF4J 日志实现