我正在尝试通过使用BucketingSink和自定义ParquetSinkWriter在HDFS上使用Apache Flink编写Parquet文件。 这是代码,上面的错误指示从下面开始启用检查点(在BucketingSink类中调用snapshotState())刷新方法时无法正常工作。甚至writer也被“ writer.close();”关闭。但仍然收到来自“ writer = createWriter();”的错误。有什么想法吗?谢谢 得到这样的错误: org.apache.hadoop.fs.FileAl…

2020年12月2日 0条评论 66点热度 阅读全文

我正在使用Apache Flink v1.6.0,并且尝试写入托管在Elastic Cloud中的Elasticsearch v6.4.0。向 flex 云群集进行身份验证时出现问题。 我已经能够让Flink写入本地Elasticsearch v6.4.0节点,该节点没有使用以下代码进行加密: /* Elasticsearch Configuration */ List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new Ht…

2020年11月12日 0条评论 58点热度 阅读全文

我在运行flink程序时遇到了一些问题,这是错误: java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf; at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78) at org.elasticsearch…

2020年11月5日 0条评论 48点热度 阅读全文

Closed. This question needs details or clarity。它当前不接受答案。 想改善这个问题吗?添加详细信息,并通过 editing this post阐明问题。 3年前关闭。 Improve this question 这是一种情况,我有两个数据源,分别是消息队列和MySQL表,它们可以分别称为DataStream和DataSet。我想启动一个基于DataStream的作业以从消息队列中提取数据并执行一些计算。在计算过程中,需要一个基于DataSet(MySQL表)的作业,其O…

2020年10月4日 0条评论 51点热度 阅读全文

我使用cloudera live vm,那里有一个hadoop和spral独立集群。现在我想用spark提交和flink运行脚本来提交我的工作。这也有效。但是我的应用程序可以在hdfs中找到输入和输出文件的路径。我将路径设置为:hdfs://127.0.0.1:50010 / user / cloudera / outputs我从港口得到的信息是这样的: 如何在Java中设置hdfs的路径? 最好的祝福,保罗 解决方案如下: 您不必设置DataNode主机的路径。在“概述”页面上,您将看到NameNode的连接信息…

2020年10月2日 0条评论 44点热度 阅读全文

我正在尝试实现一个类,该类使用户可以在不限制输入流类型的情况下操纵N个输入流。 首先,我想将所有输入数据流转换为keyedStreams。因此,我将输入数据流映射到一个元组中,然后,我应用KeyBy将其转换为键流。 我总是遇到序列化的问题,我尝试按照本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html进行操作,但没有成功。 我想知道的是: 什么是Java中的序列化/反序列化?以及用途。我可以在Flink中使用序…

2020年9月30日 0条评论 45点热度 阅读全文

我收到了有关不同主题的一系列Avro格式的事件。我想使用这些并以拼花格式写入s3。我写了下面的工作,为每个事件创建一个不同的流,并从合计架构注册表中获取其架构,以创建事件的拼花槽。这工作正常,但是我面临的唯一问题是,每当有新事件开始时,我都必须更改YAML配置并每次都重新启动作业。有什么办法我不必重新启动作业,它就开始消耗新的事件集了。 YamlReader reader = new YamlReader(topologyConfig); EventTopologyConfig eventTopologyConfi…

2020年9月23日 0条评论 58点热度 阅读全文

我正在为Apache Flink 1.4中的processElement函数编写一些测试代码: public class ProcessFunctionClass { @Mock private ListState<String> listState; public void processElement(Tuple2<String, String> tuple2, Context context, Collector<Tuple2<String, String>> …

2020年9月14日 0条评论 53点热度 阅读全文

我正在使用Flink,并且具有动态更改的字段和嵌套字段的JSON字符串流到达我的系统。因此,我无法将此传入的JSON模拟并将其转换为静态POJO,而必须依赖于Map。 我的第一个转换是使用GSON解析将JSON字符串流转换为Map对象流,然后将地图包装在名为Data的DTO中。 (inside the first map transformation) LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class); Data data = new Dat…

2020年8月21日 0条评论 53点热度 阅读全文

当正常在Flink的DataStream外部进行连接时,是否有人遇到过从Flink作业连接到Cassandra的任何问题? Session session = clusterBuilder.getCluster().connect(); ResultSet resultSet = session.execute(resultStatement.getQuery()); 我不是在Locale中面对这个问题,而是在开发环境中。在本地连接中,它工作正常。即使使用相同的clusterbuilder设置,当我将这段代码保留在…

2020年8月13日 0条评论 63点热度 阅读全文