我正在尝试通过使用BucketingSink和自定义ParquetSinkWriter在HDFS上使用Apache Flink编写Parquet文件。 这是代码,上面的错误指示从下面开始启用检查点(在BucketingSink类中调用snapshotState())刷新方法时无法正常工作。甚至writer也被“ writer.close();”关闭。但仍然收到来自“ writer = createWriter();”的错误。有什么想法吗?谢谢 得到这样的错误: org.apache.hadoop.fs.FileAl…

2020年12月2日 0条评论 59点热度 阅读全文

我想拥有一个能够满足两个要求的Kakfa消费者: 要求时使用所有消息(将其获取)放弃/忽略所有尚未提交的消息(需要帮助) 为简单起见,我只运行一个主题和一个分区。这是我设置消费者的方式: private Consumer<Long, String> createConsumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"…

2020年12月1日 0条评论 45点热度 阅读全文

按照Avro docs中“默认”属性的定义:“此字段的默认值,在读取缺少此字段的实例时使用(可选)。” 这意味着如果缺少相应的字段,则采用默认值。 但这似乎并非如此。考虑以下student模式: { "type": "record", "namespace": "com.example", "name": "Student", "fields": [{ "name": "age", "type": "int", "default": -1 }, { "name": "name", "type": "string",…

2020年11月29日 0条评论 62点热度 阅读全文

我正在尝试从架构注册表中检索给定kafka主题的架构主题版本。我可以使用client.register(schema-name, schema)成功发布新版本,但是不确定如何获取版本。我在下面使用curl请求进行了尝试,但结果立即达到-1(空)。 CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20); // curl -X GET http://schema.registry.url:888…

2020年11月29日 0条评论 54点热度 阅读全文

我是Apache Kafka的新手,我正尝试在Android Studio上使用它,以便使用A simple Kafka Consumer and Producer example中的代码将数据生成到位于我PC上的服务器 摇篮代码: apply plugin: 'com.android.application' android { packagingOptions { exclude 'META-INF/DEPENDENCIES' exclude 'META-INF/LICENSE' exclude 'META-I…

2020年11月28日 0条评论 64点热度 阅读全文

我正在尝试使用Spring云流+ Kafka绑定对Apache Kafka进行“恰好一个交付”概念的一些PoC。春云流Kafka粘结剂:“尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的无效转换” 我安装了Apache Kafka“kafka_2.11-1.0.0”,并在生产者中定义了“transactionIdPrefix”,我知道这是我在Spring Kafka中启用事务所需要做的唯一事情,但是当我这样做时,运行简单源代码&在同一个应用程序中接收器绑定,我看到一些消息在消费者中…

2020年11月27日 0条评论 85点热度 阅读全文

我只是在学习spring boot和kakfa。我进行了一些探索,并配置了一个示例生产者应用程序,如下所示。但是,我无法发布消息。如果我能在这里错过什么得到帮助,那就太好了。我已经启动了zookeeper服务和kakfa服务,并确保该主题可用。 Config: import java.util.HashMap; import java.util.Map; import com.jpmorgan.sample.producer.KafkaProducer; import org.apache.kafka.client…

2020年11月24日 0条评论 54点热度 阅读全文

我有来自两个主题orders和fsource的两个流。主要是订单是静态的,很少更新,而fsource则以每秒1000的速率更新。在这里,我使用了KTable-KTabke连接,因为它们具有相同的键。 PObject: private String orderId;{1,2,3,4,5,6} private Double price; private Long oTNum;//{1,2,3,4,5,6} FSource: private String orderId;{1,2,3,4,5,6} private Dou…

2020年11月21日 0条评论 69点热度 阅读全文

我有“ N”个Kafka主题,我需要根据业务逻辑使用来自不同主题的消息。我需要处理/过滤它们并将其发送到服务总线。我不想创建N个kafka配置来使用消息,我只想使其成为一个库,在这里我可以简单地将属性外部化以进行选择并配置使用者。这样我就可以将业务逻辑放入应用程序中。以前有没有人做过这种实现。请让我知道这种方法的最佳实践。 编辑:这是我的KafkaConsumerConfig看起来像: KafkaConsumerConfig.java @Configuration public class KafkaConsume…

2020年11月18日 0条评论 44点热度 阅读全文

我正在运行火花流作业以使用直接方法从kafka消耗(对于kafka 0.1.0或更高版本)。使用maven-assembly-plugin构建POM文件,并使用jar tf <jar file> | grep ConsumerRecord检查jar文件的内容。我得到以下输出 org / apache / kafka / clients / consumer / ConsumerRecord.class org / apache / kafka / clients / consumer / Consume…

2020年11月18日 0条评论 92点热度 阅读全文