网络主题

发送方需要将类对象转换成二进制压入项目自定义数据结构,接收方接收到数据后,需要将二进制转换成类对象。
项目接收方通过消息号和回执 ID,可以知道该消息如何去处理。
prorostuff 相比.proto 文件,不用先转换和编译.proto 文件,不需要做一层中间文件的转换。这层中间文件经常会和版本严格匹配时出现格式语法错误。
Pom 如下:

<dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.4.0</version>
        </dependency>

netty 的编码类

netty 只用 4 是 Java 网络的一个主流方式
这里需要通过继承 Netty4MessageToByteEncoder Message 是 Bean 对象,根据自己项目定制来的

public class MessageEncoder extends MessageToByteEncoder<NettyMessage>{
   @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {
        //ProrostuffSerialize提供了序列化方式。
        out.writeBytes(ProrostuffSerialize.serialize(msg));
    }
}

关于对 ProrostuffSerialize 实现,本身还是创建有序缓存空间,schema 流式,提供正反序列化方式。

import com.dyuproject.protostuff.LinkedBuffer;

下面的 ProtostuffIOUtil 等都是这个包下面的。

public class ProrostuffSerialize{

    private static class SerializeObject{
        private Object target;
    }

    @SuppressWarnings("unchecked")
    public static byte[] serialize(Object object) {
        SerializeObject serializeObject = new SerializeObject();
        serializeObject .target = object;
        Class<serializeObject > serializeObjectClass = (Class<serializeObject >) serializeObject.getClass();
        LinkedBuffer buffer= LinkedBuffer.allocate(1024 * 4);
        try {
            Schema<serializeObject > schema = RuntimeSchema.getSchema(serializeObjectClass );
            return ProtostuffIOUtil.toByteArray(serializeObject , schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            linkedBuffer.clear();
        }
    }


    //clazz泛型是Bean对象Message
    @SuppressWarnings("unchecked")
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        try {
            //创建SerializeObject的schema对象
            Schema<SerializeObject> schema = RuntimeSchema.getSchema(SerializeObject.class);
            SerializeObject serializeObject= schema.newMessage();
            ProtostuffIOUtil.mergeFrom(data, serializeObject, schema);
            return (T) serializeObject.target;
        } catch (Exception e) {
            log.error("not found schema obj");
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

   public static <T> byte[] serializeList(List<T> objList) {
        if (CollectionUtils.isEmpty(objList)) {
            log.error("objList is empty");
            throw new RuntimeException("Failed to serializer");
        }
        @SuppressWarnings("unchecked") Schema<T> schema =
                (Schema<T>) RuntimeSchema.getSchema(objList.get(0).getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);
        byte[] bytes = null;
        ByteArrayOutputStream outputStream= null;
        try {
            outputStream= new ByteArrayOutputStream();
            ProtostuffIOUtil.writeListTo(outputStream, objList, schema, buffer);
            bytes = outputStream.toByteArray();
        } catch (Exception e) {
            log.error("Failed to serializer, objList={}", objList);
            throw new RuntimeException("Failed to serializer");
        } finally {
            buffer.clear();
            outputStream.close();
        }
        return bytes;
       }
  }

serialize 时候,使用这个 ProtobufIOUtil.toByteArray(T message, Schema schema, LinkedBuffer buffer) 压到不同数据类型,默认是 byte[]
想变成字符串在外面包一层 new String(ProtobufIOUtil.toByteArray()) 就行。
上面的反序列化数组就不写了。

netty 的解码类

LengthFieldBasedFrameDecoder 继承自 MessageToByteDecoder,也是使用 netty 对 Message 进行自行处理分流和处理分包。

public class MessageDecoder extends LengthFieldBasedFrameDecoder{

    public MessageDecoder(int maxLength, int offset, int lengthFieldLength) {
        super(maxLength, offset, lengthFieldLength);
    }

    @Override
    public  Object decode(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
        try {
           //要使用readBytes防止会读取不到,需要先给他申请可读部分的空间,这块是用隐式指针进行位移。
            byte[] dstBytes = new byte[input.readableBytes()];
           //读取缓存区接收到剩余部分二进制
            input.readBytes(dstBytes,0,input.readableBytes());
           //接收到Message对象
            return  SerializeUtil.deserialize(dstBytes, Message.class);
        } catch (Exception e) {
            log.error("exception when decoding: {}",e);
            return null;
        }
    }
}

然后把编码器和解码器添加到 nettyOptions 管道容器的地方

@Component
public class ChannelInitializer extends ChannelInitializer<SocketChannel> {

    //自定义Handler
    @Autowired
    ChannelHandler channelHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline channel= socketChannel.pipeline();
        channel.addLast("idleStateHandler",
                new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES));
        //字符串编解码器
         channel.addLast("encoder", new NettyMessageEncoder());
        channel.addLast("decoder",new MessageDecoder(1024 * 1024, 4, 4));
        pipeline.addLast("channelHandler", ChannelHandler);
    }
}

施工中


↙↙↙阅读原文可查看相关链接,并与作者交流