7. kafka序列化&反序列化

2020年11月14日 45点热度 0条评论 来源: 阿飞的博客

序列化

kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<>(props);

属性key.serializervalue.serializer就是key和value指定的序列化方式。无论是key还是value序列化和反序列化实现都是一样的,所以接下来都只以value的序列化和反序列为例。

StringSerializer

StringSerializer是内置的字符串序列化方式,核心源码如下:

/** * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding, * value.serializer.encoding or serializer.encoding. The first two take precedence over the last. */
public class StringSerializer implements Serializer<String> { 
    private String encoding = "UTF8";
    ... ...

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            // 如果数据为空,那么直接返回null即可
            if (data == null)
                return null;
            else
                // 否则将String序列化,即转为byte[]即可
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

自定义序列化

和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口Serializer。假设每个字段按照下图所示的方式自定义序列化:

下面是一个简单的自定义实现(假设自定义Order类型的Serializer,Order类中有一些String,Integer,Long,Date类型的属性–其他类型暂不支持,读者可以自行扩展):

/** * @author wangzhenfei9 * @version 1.0.0 * @since 2018年06月22日 */
public class OrderSerializer implements Serializer<Order> { 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public byte[] serialize(String topic, Order data){

        if (data == null) {
            return null;
        }

        Class<? extends Order> clazz = data.getClass();
        Field[] fields = clazz.getDeclaredFields();

        // 计算保存Order所示属性值总计需要多少字节
        int total = 0;

        // 遍历所有属性字段, 目前只支持Integer, Long, String, Date类型
        for (Field field:fields){
            String fieldName = field.getName();
            int fieldLen = fieldName.getBytes().length;
            Object value = getPropertyValue(data, fieldName);
            String type = field.getType().getSimpleName();
            System.out.println("propertyName: "+fieldName +", value: " + value
                    +", len: " + fieldLen+", type: " + type);
            // 每个属性序列化方式: 属性长度(都是4个字节)+属性名称(长度需要计算)+值长度+值
            switch (type){
                case "Long":
                    // 第一个4表示属性名长度需要的空间, 即int的长度;(int类型反序列化后需要4个字节长度,即32位)
                    // 第二个fieldLen表示属性名需要的空间
                    // 第三个4表示属性值长度需要的空间
                    // 第四个8表示属性值需要的空间(Long类型反序列化后需要8个字节长度,即64位)
                    total+=(4+fieldLen+4+8);
                    break;
                case "Date":
                    // +8+8
                    // 如果是日期类型先转成Long类型的timestamp
                    total+=(4+fieldLen+4+8);
                    break;
                case "Integer":
                    total+=(4+fieldLen+4+4);
                    break;
                case "String":
                    try {
                        total+=(4+fieldLen + 4 + value.toString().getBytes("utf-8").length);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    break;
                // 本次自定义的Serializer只支持Integer, Long, Date, String类型属性, 如果想要支持其他类型属性, 请自行实现
                default:
                    throw new IllegalArgumentException("Unsupported argument type: "+type);
            }
        }

        // 算出需要个byte[]数组长度后, 分配同样大的byte[]中
        ByteBuffer result = ByteBuffer.allocate(total);

        for (Field field:fields){
            String fieldName = field.getName();
            byte[] fieldByte = fieldName.getBytes();
            int fieldLen = fieldByte.length;

            Object value = getPropertyValue(data, fieldName);
            String type = field.getType().getSimpleName();

            // 无论什么类型都要先put属性长度和属性名称
            result.putInt(fieldLen);
            result.put(fieldByte);

            switch (type){
                case "Long":
                    byte[] longByte = SerializeUtil.longSerialize((Long) value);
                    result.putInt(longByte.length);
                    result.put(longByte);
                    break;
                case "Date":
                    // 如果是日期类型先转成Long类型的timestamp
                    byte[] dateByte = SerializeUtil.longSerialize(((Date) value).getTime());
                    result.putInt(dateByte.length);
                    result.put(dateByte);
                    break;
                case "Integer":
                    byte[] integerByte = SerializeUtil.integerSerialize((Integer) value);
                    result.putInt(integerByte.length);
                    result.put(integerByte);
                    break;
                case "String":
                    byte[] stringByte = ((String)value).getBytes();
                    result.putInt(stringByte.length);
                    result.put(stringByte);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported argument type: "+type);
            }
        }

        // 返回序列化后的结果
        return result.array();
    }


