Browse Source

防止重复数据 入队列

zjs 4 days ago
parent
commit
c41d60b989

+ 21 - 0
src/main/java/com/cn/esermis/config/RedisQueueService.java

@@ -2,13 +2,18 @@ package com.cn.esermis.config;
2
 
2
 
3
 import org.springframework.beans.factory.annotation.Autowired;
3
 import org.springframework.beans.factory.annotation.Autowired;
4
 import org.springframework.data.redis.core.ListOperations;
4
 import org.springframework.data.redis.core.ListOperations;
5
+import org.springframework.data.redis.core.RedisTemplate;
5
 import org.springframework.stereotype.Service;
6
 import org.springframework.stereotype.Service;
6
 
7
 
8
+import java.util.concurrent.TimeUnit;
9
+
7
 @Service
10
 @Service
8
 public class RedisQueueService {
11
 public class RedisQueueService {
9
 
12
 
10
     @Autowired
13
     @Autowired
11
     private ListOperations<String, Object> listOps;
14
     private ListOperations<String, Object> listOps;
15
+    @Autowired
16
+    private RedisTemplate<String, String> redisTemplate;
12
 
17
 
13
     public void sendMessage(String queueKey, Object message) {
18
     public void sendMessage(String queueKey, Object message) {
14
         listOps.rightPush(queueKey, message);
19
         listOps.rightPush(queueKey, message);
@@ -18,4 +23,20 @@ public class RedisQueueService {
18
         return listOps.leftPop(queueKey);
23
         return listOps.leftPop(queueKey);
19
     }
24
     }
20
 
25
 
26
+    public void addElementWithExpire(String key, String element, long expireTime, TimeUnit timeUnit) {
27
+        // 将元素添加到 Hash 中
28
+        redisTemplate.opsForHash().put("dataKey:"+key, element, element);
29
+        // 设置过期时间
30
+        redisTemplate.expire("dataKey:"+key, expireTime, timeUnit);
31
+    }
32
+
33
+    public boolean isElementExist(String key, String element) {
34
+        // 检查元素是否存在
35
+        return redisTemplate.opsForHash().hasKey("dataKey:"+key, element);
36
+    }
37
+
38
+    public void removeElement(String key, String element) {
39
+        // 删除元素
40
+        redisTemplate.opsForHash().delete("dataKey:"+key, element);
41
+    }
21
 }
42
 }

+ 15 - 2
src/main/java/com/cn/esermis/netty/BootNettyChannelInboundHandlerAdapter.java

@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
23
 import java.util.Date;
23
 import java.util.Date;
24
 import java.util.HashMap;
24
 import java.util.HashMap;
25
 import java.util.Map;
25
 import java.util.Map;
26
+import java.util.concurrent.TimeUnit;
26
 
27
 
27
 @Log4j2
28
 @Log4j2
28
 @Component
29
 @Component
@@ -91,7 +92,13 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
91
                             ||(jsonObject!=null && jsonObject.getString(Constant.CN).equals(Constant.HOUR))){
92
                             ||(jsonObject!=null && jsonObject.getString(Constant.CN).equals(Constant.HOUR))){
92
 //                        adapter.publisherService.pubMsg(jsonObject);
93
 //                        adapter.publisherService.pubMsg(jsonObject);
93
                         if (ObjectUtils.notEqual(jsonObject.get("CN"),Constant.MINUTE_STR)){
94
                         if (ObjectUtils.notEqual(jsonObject.get("CN"),Constant.MINUTE_STR)){
94
-                            adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
95
+                            String CN = jsonObject.getString("CN");
96
+                            String MN = jsonObject.getString("MN");
97
+                            String dataTime = jsonObject.getString("DataTime");
98
+                            if (!adapter.redisQueueService.isElementExist(MN +":"+ CN +":"+ dataTime, dataTime)){
99
+                                adapter.redisQueueService.addElementWithExpire(MN +":"+ CN +":"+ dataTime, dataTime, 2, TimeUnit.HOURS);
100
+                                adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
101
+                            }
95
                         }
102
                         }
96
                     }
103
                     }
97
                 } else {
104
                 } else {
@@ -102,7 +109,13 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
102
                         if ((jsonObject!=null &&  jsonObject.getString(Constant.CN).equals(Constant.MINUTE))
109
                         if ((jsonObject!=null &&  jsonObject.getString(Constant.CN).equals(Constant.MINUTE))
103
                                 || (jsonObject!=null && jsonObject.getString(Constant.CN).equals(Constant.HOUR))){
110
                                 || (jsonObject!=null && jsonObject.getString(Constant.CN).equals(Constant.HOUR))){
104
                             if (ObjectUtils.notEqual(jsonObject.get("CN"),Constant.MINUTE_STR)) {
111
                             if (ObjectUtils.notEqual(jsonObject.get("CN"),Constant.MINUTE_STR)) {
105
-                                adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
112
+                                String CN = jsonObject.getString("CN");
113
+                                String MN = jsonObject.getString("MN");
114
+                                String dataTime = jsonObject.getString("DataTime");
115
+                                if (!adapter.redisQueueService.isElementExist(MN +":"+ CN +":"+ dataTime, dataTime)){
116
+                                    adapter.redisQueueService.addElementWithExpire(MN +":"+ CN +":"+ dataTime, dataTime, 2, TimeUnit.HOURS);
117
+                                    adapter.redisQueueService.sendMessage(Constant.SERVER, jsonObject);
118
+                                }
106
                             }
119
                             }
107
 //                            adapter.publisherService.pubMsg(jsonObject);
120
 //                            adapter.publisherService.pubMsg(jsonObject);
108
                         }
121
                         }