十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否可以,以及处理登录认证的UserAuthHandler和消息处理MessageHandler
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(defLoopGroup,
//编码解码器
new HttpServerCodec(),
//将多个消息转换成单一的消息对象
new HttpObjectAggregator(65536),
//支持异步发送大的码流,一般用于发送文件流
new ChunkedWriteHandler(),
//检测链路是否读空闲,配合心跳handler检测channel是否正常
new IdleStateHandler(60, 0, 0),
//处理握手和认证
new UserAuthHandler(),
//处理消息的发送
new MessageHandler()
);
}
对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel
public static void addChannel(Channel channel) {
String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
System.out.println("addChannel:" + remoteAddr);
if (!channel.isActive()) {
logger.error("channel is not active, address: {}", remoteAddr);
}
UserInfo userInfo = new UserInfo();
userInfo.setAddr(remoteAddr);
userInfo.setChannel(channel);
userInfo.setTime(System.currentTimeMillis());
userInfos.put(channel, userInfo);
}
登录后,channel就变成有效的channel,无效的channel之后将会丢弃
public static boolean saveUser(Channel channel, String nick, String password) {
UserInfo userInfo = userInfos.get(channel);
if (userInfo == null) {
return false;
}
if (!channel.isActive()) {
logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
return false;
}
if (nick == null || password == null) {
return false;
}
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
if (account == null) {
return false;
}
// 增加一个认证用户
userCount.incrementAndGet();
userInfo.setNick(nick);
userInfo.setAuth(true);
userInfo.setId(account.getId());
userInfo.setUsername(account.getUsername());
userInfo.setGroupNumber(account.getGroupNumber());
userInfo.setTime(System.currentTimeMillis());
// 注册该用户推送消息的通道
offlineInfoTransmitStatic.registerPull(channel);
return true;
}
当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。
public static void removeChannel(Channel channel) {
try {
logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
//加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误
rwLock.writeLock().lock();
channel.close();
UserInfo userInfo = userInfos.get(channel);
if (userInfo != null) {
if (userInfo.isAuth()) {
offlineInfoTransmitStatic.unregisterPull(channel);
// 减去一个认证用户
userCount.decrementAndGet();
}
userInfos.remove(channel);
}
} finally {
rwLock.writeLock().unlock();
}
}
为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。
public interface OfflineInfoTransmit {
void pushP2P(Integer userId, String message);
void pushGroup(String groupNumber, String message);
void registerPull(Channel channel);
void unregisterPull(Channel channel);
}
其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:
代码地址:
https://github.com/shuangyueliao/netty-chat
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。