Parcourir la source

多线程优化,小时均值修改

zjs il y a 2 jours
Parent
commit
294d431cfc

+ 6 - 0
src/main/java/com/cn/esermis/config/RedisQueueService.java

@@ -19,7 +19,13 @@ public class RedisQueueService {
19 19
         listOps.rightPush(queueKey, message);
20 20
     }
21 21
 
22
+    public boolean isQueueEmpty(String queueKey) {
23
+        Long size = listOps.size(queueKey);
24
+        return size == 0;
25
+    }
26
+
22 27
     public Object receiveMessage(String queueKey) {
28
+
23 29
         return listOps.leftPop(queueKey);
24 30
     }
25 31
 

+ 1 - 0
src/main/java/com/cn/esermis/constant/Constant.java

@@ -26,6 +26,7 @@ public class Constant {
26 26
 
27 27
         public static final  String CN = "CN"; //大气环境污染源
28 28
         public static final  String SERVER = "server"; //大气环境污染源
29
+        public static final  String RECEIVEREPEAT = "receiverepeat"; //大气环境污染源
29 30
 
30 31
         public static final List<String> COMMAND = new ArrayList<>(Arrays.asList(HOUR, MINUTE)); // 解析数据编码
31 32
 

+ 68 - 146
src/main/java/com/cn/esermis/dpld/business/MonitorBusiness.java

@@ -6,6 +6,7 @@ import com.cn.esermis.coding.PollutionGas;
6 6
 import com.cn.esermis.constant.Constant;
7 7
 import com.cn.esermis.datawarn.common.ExceptionDataDispose;
8 8
 import com.cn.esermis.dpld.entity.*;
9
+import com.cn.esermis.dpld.mapper.AirMonitorHourRecordMapper;
9 10
 import com.cn.esermis.dpld.model.OutletAndFactorVo;
10 11
 import com.cn.esermis.dpld.service.*;
11 12
 import com.cn.esermis.model.CP;
@@ -18,8 +19,10 @@ import org.springframework.stereotype.Component;
18 19
 
19 20
 import javax.annotation.PostConstruct;
20 21
 import javax.annotation.Resource;
22
+import java.math.BigDecimal;
21 23
 import java.text.ParseException;
22 24
 import java.text.SimpleDateFormat;
25
+import java.time.LocalDateTime;
23 26
 import java.util.ArrayList;
24 27
 import java.util.Date;
25 28
 import java.util.List;
@@ -44,6 +47,8 @@ public class MonitorBusiness {
44 47
     @Resource
45 48
     private IAirMonitorHourRecordService airMonitorHourRecordService;
46 49
     @Resource
50
+    private AirMonitorHourRecordMapper airMonitorHourRecordMapper;
51
+    @Resource
47 52
     private IPostMarkingService postMarkingService;
48 53
     @Resource
49 54
     private RedisTemplate<String, Object> redisTemplate;
@@ -104,13 +109,9 @@ public class MonitorBusiness {
104 109
                             throw new RuntimeException(e);
105 110
                         }
106 111
                         try {
112
+                            // 计算时间
107 113
                             airMonitorMinuteRecord.setIsExceed(exceptionDataDispose.ExceedData(factorModel.getMN(), x.getFactor(), x.getAvg()!=null?x.getAvg().doubleValue():0));
108
-                            airMonitorMinuteRecord.setIsLose(exceptionDataDispose.LoseData(x.getMax()!=null?x.getMax().doubleValue():0,
109
-                                    x.getMin()!=null?x.getMin().doubleValue():0,
110
-                                    x.getAvg()!=null?x.getAvg().doubleValue():0));
111
-                            if ((airMonitorMinuteRecord.getIsExceed()!=null && !airMonitorMinuteRecord.getIsExceed().equals("1") )||
112
-                                    (airMonitorMinuteRecord.getIsLose()!=null && !airMonitorMinuteRecord.getIsLose().equals("1") )||
113
-                                    (airMonitorMinuteRecord.getIsSix()!=null && !airMonitorMinuteRecord.getIsSix().equals("1"))){
114
+                            if ((airMonitorMinuteRecord.getIsExceed()!=null && !airMonitorMinuteRecord.getIsExceed().equals("1"))){
114 115
                                 saveExcepLog(x, airMonitorMinuteRecord);
115 116
                             }
116 117
                         } catch (Exception e) {
@@ -121,52 +122,6 @@ public class MonitorBusiness {
121 122
                     });
122 123
                     airMonitorMinuteRecordService.saveBatch(listData);
123 124
                 }
124
-//                else if (factorModel.getCN().equals(Constant.REAL)) {
125
-//                    // 分钟数据入库
126
-//                    List<AirMonitorMinuteRecord> listData = new ArrayList<>();
127
-//                    cp.forEach(x -> {
128
-//                        AirMonitorMinuteRecord airMonitorMinuteRecord = new AirMonitorMinuteRecord();
129
-//                        airMonitorMinuteRecord.setQn(factorModel.getQN());
130
-//                        airMonitorMinuteRecord.setSt(factorModel.getST());
131
-//                        airMonitorMinuteRecord.setCn(factorModel.getCN());
132
-//                        airMonitorMinuteRecord.setPw(factorModel.getPW());
133
-//                        airMonitorMinuteRecord.setMn(factorModel.getMN());
134
-//                        airMonitorMinuteRecord.setFlag(factorModel.getFlag());
135
-//                        airMonitorMinuteRecord.setDataTime(factorModel.getDataTime());
136
-//                        airMonitorMinuteRecord.setFactor(x.getFactor());
137
-//                        airMonitorMinuteRecord.setMin(x.getMin());
138
-//                        airMonitorMinuteRecord.setMax(x.getMax());
139
-//                        airMonitorMinuteRecord.setAvg(x.getAvg());
140
-//                        airMonitorMinuteRecord.setCou(x.getCou());
141
-//                        airMonitorMinuteRecord.setFactorFlag(x.getFlag());
142
-//                        airMonitorMinuteRecord.setZsAvg(x.getZsAvg());
143
-//                        airMonitorMinuteRecord.setZsMax(x.getZsMax());
144
-//                        airMonitorMinuteRecord.setZsMin(x.getZsMin());
145
-//                        airMonitorMinuteRecord.setRtd(x.getRtd());
146
-//                        try {
147
-//                            airMonitorMinuteRecord.setWorkCondition(sendMessage(factorModel.getDataTime(),factorModel.getMN()));
148
-//                        } catch (ParseException e) {
149
-//                            throw new RuntimeException(e);
150
-//                        }
151
-//                        try {
152
-//                            airMonitorMinuteRecord.setIsExceed(exceptionDataDispose.ExceedData(factorModel.getMN(), x.getFactor(), x.getAvg()!=null?x.getAvg().doubleValue():0));
153
-//                            airMonitorMinuteRecord.setIsLose(exceptionDataDispose.LoseData(x.getMax()!=null?x.getMax().doubleValue():0,
154
-//                                    x.getMin()!=null?x.getMin().doubleValue():0,
155
-//                                    x.getAvg()!=null?x.getAvg().doubleValue():0));
156
-//                            if ((airMonitorMinuteRecord.getIsExceed()!=null && !airMonitorMinuteRecord.getIsExceed().equals("Y") )||
157
-//                                    (airMonitorMinuteRecord.getIsLose()!=null && !airMonitorMinuteRecord.getIsLose().equals("Y") )||
158
-//                                    (airMonitorMinuteRecord.getIsSix()!=null && !airMonitorMinuteRecord.getIsSix().equals("Y"))){
159
-//                                saveExcepLog(x, airMonitorMinuteRecord);
160
-//                            }
161
-//                        } catch (Exception e) {
162
-//                            log.error("调用 ExceptionDataDispose2 方法异常{} 当前参数为:{}",e,airMonitorMinuteRecord);
163
-//                        }
164
-//                        listData.add(airMonitorMinuteRecord);
165
-//                    });
166
-//                    if (!listData.isEmpty()) {
167
-//                        airMonitorMinuteRecordService.saveBatch(listData);
168
-//                    }
169
-//                }
170 125
                 else if (factorModel.getCN().equals(Constant.HOUR)) {
171 126
                     // 小时数据入库
172 127
                     List<AirMonitorHourRecord> listData = new ArrayList<>();
@@ -200,7 +155,6 @@ public class MonitorBusiness {
200 155
                                     x.getMin()!=null?x.getMin().doubleValue():0,
201 156
                                     x.getAvg()!=null?x.getAvg().doubleValue():0));
202 157
                             airMonitorHourRecord.setIsSix(exceptionDataDispose.SixData(factorModel.getMN(), x.getFactor(),x.getAvg()!=null?x.getAvg().doubleValue():0, factorModel.getDataTime()));
203
-//                            if (!airMonitorHourRecord.getIsExceed().equals("Y") || !airMonitorHourRecord.getIsLose().equals("Y") || !airMonitorHourRecord.getIsSix().equals("Y")){
204 158
                             if ((airMonitorHourRecord.getIsExceed()!=null && !airMonitorHourRecord.getIsExceed().equals("1") )||
205 159
                                     (airMonitorHourRecord.getIsLose()!=null && !airMonitorHourRecord.getIsLose().equals("1") )||
206 160
                                     (airMonitorHourRecord.getIsSix()!=null && !airMonitorHourRecord.getIsSix().equals("1"))){
@@ -241,12 +195,8 @@ public class MonitorBusiness {
241 195
                         }
242 196
                         try {
243 197
                             waterMonitorMinuteRecord.setIsExceed(exceptionDataDispose.ExceedData(factorModel.getMN(), x.getFactor(), x.getAvg()!=null?x.getAvg().doubleValue():0));
244
-                            waterMonitorMinuteRecord.setIsLose(exceptionDataDispose.LoseData(x.getMax()!=null?x.getMax().doubleValue():0,
245
-                                    x.getMin()!=null?x.getMin().doubleValue():0,
246
-                                    x.getAvg()!=null?x.getAvg().doubleValue():0));
247
-                            if ((waterMonitorMinuteRecord.getIsExceed()!=null && !waterMonitorMinuteRecord.getIsExceed().equals("1") )||
248
-                                    (waterMonitorMinuteRecord.getIsLose()!=null && !waterMonitorMinuteRecord.getIsLose().equals("1") )||
249
-                                    (waterMonitorMinuteRecord.getIsSix()!=null && !waterMonitorMinuteRecord.getIsSix().equals("1"))){
198
+
199
+                            if ((waterMonitorMinuteRecord.getIsExceed()!=null && !waterMonitorMinuteRecord.getIsExceed().equals("1"))){
250 200
                                 saveExcepLog(x, waterMonitorMinuteRecord);
251 201
                             }
252 202
                         } catch (Exception e) {
@@ -256,50 +206,6 @@ public class MonitorBusiness {
256 206
                     });
257 207
                     waterMonitorMinuteRecordService.saveBatch(listData);
258 208
                 }
259
-//                else if (factorModel.getCN().equals(Constant.REAL)) {
260
-//                    // 分钟数据入库
261
-//                    List<WaterMonitorMinuteRecord> listData = new ArrayList<>();
262
-//                    cp.forEach(x -> {
263
-//                        WaterMonitorMinuteRecord waterMonitorMinuteRecord = new WaterMonitorMinuteRecord();
264
-//                        waterMonitorMinuteRecord.setQn(factorModel.getQN());
265
-//                        waterMonitorMinuteRecord.setSt(factorModel.getST());
266
-//                        waterMonitorMinuteRecord.setCn(factorModel.getCN());
267
-//                        waterMonitorMinuteRecord.setPw(factorModel.getPW());
268
-//                        waterMonitorMinuteRecord.setMn(factorModel.getMN());
269
-//                        waterMonitorMinuteRecord.setFlag(factorModel.getFlag());
270
-//                        waterMonitorMinuteRecord.setDataTime(factorModel.getDataTime());
271
-//                        waterMonitorMinuteRecord.setFactor(x.getFactor());
272
-//                        waterMonitorMinuteRecord.setMin(x.getMin());
273
-//                        waterMonitorMinuteRecord.setMax(x.getMax());
274
-//                        waterMonitorMinuteRecord.setAvg(x.getAvg());
275
-//                        waterMonitorMinuteRecord.setCou(x.getCou());
276
-//                        waterMonitorMinuteRecord.setFactorFlag(x.getFlag());
277
-//                        waterMonitorMinuteRecord.setRtd(x.getRtd());
278
-//                        try {
279
-//                            waterMonitorMinuteRecord.setWorkCondition(sendMessage(factorModel.getDataTime(),factorModel.getMN()));
280
-//                        } catch (ParseException e) {
281
-//                            throw new RuntimeException(e);
282
-//                        }
283
-//                        try {
284
-//                            waterMonitorMinuteRecord.setIsExceed(exceptionDataDispose.ExceedData(factorModel.getMN(), x.getFactor(), x.getAvg()!=null?x.getAvg().doubleValue():0));
285
-//                            waterMonitorMinuteRecord.setIsLose(exceptionDataDispose.LoseData(x.getMax()!=null?x.getMax().doubleValue():0,
286
-//                                    x.getMin()!=null?x.getMin().doubleValue():0,
287
-//                                    x.getAvg()!=null?x.getAvg().doubleValue():0));
288
-//                            if ((waterMonitorMinuteRecord.getIsExceed()!=null && !waterMonitorMinuteRecord.getIsExceed().equals("Y") )||
289
-//                                    (waterMonitorMinuteRecord.getIsLose()!=null && !waterMonitorMinuteRecord.getIsLose().equals("Y") )||
290
-//                                    (waterMonitorMinuteRecord.getIsSix()!=null && !waterMonitorMinuteRecord.getIsSix().equals("Y"))){
291
-//                                saveExcepLog(x, waterMonitorMinuteRecord);
292
-//                            }
293
-//                        } catch (Exception e) {
294
-//                            log.error("调用 ExceptionDataDispose5 方法异常{} 当前参数为:{}",e,waterMonitorMinuteRecord);
295
-//                        }
296
-//
297
-//                        listData.add(waterMonitorMinuteRecord);
298
-//                    });
299
-//                    if (!listData.isEmpty()) {
300
-//                        waterMonitorMinuteRecordService.saveBatch(listData);
301
-//                    }
302
-//                }
303 209
                 else if
304 210
                 (factorModel.getCN().equals(Constant.HOUR)) {
305 211
                     // 小时数据入库
@@ -352,7 +258,7 @@ public class MonitorBusiness {
352 258
 
353 259
     private void saveExcepLog(CP x, AirMonitorMinuteRecord record) {
354 260
         ExcepLog excepLog = new ExcepLog();
355
-        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d ->d.getFactor()!=null && d.getFactor().equals(x.getFactor())).findFirst();
261
+        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d ->d.getMnCode()!=null && d.getMnCode().equals(record.getMn()) &&  d.getFactor()!=null && d.getFactor().equals(x.getFactor())).findFirst();
356 262
         if (first.isPresent()) {
357 263
             excepLog.setOutletName(first.get().getOutletName());
358 264
             excepLog.setOutletCode(first.get().getOutletCode());
@@ -363,20 +269,11 @@ public class MonitorBusiness {
363 269
             excepLog.setLoseData(record.getIsLose());
364 270
             excepLogService.save(excepLog);
365 271
         }
366
-        if (record.getIsLose()!=null && !record.getIsLose().equals("1")){
367
-            addExcepLog(record.getFactor(), "1");
368
-        }
369
-        if (record.getIsSix()!=null && !record.getIsSix().equals("1")){
370
-            addExcepLog(record.getFactor(), "2");
371
-        }
372
-        if (record.getIsExceed()!=null && !record.getIsExceed().equals("1")){
373
-            addExcepLog(record.getFactor(), "3");
374
-        }
375 272
     }
376 273
 
377 274
     private void saveExcepLog(CP x, AirMonitorHourRecord record) {
378 275
         ExcepLog excepLog = new ExcepLog();
379
-        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d -> d.getFactor()!=null && d.getFactor().equals(x.getFactor())).findFirst();
276
+        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d -> d.getMnCode()!=null && d.getMnCode().equals(record.getMn()) &&  d.getFactor()!=null && d.getFactor().equals(x.getFactor())).findFirst();
380 277
         if (first.isPresent()) {
381 278
             excepLog.setOutletName(first.get().getOutletName());
382 279
             excepLog.setOutletCode(first.get().getOutletCode());
@@ -386,21 +283,37 @@ public class MonitorBusiness {
386 283
             excepLog.setSixLast(record.getIsSix());
387 284
             excepLog.setLoseData(record.getIsLose());
388 285
             excepLogService.save(excepLog);
286
+            if (record.getIsLose()!=null && !record.getIsLose().equals("1")){
287
+                addExcepLog(record.getFactor(), "1",record.getMn());
288
+            }
289
+            if (record.getIsSix()!=null && !record.getIsSix().equals("1")){
290
+                addExcepLog(record.getFactor(), "2",record.getMn());
291
+            }
292
+            // 小时均值超限
293
+            String range = first.get().getRange();
294
+            if (range!=null && !range.isEmpty()){
295
+                if (range.contains("-")){
296
+                    String[] split = range.split("-");
297
+                    if (split.length > 2
298
+                            && ((isNumeric(range)  && new BigDecimal(split[0]).compareTo(x.getAvg()) < 0)
299
+                            || (isNumeric(range) && new BigDecimal(split[2]).compareTo(x.getAvg()) < 0))) {
300
+                        addExcepLog(record.getFactor(), "3",record.getMn());
301
+                    }
302
+                }else{
303
+                    if (isNumeric(range) && new BigDecimal(range).compareTo(x.getAvg()) > 0){
304
+                        addExcepLog(record.getFactor(), "3",record.getMn());
305
+                    }
306
+                }
307
+            }
389 308
         }
390
-        if (record.getIsLose()!=null && !record.getIsLose().equals("1")){
391
-            addExcepLog(record.getFactor(), "1");
392
-        }
393
-        if (record.getIsSix()!=null && !record.getIsSix().equals("1")){
394
-            addExcepLog(record.getFactor(), "2");
395
-        }
396
-        if (record.getIsExceed()!=null && !record.getIsExceed().equals("1")){
397
-            addExcepLog(record.getFactor(), "3");
398
-        }
309
+
399 310
     }
400 311
 
401 312
     private void saveExcepLog(CP x, WaterMonitorMinuteRecord record) {
402 313
         ExcepLog excepLog = new ExcepLog();
403
-        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d ->d.getFactor()!=null &&  d.getFactor().equals(x.getFactor())).findFirst();
314
+        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d ->
315
+                d.getMnCode()!=null && d.getMnCode().equals(record.getMn()) &&
316
+                d.getFactor()!=null &&  d.getFactor().equals(x.getFactor())).findFirst();
404 317
         if (first.isPresent()) {
405 318
             excepLog.setOutletName(first.get().getOutletName());
406 319
             excepLog.setOutletCode(first.get().getOutletCode());
@@ -411,19 +324,10 @@ public class MonitorBusiness {
411 324
             excepLog.setLoseData(record.getIsLose());
412 325
             excepLogService.save(excepLog);
413 326
         }
414
-        if (record.getIsLose()!=null && !record.getIsLose().equals("1")){
415
-            addExcepLog(record.getFactor(), "1");
416
-        }
417
-        if (record.getIsSix()!=null && !record.getIsSix().equals("1")){
418
-            addExcepLog(record.getFactor(), "2");
419
-        }
420
-        if (record.getIsExceed()!=null && !record.getIsExceed().equals("1")){
421
-            addExcepLog(record.getFactor(), "3");
422
-        }
423 327
     }
424 328
     private void saveExcepLog(CP x, WaterMonitorHourRecord record) {
425 329
         ExcepLog excepLog = new ExcepLog();
426
-        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d -> d.getFactor()!=null && d.getFactor().equals(x.getFactor())).findFirst();
330
+        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d -> d.getMnCode()!=null && d.getMnCode().equals(record.getMn()) && d.getFactor()!=null && d.getFactor().equals(x.getFactor())).findFirst();
427 331
         if (first.isPresent()) {
428 332
             excepLog.setOutletName(first.get().getOutletName());
429 333
             excepLog.setOutletCode(first.get().getOutletCode());
@@ -433,16 +337,34 @@ public class MonitorBusiness {
433 337
             excepLog.setSixLast(record.getIsSix());
434 338
             excepLog.setLoseData(record.getIsLose());
435 339
             excepLogService.save(excepLog);
340
+            if (record.getIsLose()!=null && !record.getIsLose().equals("1")){
341
+                addExcepLog(record.getFactor(), "1",record.getMn());
342
+            }
343
+            if (record.getIsSix()!=null && !record.getIsSix().equals("1")){
344
+                addExcepLog(record.getFactor(), "2",record.getMn());
345
+            }
346
+            // 小时均值超限
347
+            String range = first.get().getRange();
348
+            if (range!=null && !range.isEmpty()){
349
+                if (range.contains("-")){
350
+                    String[] split = range.split("-");
351
+                    if (split.length > 2
352
+                            && ((isNumeric(range)  && new BigDecimal(split[0]).compareTo(x.getAvg()) < 0)
353
+                            || (isNumeric(range) && new BigDecimal(split[2]).compareTo(x.getAvg()) < 0))) {
354
+                        addExcepLog(record.getFactor(), "3",record.getMn());
355
+                    }
356
+                }else{
357
+                    if (isNumeric(range) && new BigDecimal(range).compareTo(x.getAvg()) > 0){
358
+                        addExcepLog(record.getFactor(), "3",record.getMn());
359
+                    }
360
+                }
361
+            }
436 362
         }
437
-        if (record.getIsLose()!=null && !record.getIsLose().equals("1")){
438
-            addExcepLog(record.getFactor(), "1");
439
-        }
440
-        if (record.getIsSix()!=null && !record.getIsSix().equals("1")){
441
-            addExcepLog(record.getFactor(), "2");
442
-        }
443
-        if (record.getIsExceed()!=null && !record.getIsExceed().equals("1")){
444
-            addExcepLog(record.getFactor(), "3");
445
-        }
363
+
364
+    }
365
+
366
+    public boolean isNumeric(String str) {
367
+        return str.matches("\\d+(\\.\\d+)?"); // 正则表达式 \d 是数字字符,+ 表示一次或多次。
446 368
     }
447 369
 
448 370
     public Integer sendMessage(String time,String mn) throws ParseException {
@@ -488,9 +410,9 @@ public class MonitorBusiness {
488 410
     /**
489 411
      * 添加事后处置单信息
490 412
      */
491
-    public void addExcepLog(String factor,String dataType) {
413
+    public void addExcepLog(String factor,String dataType,String mn) {
492 414
         PostMarking postMarking = new PostMarking();
493
-        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d -> d.getFactor()!=null && d.getFactor().equals(factor)).findFirst();
415
+        Optional<OutletAndFactorVo> first = getloadData().stream().filter(d -> d.getMnCode()!=null && d.getMnCode().equals(mn) && d.getFactor()!=null && d.getFactor().equals(factor)).findFirst();
494 416
         if (first.isPresent()) {
495 417
             postMarking.setOutletName(first.get().getOutletName());
496 418
             postMarking.setOutletNumber(first.get().getOutletCode());
@@ -510,7 +432,7 @@ public class MonitorBusiness {
510 432
                     break;
511 433
                 case "3":
512 434
                     //瞬时值是否为超标数据
513
-                    postMarking.setManagementContent("数据异常:监测因子【"+pollutionGas.mean()+"("+pollutionGas.code()+")】瞬时值超标。");
435
+                    postMarking.setManagementContent("数据异常:监测因子【"+pollutionGas.mean()+"("+pollutionGas.code()+")】小时均值超限。");
514 436
                     break;
515 437
             }
516 438
             postMarkingService.save(postMarking);

+ 5 - 0
src/main/java/com/cn/esermis/dpld/model/OutletAndFactorVo.java

@@ -19,10 +19,15 @@ public class OutletAndFactorVo {
19 19
 
20 20
 
21 21
     private String factor;
22
+
22 23
     private String mnCode;
24
+
23 25
     private String companyCode;
26
+
24 27
     private String departmentCode;
25 28
 
29
+    private String range;
30
+
26 31
 
27 32
 
28 33
 

+ 74 - 20
src/main/java/com/cn/esermis/server/Consumer.java

@@ -9,9 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
9 9
 import org.springframework.scheduling.annotation.Async;
10 10
 import org.springframework.stereotype.Component;
11 11
 
12
-import java.util.concurrent.ExecutorService;
13
-import java.util.concurrent.Executors;
14
-import java.util.concurrent.TimeUnit;
12
+import java.util.concurrent.*;
15 13
 
16 14
 /**
17 15
  * 消费者类
@@ -29,34 +27,90 @@ public class Consumer {
29 27
     public void receive() throws InterruptedException {
30 28
         System.out.println("开始消费消息");
31 29
         // 开启线程池,如果有数据就启动线程消费
32
-        int numberOfThreads = 8;
33 30
         int tolot = 0;
34
-        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
35 31
 
36
-        while (true){
32
+        int corePoolSize = 10; // 核心线程数
33
+        int maximumPoolSize = 50; // 最大线程数
34
+        long keepAliveTime = 60L; // 空闲线程存活时间(秒)
35
+        TimeUnit unit = TimeUnit.SECONDS; // 时间单位
36
+        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(200); // 工作队列
37
+        ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
38
+        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略
39
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
40
+                corePoolSize,
41
+                maximumPoolSize,
42
+                keepAliveTime,
43
+                unit,
44
+                workQueue,
45
+                threadFactory,
46
+                handler
47
+        );
48
+
49
+        while (true) {
50
+            //取出消息
51
+//            Object message = redisQueueService.receiveMessage(Constant.SERVER);
52
+            boolean message = redisQueueService.isQueueEmpty(Constant.SERVER);
53
+            if (message) {
54
+//                System.out.println("消息队列为空");
55
+                try {
56
+                    Thread.sleep(1000 * 5);
57
+                } catch (InterruptedException e) {
58
+                }
59
+            } else {
60
+                tolot++;
61
+                int finalTolot = tolot;
62
+                Object mge = redisQueueService.receiveMessage(Constant.SERVER);
63
+                try {
64
+                    System.out.println("消费消息------------:第(" + finalTolot + ")条");
65
+                    JSONObject jsonObject = (JSONObject) JSON.toJSON(mge);
66
+                    monitorBusiness.inboundOperations(jsonObject);
67
+                } catch (Exception e) {
68
+                    redisQueueService.sendMessage(Constant.RECEIVEREPEAT, mge);
69
+                    Thread.currentThread().interrupt();
70
+                }
71
+//                executor.submit(() -> {
72
+//                    Object mge = redisQueueService.receiveMessage(Constant.SERVER);
73
+//                    try {
74
+//                        System.out.println("消费消息------------:第(" + finalTolot + ")条");
75
+//                        JSONObject jsonObject = (JSONObject) JSON.toJSON(mge);
76
+//                        monitorBusiness.inboundOperations(jsonObject);
77
+//                    } catch (Exception e) {
78
+//                        redisQueueService.sendMessage(Constant.RECEIVEREPEAT, mge);
79
+//                        Thread.currentThread().interrupt();
80
+//                    }
81
+//                });
82
+            }
83
+        }
84
+
85
+    }
86
+
87
+
88
+    @Async
89
+    public void receiveRepeat() throws InterruptedException {
90
+        System.out.println("开始消费消息多线程失败的信息");
91
+        // 开启线程池,如果有数据就启动线程消费
92
+//        int numberOfThreads = 8;
93
+        int tolot = 0;
94
+        while (true) {
37 95
             //取出消息
38
-            Object message = redisQueueService.receiveMessage(Constant.SERVER);
39
-            if (message==null || message.equals("")){
96
+//            Object message = redisQueueService.receiveMessage(Constant.SERVER);
97
+            boolean message = redisQueueService.isQueueEmpty(Constant.RECEIVEREPEAT);
98
+            if (message) {
40 99
 //                System.out.println("消息队列为空");
41 100
                 try {
42
-                    Thread.sleep(1000*5);
101
+                    Thread.sleep(1000 * 5);
43 102
                 } catch (InterruptedException e) {
44 103
                 }
45
-            }else{
104
+            } else {
46 105
                 tolot++;
47 106
                 int finalTolot = tolot;
48
-                executorService.submit(new Runnable() {
49
-                    @Override
50
-                    public void run() {
51
-                        // 数据入库
52
-//                        System.out.println("消费消息------------:"+message);
53
-                        System.out.println("消费消息------------:第("+ finalTolot+")条");
54
-                        JSONObject jsonObject = (JSONObject) JSON.toJSON(message);
55
-                        monitorBusiness.inboundOperations(jsonObject);
56
-                    }
57
-                });
107
+                Object mge = redisQueueService.receiveMessage(Constant.RECEIVEREPEAT);
108
+                System.out.println("消费消息------------:第(" + finalTolot + ")条");
109
+                JSONObject jsonObject = (JSONObject) JSON.toJSON(mge);
110
+                monitorBusiness.inboundOperations(jsonObject);
58 111
             }
59 112
         }
113
+
60 114
     }
61 115
 
62 116
 }

+ 2 - 1
src/main/resources/static/mybatis/DataAcquisitionInstrumentMapper.xml

@@ -9,10 +9,11 @@
9 9
                a.outlet_code outletCode,
10 10
                c.factor `factor`,
11 11
                a.mn_code `mnCode`,
12
+               c.range_str `range`,
12 13
                a.company_code `companyCode`,
13 14
                a.department_code `departmentCode`
14 15
         from data_acquisition_instrument a
15
-                 left join analytical_instruments b on a.id = b.data_acquisition_instrument_id
16
+                 INNER join analytical_instruments b on a.id = b.data_acquisition_instrument_id
16 17
                  INNER join monitoring_factor c on b.id = c.analytical_instrument_id
17 18
     </select>
18 19
     <select id="getCompony" resultType="java.util.Map">