Error message:
java.lang.NullPointerException: RabbitMQ source was instantiated with usesCorrelationId set to true but a message was received with correlation id set to null!
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:229)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
My configuration:
DataStreamSource<String> dataStreamSource = env
        .addSource(new RMQSource<>(connectionConfig, "info", true, new SimpleStringSchema()))
        .setParallelism(1);
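The third constructor argument is usesCorrelationId, a flag that says whether each message carries a unique correlation id, which the source uses to recognize duplicate messages. If it is true, the producer must set the correlation id on every message it sends; a message without one triggers the exception above.
Changing it to false fixed the error. Note that with false the source no longer deduplicates messages that RabbitMQ redelivers.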
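To make the behaviour of the flag concrete, here is a minimal sketch of the kind of check the stack trace points at. It is a simplified, hypothetical model (the class `CorrelationIdCheck` and its `accept` method are invented for illustration and are not Flink's actual code): when the flag is true, a null correlation id is rejected and repeated ids are dropped; when it is false, every message passes.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Simplified, hypothetical model of the correlation-id check; not Flink's real implementation. */
public class CorrelationIdCheck {
    private final boolean usesCorrelationId;
    // Correlation ids seen so far, used to drop duplicates.
    private final Set<String> seenIds = new HashSet<>();

    public CorrelationIdCheck(boolean usesCorrelationId) {
        this.usesCorrelationId = usesCorrelationId;
    }

    /** Returns true if a message with this correlation id should be emitted downstream. */
    public boolean accept(String correlationId) {
        if (!usesCorrelationId) {
            return true; // flag is false: no id required, no deduplication
        }
        // Flag is true: a missing id is an error, as in the exception above.
        Objects.requireNonNull(correlationId,
                "usesCorrelationId is true but a message arrived with a null correlation id");
        // Set.add returns false when the id was already present, i.e. a duplicate.
        return seenIds.add(correlationId);
    }

    public static void main(String[] args) {
        CorrelationIdCheck lenient = new CorrelationIdCheck(false);
        System.out.println(lenient.accept(null));

        CorrelationIdCheck strict = new CorrelationIdCheck(true);
        System.out.println(strict.accept("id-1"));
        System.out.println(strict.accept("id-1"));
        try {
            strict.accept(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

If deduplication is needed, the alternative to setting the flag to false is to keep it true and have the producer attach an id to every message; with the RabbitMQ Java client this is typically done through `AMQP.BasicProperties.Builder#correlationId(String)` before publishing.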
Original post: https://www.cnblogs.com/yangchas/p/13600817.html