|
@@ -112,61 +112,10 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
|
112
|
112
|
dataMap.put(ctx.channel().id().toString(), startStr);
|
113
|
113
|
}
|
114
|
114
|
}
|
115
|
|
-// System.out.println("-----------"+adapter.redisQueueService.receiveMessage(Constant.SERVER));
|
116
|
|
-
|
117
|
|
-// ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
|
118
|
|
-// //转成String
|
119
|
|
-// String data = (String) msg;
|
120
|
|
-// //替换掉 换行 和 空格
|
121
|
|
-// data = data.replaceAll("\r|\n", "");
|
122
|
|
-// //获取通道ID 并打印
|
123
|
|
-// String channelId = ctx.channel().id().toString();
|
124
|
|
-// System.out.println("channelId=" + channelId + "data=" + data);
|
125
|
|
-//
|
126
|
|
-//
|
127
|
|
-// // 这里我将通道id作为code来使用,实际是需要msg里来摘取的客户端数据里的唯一值的
|
128
|
|
-//
|
129
|
|
-// // 如果没有则创建 如果有,更新data值
|
130
|
|
-// BootNettyChannel b = BootNettyChannelCache.get("server:" + channelId);
|
131
|
|
-// if (b == null) {
|
132
|
|
-// //创建通道
|
133
|
|
-// BootNettyChannel bnc = new BootNettyChannel();
|
134
|
|
-// bnc.setChannel(ctx.channel());
|
135
|
|
-// //设置通道编码
|
136
|
|
-// bnc.setCode("server:" + channelId);
|
137
|
|
-// bnc.setReport_last_data(data);
|
138
|
|
-// //保存通道
|
139
|
|
-// BootNettyChannelCache.save("server:" + channelId, bnc);
|
140
|
|
-// } else {
|
141
|
|
-// //更新data值什么意思呢?
|
142
|
|
-// b.setReport_last_data(data);
|
143
|
|
-// }
|
144
|
|
-// //回写给客户端
|
145
|
|
-// ctx.writeAndFlush(Unpooled.buffer().writeBytes(("server:" + channelId).getBytes()));
|
146
|
|
-// // netty的编码已经指定,因此可以不需要再次确认编码
|
147
|
|
-// // ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8)));
|
148
|
|
-//
|
149
|
|
-// //直接这样写入也可以
|
150
|
|
-// //ctx.write("我是服务端,我收到你的消息了!");
|
151
|
|
-// //ctx.flush();
|
152
|
|
-// in.release();
|
153
|
115
|
} catch (Exception e) {
|
154
|
116
|
System.out.println("channelRead--" + e.toString());
|
155
|
117
|
}
|
156
|
118
|
}
|
157
|
|
-
|
158
|
|
- public String convertByteBufToString(ByteBuf buf) {
|
159
|
|
- String str;
|
160
|
|
- if (buf.hasArray()) { // 处理堆缓冲区
|
161
|
|
- str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
|
162
|
|
- } else { // 处理直接缓冲区以及复合缓冲区
|
163
|
|
- byte[] bytes = new byte[buf.readableBytes()];
|
164
|
|
- buf.getBytes(buf.readerIndex(), bytes);
|
165
|
|
- str = new String(bytes, 0, buf.readableBytes());
|
166
|
|
- }
|
167
|
|
- return str;
|
168
|
|
- }
|
169
|
|
-
|
170
|
119
|
/**
|
171
|
120
|
* 从客户端收到新的数据、读取完成时调用
|
172
|
121
|
*/
|