
Object Serialization

The way objects are serialized and transported has changed quite a bit over the years.

Java serialization

We used this a long time ago. Because this form of serialization is tied to a single language, it is no longer used.

JSON

Use Jackson's ObjectMapper to serialize an object to JSON, then deserialize it back.
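
A minimal sketch of that round trip; the Notification bean and its field names here are made up for illustration:

import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonRoundTrip {
    // Hypothetical bean, only for illustration.
    public static class Notification {
        public String id;
        public String body;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();

        Notification original = new Notification();
        original.id = "n-1";
        original.body = "hello";

        // Object -> JSON string
        String json = objectMapper.writeValueAsString(original);

        // JSON string -> object
        Notification restored = objectMapper.readValue(json, Notification.class);
        System.out.println(json + " -> " + restored.id);
    }
}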

Avro

The default format used with Kafka here. The data model has to be defined up front, and the same model is needed both to serialize and to deserialize. Updating the model, as I recall, is not as simple as it sounds; adding a field, for example (see the sketch below). The model has to be defined in an *.avdl file.
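
As far as I remember, the key rule is that a newly added field needs a default value, otherwise old messages can no longer be read with the new schema. A minimal sketch of that, using GenericRecord and made-up schemas instead of the real generated classes:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionSketch {
    // Old writer schema: a single-field record (field names are made up).
    private static final Schema V1 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Notification\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"}]}");

    // New reader schema: the added field carries a default, so old bytes still resolve.
    private static final Schema V2 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Notification\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"priority\",\"type\":\"int\",\"default\":0}]}");

    public static void main(String[] args) throws Exception {
        // Write a record with the old (writer) schema.
        GenericRecord oldRecord = new GenericData.Record(V1);
        oldRecord.put("id", "n-1");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
        new GenericDatumWriter<GenericRecord>(V1).write(oldRecord, encoder);
        encoder.flush();

        // Read those bytes with the new (reader) schema; Avro fills in the default.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(baos.toByteArray(), null);
        GenericRecord upgraded =
                new GenericDatumReader<GenericRecord>(V1, V2).read(null, decoder);
        System.out.println(upgraded);   // {"id": "n-1", "priority": 0}
    }
}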
All of NoAE's messages have their own corresponding class, Serializer, and Deserializer, all based on Avro. For example:

NotificationInstanceDeserializer
//NotificationInstance is generated from *.avdl
public class NotificationInstanceDeserializer extends GenericBinaryDeserializer<NotificationInstance> {
    public NotificationInstanceDeserializer() {
    }
    //NotificationInstance is Avro auto generated.
    public Class<NotificationInstance> getType() {
        return NotificationInstance.class;
    }

    public List<Schema> getWriterSchemas() {
        List<Schema> prevSchemas = new ArrayList<>();
        prevSchemas.add(NotificationInstanceOld.getClassSchema());
        return prevSchemas;
    }
}
NotificationInstanceSerializer
//NotificationInstance is generated from *.avdl
public class NotificationInstanceSerializer extends GenericBinarySerializer<NotificationInstance> {
    public NotificationInstanceSerializer() {
    }

    public Schema getSchema() {
        return NotificationInstance.getClassSchema();
    }
}
AvroSerializer.java (copied from GenericBinarySerializer.java in CDC-model)
// The concrete message class T has to extend the Avro class SpecificRecordBase
public abstract class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    // logging framework assumed to be slf4j here
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);
    private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();

    // Subclasses such as NotificationInstanceSerializer supply the writer schema.
    protected abstract Schema getSchema();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }

        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = ENCODER_FACTORY.binaryEncoder(baos, null);
            DatumWriter<T> writer = new SpecificDatumWriter<>(getSchema());
            writer.write(data, encoder);
            encoder.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            LOGGER.warn("Error serializing Avro message", e);
            return null;
        }
    }
}
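
For context, a serializer like this is simply plugged into the producer configuration as the value serializer. A rough sketch; the broker address, key serializer, and topic are placeholders, not the actual NoAE wiring:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerWiringSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");              // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName()); // placeholder key serializer
        // The Avro-based serializer from above goes in as the value serializer.
        props.put("value.serializer", NotificationInstanceSerializer.class.getName());

        Producer<String, NotificationInstance> producer = new KafkaProducer<>(props);
        // producer.send(new ProducerRecord<>("notifications", key, instance));  // topic name is a placeholder
        producer.close();
    }
}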

DACO's Serializer and Deserializer, by contrast, both go through a single JsonSerializer, which is really just Jackson's ObjectMapper. What goes over the wire is still binary (but it is simply JSON converted to bytes; every Kafka message is bytes anyway). The data model has to implement JsonNode (which is not actually necessary, since objectMapper.writeValueAsBytes accepts any Object). There is no language-independent representation; it is bound to Jackson.

This is the lazy approach: no *.avdl file to define, just a bean that extends JsonNode. The underlying problem is that the company has no unified standard, so everyone picks whatever is most convenient, and changing the format later will be quite painful.

JsonSerializer.java

public class JsonSerializer implements Serializer<JsonNode> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, JsonNode data) {
        if (data == null) {
            return null;
        }

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (IOException e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}

KafkaJsonDeserializer.java. So Kafka consumers all receive a JsonNode and have to convert it to the concrete subtype themselves, e.g. NotificationMessage.java; a sketch of that conversion follows the class below.

public class KafkaJsonDeserializer implements Deserializer<JsonNode> {
    private static final ObjectMapper MAPPER = Jackson.newObjectMapper()
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public JsonNode deserialize(String topic, byte[] bytes) {
        try {
            return MAPPER.readTree(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
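
On the consuming side, the JsonNode then has to be mapped onto the concrete class by hand. A minimal sketch of that conversion; the NotificationMessage fields here are made up, and treeToValue is the standard Jackson call:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonNodeConversionSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical stand-in for the real NotificationMessage bean.
    public static class NotificationMessage {
        public String id;
        public String body;
    }

    // The consumer gets a JsonNode from KafkaJsonDeserializer and maps it onto the concrete type itself.
    static NotificationMessage toMessage(JsonNode node) throws JsonProcessingException {
        return MAPPER.treeToValue(node, NotificationMessage.class);
    }
}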

Protocol Buffers

The default format for gRPC. Similar to Avro: the model is defined up front, and the encoded size is likewise small.
