发送方需要将类对象转换成二进制压入项目自定义数据结构,接收方接收到数据后,需要将二进制转换成类对象。
项目接收方通过消息号和回执 ID,可以知道该消息如何去处理。
prorostuff 相比.proto 文件,不用先转换和编译.proto 文件,不需要做一层中间文件的转换。这层中间文件经常会和版本严格匹配时出现格式语法错误。
Pom 如下:
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.4.0</version>
</dependency>
netty 只用 4 是 Java 网络的一个主流方式
这里需要通过继承 Netty4MessageToByteEncoder Message 是 Bean 对象,根据自己项目定制来的
public class MessageEncoder extends MessageToByteEncoder<NettyMessage>{
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {
//ProrostuffSerialize提供了序列化方式。
out.writeBytes(ProrostuffSerialize.serialize(msg));
}
}
关于对 ProrostuffSerialize 实现,本身还是创建有序缓存空间,schema 流式,提供正反序列化方式。
import com.dyuproject.protostuff.LinkedBuffer;
下面的 ProtostuffIOUtil 等都是这个包下面的。
public class ProrostuffSerialize{
private static class SerializeObject{
private Object target;
}
@SuppressWarnings("unchecked")
public static byte[] serialize(Object object) {
SerializeObject serializeObject = new SerializeObject();
serializeObject .target = object;
Class<serializeObject > serializeObjectClass = (Class<serializeObject >) serializeObject.getClass();
LinkedBuffer buffer= LinkedBuffer.allocate(1024 * 4);
try {
Schema<serializeObject > schema = RuntimeSchema.getSchema(serializeObjectClass );
return ProtostuffIOUtil.toByteArray(serializeObject , schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
linkedBuffer.clear();
}
}
//clazz泛型是Bean对象Message
@SuppressWarnings("unchecked")
public static <T> T deserialize(byte[] data, Class<T> clazz) {
try {
//创建SerializeObject的schema对象
Schema<SerializeObject> schema = RuntimeSchema.getSchema(SerializeObject.class);
SerializeObject serializeObject= schema.newMessage();
ProtostuffIOUtil.mergeFrom(data, serializeObject, schema);
return (T) serializeObject.target;
} catch (Exception e) {
log.error("not found schema obj");
throw new IllegalStateException(e.getMessage(), e);
}
}
public static <T> byte[] serializeList(List<T> objList) {
if (CollectionUtils.isEmpty(objList)) {
log.error("objList is empty");
throw new RuntimeException("Failed to serializer");
}
@SuppressWarnings("unchecked") Schema<T> schema =
(Schema<T>) RuntimeSchema.getSchema(objList.get(0).getClass());
LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);
byte[] bytes = null;
ByteArrayOutputStream outputStream= null;
try {
outputStream= new ByteArrayOutputStream();
ProtostuffIOUtil.writeListTo(outputStream, objList, schema, buffer);
bytes = outputStream.toByteArray();
} catch (Exception e) {
log.error("Failed to serializer, objList={}", objList);
throw new RuntimeException("Failed to serializer");
} finally {
buffer.clear();
outputStream.close();
}
return bytes;
}
}
serialize 时候,使用这个 ProtobufIOUtil.toByteArray(T message, Schema schema, LinkedBuffer buffer) 压到不同数据类型,默认是 byte[]
想变成字符串在外面包一层 new String(ProtobufIOUtil.toByteArray()) 就行。
上面的反序列化数组就不写了。
LengthFieldBasedFrameDecoder 继承自 MessageToByteDecoder,也是使用 netty 对 Message 进行自行处理分流和处理分包。
public class MessageDecoder extends LengthFieldBasedFrameDecoder{
public MessageDecoder(int maxLength, int offset, int lengthFieldLength) {
super(maxLength, offset, lengthFieldLength);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
try {
//要使用readBytes防止会读取不到,需要先给他申请可读部分的空间,这块是用隐式指针进行位移。
byte[] dstBytes = new byte[input.readableBytes()];
//读取缓存区接收到剩余部分二进制
input.readBytes(dstBytes,0,input.readableBytes());
//接收到Message对象
return SerializeUtil.deserialize(dstBytes, Message.class);
} catch (Exception e) {
log.error("exception when decoding: {}",e);
return null;
}
}
}
然后把编码器和解码器添加到 nettyOptions 管道容器的地方
@Component
public class ChannelInitializer extends ChannelInitializer<SocketChannel> {
//自定义Handler
@Autowired
ChannelHandler channelHandler;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channel= socketChannel.pipeline();
channel.addLast("idleStateHandler",
new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES));
//字符串编解码器
channel.addLast("encoder", new NettyMessageEncoder());
channel.addLast("decoder",new MessageDecoder(1024 * 1024, 4, 4));
pipeline.addLast("channelHandler", ChannelHandler);
}
}
施工中