我有来自两个主题orders和fsource的两个流。主要是订单是静态的,很少更新,而fsource则以每秒1000的速率更新。在这里,我使用了KTable-KTabke连接,因为它们具有相同的键。 PObject: private String orderId;{1,2,3,4,5,6} private Double price; private Long oTNum;//{1,2,3,4,5,6} FSource: private String orderId;{1,2,3,4,5,6} private Dou…

2020年11月21日 0条评论 71点热度 阅读全文

我要实现的是基于消息中存在的时间戳记对记录中存在的每个消息进行计数。每个记录由List<Metric>对象组成。我想提取每个度量标准的时间戳,并根据度量标准名称汇总该度量标准。 公制 public class Metric { String metric; Long timestamp; Double value; } 自定义时间戳提取器 我已经实现了这个时间戳提取器,它将记录转换为List对象。当前,它获取为该ArrayList进行窗口化的第一个时间戳。 public class EventTimes…

2020年10月23日 0条评论 53点热度 阅读全文

因此,问题是关于在过滤器,映射等流函数内部使用外部状态存储的安全性。 可以这样做吗? JedisPool pool = ...; KStream stream = ...; stream.map((k, v) -> { JedisClient client = pool.getResource(); .... client.close(); }); ... KafkaStreams streams = ...; 是否由于在多个流任务中使用单个池而导致错误? 在apache flink中,我可以使用 Rich*…

2020年9月10日 0条评论 53点热度 阅读全文

我正在使用Kafka Stream 2.1 我正在尝试为聚合的流应用程序编写一些测试使用 Activity 间隔为300毫秒的 session 窗口按事件的键(即相关ID)进行某些事件。 这是由方法表示的聚合实现: private static final int INACTIVITY_GAP = 300; public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source…

2020年9月10日 0条评论 48点热度 阅读全文

我有一个KStream eventsStream,它是从“事件”主题中获取数据的。事件有两种类型,它们的键:1. {user_id = X, event_id = 1} {..value, include time_event...}2. {user_id = X, event_id = 2} {..value, include time_event...} 如果在10分钟内用户未给出带有event_id = 1的事件,则需要将带有event_id = 2的事件迁移到“结果”主题。 例如,1.第一种情况:我们得到数…

2020年9月7日 0条评论 44点热度 阅读全文

我有一个运行有一个线程的Kafka-Streams应用程序,可以很好地处理一个分区的主题。 我需要运行该应用程序的多个实例,同时处理不同的主题。在我当前的场景中,所有主题都只有一个分区。 当我运行同一应用程序(具有相同的APPLICATION_ID)的新实例,处理另一个主题时,Streams客户端不会在此新应用程序中创建新任务。第一个实例继续处理任务0_0中的第一个主题,第二个实例在没有和分配的分区进行提示的情况下等待。 我知道我只使用一个分区的主题,但是在这种情况下,如果我有两个实例和两个带有一个分区的主题,要处…

2020年8月7日 0条评论 45点热度 阅读全文

有没有办法在Kafka Streams聚合中使用Autowired bean的新实例? @EnableBinding(Processor.class) public class MessageReceiver { @StreamListener(target = Processor.INPUT) @SendTo(Processor.OUTPUT) public KStream<String, List<CustomEvent>> process(KStream<String, Even…

2020年8月4日 0条评论 56点热度 阅读全文

我想为该主题所消耗的内容生成唯一的有序ID,并且它在多个实例中应该是唯一的。 (不是uuid) 解决方案如下: 不知道为什么不使用UUID。但是您可以结合使用偏移量和分区号来计算唯一的ID。就像是: // you need to know upfront how many partitions the input topis has private final static int NUMBER_OF_PARTITIONS = ... // within `Transformer#transform()` usin…

2020年7月13日 0条评论 44点热度 阅读全文

我正在使用Kafka Streams创建一个基于Spring的服务,并且在使用现有@Bean KStream VS为该流创建流时注意到了巨大的耗时时间差异。 假设我有以下代码: @Autowired private StreamsBuilder eventsStreamsBuilder; @Bean("eventsKStream") public KStream<String, String> eventsKStream() { KStream<String, String> stream …

2020年7月8日 0条评论 39点热度 阅读全文

我有一个使用kafka流的spring boot应用程序(kafka docker映像:wurstmeister/kafka:2.12-2.1.1,kafka依赖项:org.apache.kafka:kafka-streams:2.4.1)。在应用程序启动期间,我检查是否创建了主题my-topic,如果没有创建,则由应用程序创建。在该应用程序创建KTable之后,如下所示: streamsBuilder.table("my-topic", Consumed.with(Serdes.String(), Serdes.…

2020年5月18日 0条评论 47点热度 阅读全文