Object Serialization
Object serialization and transport have changed quite a bit over the years.
Java Serialization
Used long ago. Because this serialization format is tied to a single language, it is no longer used.
JSON
Serialize an object to JSON with Jackson's ObjectMapper, then deserialize it back the same way.
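A minimal Jackson round trip, for reference (the User bean here is made up for illustration):

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonRoundTrip {
    // A plain bean; Jackson binds public fields (or getters/setters) by name.
    public static class User {
        public String name;
        public int age;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        User user = new User();
        user.name = "alice";
        user.age = 30;
        String json = mapper.writeValueAsString(user);  // -> {"name":"alice","age":30}
        User back = mapper.readValue(json, User.class); // back into the bean
        System.out.println(json + " / " + back.name);
    }
}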
Avro
The default format for Kafka messages here. The model has to be defined up front, and both serialization and deserialization use that model. As I remember, updating the model is not as simple as it sounds, for example adding a field.
We have to define the model in an *.avdl file (Avro IDL).
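Once the *.avdl is compiled into Java classes, both producer and consumer code work against the generated class (NoAE's NotificationInstance below is one such class). A rough sketch of building a record; the setter names are invented for illustration and are not the real schema:

// Avro-generated SpecificRecordBase classes expose a static builder and their schema.
// The fields below are hypothetical; the real ones come from the *.avdl definition.
NotificationInstance instance = NotificationInstance.newBuilder()
        .setNotificationId("n-123")       // hypothetical field
        .setRecipient("user@example.com") // hypothetical field
        .build();
org.apache.avro.Schema schema = NotificationInstance.getClassSchema(); // the schema travels with the class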
Every NoAE message has its own corresponding Class, Serializer, and Deserializer, all based on Avro. For example:
NotificationInstanceDeserializer
// NotificationInstance is generated from *.avdl
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

public class NotificationInstanceDeserializer extends GenericBinaryDeserializer<NotificationInstance> {

    // NotificationInstance is the Avro auto-generated class.
    @Override
    public Class<NotificationInstance> getType() {
        return NotificationInstance.class;
    }

    // Older writer schemas, so records written with the previous model version can still be read.
    @Override
    public List<Schema> getWriterSchemas() {
        List<Schema> prevSchemas = new ArrayList<>();
        prevSchemas.add(NotificationInstanceOld.getClassSchema());
        return prevSchemas;
    }
}
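GenericBinaryDeserializer itself lives in CDC-model and is not shown here. Below is a minimal sketch of what such a base class plausibly does with Avro's standard writer-schema/reader-schema resolution; every name and the fallback logic here are assumptions, not the actual CDC-model code:

import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.serialization.Deserializer;

public abstract class GenericBinaryDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    protected abstract Class<T> getType();

    protected abstract List<Schema> getWriterSchemas();

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        Schema readerSchema = readerSchema();
        // Try the current schema first, then fall back to each older writer schema.
        // Avro's schema resolution fills fields added since then from their defaults,
        // which is why "just adding a field" still needs the old schemas registered.
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return new SpecificDatumReader<T>(readerSchema, readerSchema).read(null, decoder);
        } catch (IOException | RuntimeException current) {
            for (Schema writerSchema : getWriterSchemas()) {
                try {
                    BinaryDecoder retry = DecoderFactory.get().binaryDecoder(data, null);
                    return new SpecificDatumReader<T>(writerSchema, readerSchema).read(null, retry);
                } catch (IOException | RuntimeException ignored) {
                    // try the next known writer schema
                }
            }
            throw new IllegalStateException("Cannot deserialize with any known schema", current);
        }
    }

    private Schema readerSchema() {
        try {
            // Avro-generated classes expose a static getClassSchema(); reflection keeps this generic.
            return (Schema) getType().getMethod("getClassSchema").invoke(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}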
NotificationInstanceSerializer
// NotificationInstance is generated from *.avdl
import org.apache.avro.Schema;

public class NotificationInstanceSerializer extends GenericBinarySerializer<NotificationInstance> {

    // The writer schema is the one generated from the current *.avdl.
    @Override
    public Schema getSchema() {
        return NotificationInstance.getClassSchema();
    }
}
AvroSerializer.java (copied from GenericBinarySerializer.java in CDC-model)
// The message class T has to extend the Avro class SpecificRecordBase.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);
    private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();

    protected abstract Schema getSchema();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = ENCODER_FACTORY.binaryEncoder(baos, null);
            DatumWriter<T> writer = new SpecificDatumWriter<>(getSchema());
            writer.write(data, encoder);
            encoder.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            LOGGER.warn("Failed to serialize Avro record for topic {}", topic, e);
            throw new SerializationException("Error serializing Avro message", e);
        }
    }
}
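For completeness, this is roughly how such a serializer gets wired into a Kafka producer; the bootstrap address, topic name, and key, and the empty builder call are all made up for this sketch:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NotificationProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // made-up address
        props.put("key.serializer", StringSerializer.class.getName());
        // The Avro-based serializer from above handles the value side.
        props.put("value.serializer", NotificationInstanceSerializer.class.getName());
        try (Producer<String, NotificationInstance> producer = new KafkaProducer<>(props)) {
            NotificationInstance instance = NotificationInstance.newBuilder().build(); // fields omitted in this sketch
            producer.send(new ProducerRecord<>("notification-instances", "some-key", instance));
        }
    }
}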
DACO's Serializer and Deserializer, by contrast, both go through a single JsonSerializer, which is really just a wrapper around Jackson's ObjectMapper. The wire format is still binary, but it is only JSON converted to bytes (every Kafka message is bytes anyway). The data model has to implement JsonNode, which isn't actually necessary: objectMapper.writeValueAsBytes accepts any Object. There is no language-independent representation; everything is bound to Jackson.
This is the lazy approach: no *.avdl file to define, just write a bean that extends JsonNode. Mostly it's because the company has no unified standard, so everyone picks whatever is most convenient for them. Changing the format later will be fairly painful.
JsonSerializer.java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer implements Serializer<JsonNode> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, JsonNode data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (IOException e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}
KafkaJsonDeserializer.java. So every Kafka consumer receives a JsonNode and has to convert it to the concrete type itself, e.g. NotificationMessage.java (see the conversion sketch after the code).
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.dropwizard.jackson.Jackson;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class KafkaJsonDeserializer implements Deserializer<JsonNode> {

    private static final ObjectMapper MAPPER = Jackson.newObjectMapper()
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public JsonNode deserialize(String topic, byte[] bytes) {
        try {
            return MAPPER.readTree(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
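Converting the raw JsonNode into the concrete message type is then the consumer's job. A sketch, assuming NotificationMessage is a plain Jackson-mappable bean (the helper class and error handling here are illustrative, not the actual DACO code):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NotificationMessageMapper {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Convert the JsonNode handed over by KafkaJsonDeserializer into the typed bean.
    public static NotificationMessage toMessage(JsonNode node) {
        try {
            return MAPPER.treeToValue(node, NotificationMessage.class);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Not a valid NotificationMessage", e);
        }
    }
}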
Protocol Buffers
The default for gRPC. Similar to Avro, with a compact encoded size.