Protobuf与Netty集成
Netty对于消息编码和解码、半包读写问题支持很好;protobuf支持多语言,编码后消息小利于存储和传输。
Netty对于Protobuf提供了很好的支持,我们在使用的过程中,可以直接使用Netty提供的Protobuf相关Handler,如:下面通过一个小demo记录使用的方法,与前面的代码类似,分别定义一个client和一个server程序。
代码组织方式:
编写Protobuf
Person.proto
syntax = "proto2";
package com.dongqiang.protobuf;
option optimize_for = SPEED;
option java_package = "com.dongqiang.netty.sixthexample";
option java_outer_classname = "MyDataInfo";
message Person {
required string name = 1;
required int32 age = 2;
optional string address = 3;
}
需要在命令行使用protoc命令将Person.proto文件生成Java文件,生成的Java文件MyDataInfo.java位于package com.dongqiang.netty.sixthexample下
客户端
package com.dongqiang.netty.sixthexample;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Created by dongqiang on 2017/6/24.
*/
public class TestClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ClientInitializer());
bootstrap.connect("localhost", 8899).channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
ClientInitializer.java
package com.dongqiang.netty.sixthexample;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
* Created by dongqiang on 2017/6/24.
*/
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ClientHandler());
}
}
这里初始化器中添加了多个Protobuf相关的Handler,用来编解码Protobuf数据。
package com.dongqiang.netty.sixthexample;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Created by dongqiang on 2017/6/24.
*/
public class ClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyDataInfo.Person person = MyDataInfo.Person.newBuilder()
.setName("zhangsan").setAge(22).setAddress("nanjing").build();
ctx.channel().writeAndFlush(person);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
在客户端的handler中,当连接上服务器时就发送一个消息,内容为protobuf格式对象。
服务端
TestServer.java
package com.dongqiang.netty.sixthexample;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* Created by dongqiang on 2017/6/24.
*/
public class TestServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerInitializer.java
package com.dongqiang.netty.sixthexample;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
* Created by dongqiang on 2017/6/24.
*/
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ServerHandler());
}
}
ServerHandler.java
package com.dongqiang.netty.sixthexample;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.UUID;
/**
* Created by dongqiang on 2017/6/24.
*/
public class ServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
String name = msg.getName();
int age = msg.getAge();
String address = msg.getAddress();
System.out.println(name + ":" + age + ":" + address);
ctx.channel().writeAndFlush("server: " + UUID.randomUUID().toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
运行起来后,可以在服务端看到如下:
存在的问题:
如果多消息怎么处理?
1.自定义协议,比如前两位作为类型标志。
2.定义枚举