我已经用Java编写了一个非常简单的代码来读取文件并将这些记录发送给Kafka主题。一切都按预期进行。但是,我不想使用file而是使用Kafka文件连接器。我过去使用REST proxy(curl)命令做到了,但是从未在Java中尝试过。我需要一些帮助。 我可以看到Maven存储库中有Kafka-connect api,可以将其添加到我的pom.xml文件中。将其集成到Java代码中的下一步应该是什么。 我的代码无需Kafka connect即可读取文件: import org.apache.kafka.clien…

2020年7月24日 0条评论 59点热度 阅读全文

我正在尝试执行以下操作: 使用redis连接器 将一些数据放入geode 使用CacheListener 对一些关键的创建/更新事件使用react 使用Geode客户端及其区域上的entrySet方法读取该数据。 我已经无法从我的Geode客户端访问redis数据了。我必须执行以下操作: region.get(Coder.stringToByteArrayWrapper("key")); 我也有很多麻烦让 region.entrySet()工作。首先,它对 ClientRegionShortcut.PROXY根本不…

2020年5月13日 0条评论 48点热度 阅读全文

我试图从卡夫卡读取消息到S3,有问题添加jar到Kafka连接类路径。Kafka连接自定义timestamp.extractor 目标是根据时间戳在分区中写入消息,该时间戳是卡夫卡消息中密钥的一部分。 为了使故事简短,我必须提供自定义时间戳提取器。在文档here之后创建了一个类,该类实现TimestampExtractor接口并将一个JAR位置添加到plugin.path属性。 问题是,当我开始连接时,找不到类。不知怎的,罐子是不是在classpath中,我越来越 org.apache.kafka.common.c…

2020年2月5日 0条评论 44点热度 阅读全文

我有一个kafka connect jar,它需要作为docker容器运行。我需要在容器中的日志文件中捕获所有连接日志(最好是在目录/文件-/ etc / kafka / kafka-connect-logs中),然后可以使用以下命令将其推送到localhost(运行docker引擎的主机) docker 中的卷。当我将connect-log4j.properties更改为追加到日志文件中时,我看到没有创建日志文件。如果我在没有docker的情况下尝试相同的操作,并通过更改connect-log4j.propert…

2020年1月5日 0条评论 30点热度 阅读全文

我在使用kafka源连接器(需要准备SourceRecord实例)时将Avro对象(org.apache.avro.specific.SpecificRecord的实例)发送到kafka主题时遇到问题。以我为例,假设基于模式,例如: { "namespace": "com.model.avro.generated", "type": "record", "name": " MessageExVal", "version": "1", "fields": [ { "name": "messageSource", "t…

2020年1月5日 0条评论 51点热度 阅读全文

我有一个生产者类使用Github中的自定义JsonSerializer发送到主题 public class JsonSerializer<T> implements Serializer<T> { ... @Override public byte[] serialize(String topic, T data) { try { return this.objectMapper.writeValueAsBytes(data); } catch (JsonProcessingExceptio…

2019年12月3日 0条评论 55点热度 阅读全文

由于调试需要连接我的dockerized flex 实例的Kafka连接器非常困难,因此我不知道为什么会收到此异常... 当我尝试新建运输客户端时: Settings settings = Settings.builder() .put("cluster.name", clusterName).build(); client = new PreBuiltTransportClient(settings); client.addTransportAddress(new InetSocketTransportAddre…

2019年10月4日 0条评论 42点热度 阅读全文

使用Debezium 0.7读取MySQL,但在初始快照阶段出现刷新超时和OutOfMemoryError错误。查看下面的日志,似乎连接器试图一次写太多消息: WorkerSourceTask{id=accounts-connector-0} flushing 143706 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] WorkerSourceTask{id=accounts-co…

2019年8月30日 0条评论 67点热度 阅读全文

我正在将我的数据从kafka主题流式传输到elasticsearch中。但是它从连接器{\"type\":\"illegal_argument_exception\",\"reason\":\"object mapping [search_data] can't be changed from nested to non-nested\"}抛出此错误 但是,当我从主题获取消息并使用elasticsearch api手动添加文档时,它工作正常。 kafka-connect-elasticsearch不支持嵌套对象类型…

2019年8月29日 0条评论 55点热度 阅读全文

我引用了以下链接以了解HDFS Connect for Kafka https://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html通过 hive 集成,我能够将数据从kafka导出到HDFS。现在,我正在尝试借助Java程序将avro记录写入kafka public static void main(String[] args) throws InterruptedException,IOException,RestClientExcepti…

2019年8月1日 0条评论 67点热度 阅读全文