    private Object getPropertyValue(Order order, String propertyName) {
        Method[] methods = order.getClass().getMethods();
        for (Method method : methods) {
            // 这里方法匹配还不够严谨
            if (method.getName().equalsIgnoreCase("get" + propertyName)
                    || method.getName().equalsIgnoreCase("is" + propertyName)) {
                try {
                    return method.invoke(order);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }

    @Override
    public void close() {
        // nothing to do
    }

}

自定义Serializer后,修改属性value.serializer的值为com.afei.kafka.serialization.OrderSerializer,且ProducerRecord申明为ProducerRecord

反序列化

kafka反序列化消息是在消费端。由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。而构造KafkaConsumer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

属性key.deserializervalue.deserializer就是key和value指定的反序列化方式。

StringDeserializer

StringDeserializer是内置的字符串反序列化方式,核心源码如下:

/** * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding, * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last. */
public class StringDeserializer implements Deserializer<String> { 
    private String encoding = "UTF8";
    ... ...

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            // 如果数据为空,那么直接返回null即可
            if (data == null)
                return null;
            else
                // 否则将byte[]反序列化,即转为String即可
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

    ... ...
}

自定义反序列化

和内置的StringDeserializer字符串反序列化一样,如果要自定义反序列化方式,需要实现接口Deserializer。下面是一个简单的自定义实现(假设自定义Order类型的Deserializer,Order类中有一些String,Integer,Long,Date类型的属性–其他类型暂不支持),反序列化就是根据序列化的方式得到序列化前的内容:

/** * @author wangzhenfei9 * @version 1.0.0 * @since 2018年06月22日 */
public class OrderDeserializer implements Deserializer<Order> { 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public Order deserialize(String topic, byte[] bytes) {

        ByteBuffer data = ByteBuffer.wrap(bytes);

        Field[] declaredFields = new Field[0];
        try {
            declaredFields = Class.forName(Order.class.getCanonicalName()).getDeclaredFields();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        int propertyCount = declaredFields.length;

        Order order = new Order();

        while ((propertyCount--)>0){
            int propertyLen = data.getInt();
            byte[] nameByte = new byte[propertyLen];
            data.get(nameByte);
            String propertyName = new String(nameByte);
            //根据属性名称得到属性的类型
            String propertyType = getPropertyType(Order.class, propertyName);

            int valueLen = data.getInt();
            byte[] valueByte = new byte[valueLen];
            data.get(valueByte);

            Object value;

            switch (propertyType ){
                case "Long":
                    value = DeserializeUtil.longDeserialize(valueByte);
                    break;
                case "Date":
                    // 如果是日期类型先反序列化成Long类型, 然后得到Date类型的值
                    // result.putLong(((Date) value).getTime());
                    value = new Date(DeserializeUtil.longDeserialize(valueByte));
                    break;
                case "Integer":
                    value = DeserializeUtil.integerDeserialize(valueByte);
                    break;
                case "String":
                    value = new String(valueByte);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported argument type: "+propertyType);
            }

            setPropertyValue(order, propertyName, value);
            System.out.println("property = "+propertyLen+", name = "+propertyName
                    +", value = "+valueLen+", name = "+value);
        }
        return order;
    }

    private String getPropertyType(Class<? extends Order> clazz, String propertyName) {
         /* * 得到类中的方法 */
        try {
            Field field = clazz.getDeclaredField(propertyName);
            return field.getType().getSimpleName();
        } catch (NoSuchFieldException e) {

        }
        return null;
    }

    /** * 调用Order中属性${propertyName}的setter方法并赋值${propertyValue} */
    private void setPropertyValue(Order order, String propertyName, Object propertyValue) {
         /* * 得到类中的方法 */
        Method[] methods = order.getClass().getMethods();
        for (Method method : methods) {
            // 这里方法匹配还不够严谨
            if (method.getName().equalsIgnoreCase("set" + propertyName)) {
                try {
                    method.invoke(order, propertyValue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void close() {
        // nothing to do
    }

}

自定义Serializer后,修改属性value.serializer的值为com.afei.kafka.serialization.OrderDeserializer,且KafkaConsumer申明为KafkaConsumer

总结

可以看到,自定义Serializer和Deserializer非常痛苦,而且上面还有很多异常情况没有处理,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化推荐直接使用StringSerializerStringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。

    原文作者:阿飞的博客
    原文地址: https://blog.csdn.net/feelwing1314/article/details/80917705
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。