• Design,  技术

    Data Connect的消息处理机制

    Data Connect Service提供同步数据到第三方。数据的change是来自数据库binlog生成的CDC (Change Data Capture)。 在系统设计中有几个挑战 Ordering的问题。前端的service并不保证顺序。那Data Connect如何保证顺序呢?技术上来讲是不可能的。但是我们可以做到在不保证顺序的情况下依然能正确处理。方法是我们从最开始的服务开始引入了Versioning的概念。如果data connect拿到的对于某一条记录的event早于之前的event,我们就丢弃。这个逻辑我们叫stale checker。这个在sync的逻辑里是可行的。比如说我们有“Create->Update”,其实对于sync,create和update都是基于Id的upsert,如果我们拿到create晚一些,就直接丢弃。同样的“update->update”也没问题,当然前提是每次我们都是拿到整个记录的full payload,不然就会有问题。我们并没有field level的versioning。 Duplication。Duplication其实我们并不太在意,因为我们的服务基本上是Idempotent的,也就是说同样的消息可以被处理两遍,没问题。原因是update本来就可以重新执行,create也可以,因为我们是根据id来的。还有就是我们有stale checker其实就能解决很多的问题。 Partition. 最开始的版本没有separate topic。我们partition by billing account id。 好处有几个 减少一个tenant占用所有的consumer的问题。当然side effect是某一些consumer会比较慢,如果一个tenant traffic比较多。 减少row locking。 控制并发。其实并发的控制我们其实可以用Resillience4j – 远程方法调用的工具箱 里面提到的同步控制机制。但是也会有负面影响。 潜在的enhance workers。我们不应该consumer thread里面处理所有的东西。应该用workers,这样就更容易scale out。问题是我们还是要保证worker的同步问题。如果有两个update对于同一个billing account,我们要保证scale checker依然可以工作。 减少tenant之间的影响。应该可以用dedicated topic来做。当然机制可能会比较复杂。我们应该还是需要common pool来出来ad-hoc traffic

  • 工具,  技术

    Resillience4j – 远程方法调用的工具箱

    在MicroService的世界,很多的远程调用。只要有远程调用就会有网络不稳定问题,某些api会fail掉,有时候api会很长时间没有反馈,导致client的线程大量占用。 其实在非microservice的场景,数据库连接也有同样的问题。数据库操作有时候会遇到lock的问题,需要重试基本上是肯定的。 Resillience4j这个library专门针对这些常用的场景提供了非常容易使用的library。几个典型的功能如下: Retry 根据返回的结果,或者异常决定是不是要retry,如何retry。这个场景太常见了。比如说对于locking issue可以这么做。如果是data integrity的错误,就不需要retry。 Circuit Breaker 根据一个threshold,如果说一定时间内错误率到达一定程度,预计不断请求返回错误,还不如直接忽略,继续执行。特别是有时候错误会消耗额外的等待时间。 Rate Limiter 限制call的rate。比如说限制一秒钟一次call Time Limiter 限制执行的等待时间。 BulkHead 限制并发执行的数量。 Cache 一定时间内的重复call的cache,提高性能。 Fallback 提供可选的方案。比如说Object Query不行可以转ZOQL

  • AWS,  技术

    AWS Service花费的潜在提高空间

    用AWS Service也有3,4年的时间时间。对于花费我觉得还是有挺大的提高空间. ECS中采用Spot instance。 DynamoDB的花销应该还可以优化。历史数据应该可以设置TTL清除。这个应该非常简单。减少不必要的write 访问。 Redis的使用可以减少,可以用DynamoDB 减少ECS中instance的数量,采用auto scaling。 更好的平衡ec2 instance的cpu,memory的数量和每一个tasks的需求。保证每一个ec2 instance都被合理的利用。 其实这些东西很多都很简单,但是没有做的原因是从领导就没有强有力的推动这些东西。推动的人技术很弱,没办法找到好的切入点。当然后来新的领导来了,鸡飞狗跳,大家都在离开,还有谁真的在乎呢?

  • Kafka,  技术

    Tcp/Ip如何工作 (Kafka producer机制)

    Application如果需要发送消息,是先发到操作系统的socket write buffer. 如果buffer(queue)满了,就是tcp congestion。同样的,外部来的消息会先到操作系统的socket receive buffer.如果buffer满了,操作系统就不会ack那个消息,也是congestion。 这个buffer的尺寸并不大,可以通过一下命令查询。参数分别代表,min, default, max: 之所以看到是因为看这篇文章http://cloudurable.com/blog/kafka-tutorial-kafka-producer-advanced-java-examples/index.html#:~:text=The%20Kafka%20Producer%20has%20a,acks%20to%20control%20record%20durability. 如果act是0,那么application只是把消息送到socket buffer就可以了。 还有一个配置参数 buffer.memory 这个是application level的setting,当broker无法访问,就会把消息放在内存中等待,在 max.block.ms 到期之前,send就会等待,到期就会抛一场。producer 也有重试机制。 但是这里有一个疑问,broker不可访问,是不是操作系统的socket write buffer也会满?我觉得不会,如果socket buffer满了就会影响所有的application,这个相当于外部的一个目标无法访问就导致所有的对外连接失效。应该是有一个机制操作系统知道外部网络不可送达,然后就会清楚缓存,告诉application目标不可达,然后application就会cache请求,然后不断的重试。 https://stackoverflow.com/questions/49649241/apache-kafka-batch-size-vs-buffer-memory

  • Java,  技术

    Baeldung-最好的技术网站,没有之一

    已经记不清楚看过多少baeldung的文章,今天又用它解决了一个java debug的问题。Tungsten以前一直可以remote debug忽然最近发现不行了。 最后还是这边文章解决了问题:https://www.baeldung.com/java-application-remote-debugging 原来根本原因是我们升级了JDK到8,老的Java 5的debug参数已经不能用了。。而且貌似Java 9以后参数又变了。 我看Baeldung有中文版,但是翻译工作做的真的一般。。中文真的应该有一个类似的权威的技术门户,文章的质量控制真的非常非常重要。有很多的技术网站,但是内容都是网友贡献,有些很好,有些质量却很差。还是需要中心化的审核机制

  • 工具,  技术

    对象的序列化

    对象的序列化和传输在过去这些年也发生了很多的变化。 Java序列化 很早以前用这个,因为这种序列化方式只能局限于一个单一语言,已经不再用了。 Json 用Jackson的ObjectMapper把一个对象序列化成Jason,然后再反序列化. Avro Kafka的默认方式。需要提前预定义好model,序列化,反序列化的时候都需要用那个model。model更新的时候我记得并不像想象的那么简单。比如说加一个字段。我们必须asdl定义NoAE的所有的消息都有自己对应的Class,Serializer和Deseriallizer,都是基于Avro的。比如 NotificationInstanceDeserializer NotificationInstanceSerializer AvroSerializer.java (This copied from GenericBinarySerializer.java in CDC-model) 而DACO的Serializer和Deserializer都是用一个 JsonSerializer.其实就是Jakson的ObjectMapper的实现。不过传输的都是二进制(但是其实就是json专程bytes而已,Kafka的所有消息都是bytes)。DataModel必须是JsonNode的实现(其实不需要,objectMapper.writeValueAsBytes这个方法就是接受Object)。没有语言独立的表达。。绑定Jackson. 这个就是偷懒的做法。不需要定义avil文件,只需要写一个继承JsonNode的bean就好。主要是公司没有一个统一的标准,大家都选择自己最convinience的方法去做。以后如果要改格式就还挺麻烦。 JsonSerializer.java KafkaJsonDeserializer.java. 所以Kafka consumer都是拿的JsonNode,需要再转为子类。比如NotificationMessage.java Protocal Buffer gRPC的默认方式。和Avro类似。尺寸都比较小。