博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【nifi数据采集】nifi给kafka打数据遇到的坑 - kafka2.1 的listeners配置
阅读量:2156 次
发布时间:2019-05-01

本文共 1887 字,大约阅读时间需要 6 分钟。

如下图,在测试使用nifi往kafka打数据的时候,发现通过ambari安装的kafka收不到数据,而通过docker安装的kafka则可以收到数据。

模拟公司场景,消费海外的kafka,往国内kafka打数据
模拟公司场景,消费海外的kafka,往国内kafka打数据

nifi后台log日志报错信息

2020-04-19 11:09:02,916 INFO [Timer-Driven Process Thread-10] o.a.kafka.common.utils.AppInfoParser Kafka version : 0.9.0.12020-04-19 11:09:02,916 INFO [Timer-Driven Process Thread-10] o.a.kafka.common.utils.AppInfoParser Kafka commitId : 23c69d62a0cabf062020-04-19 11:09:33,124 ERROR [Timer-Driven Process Thread-10] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=90615958-0171-1000-ecfb-347ad3ec18ae] Failed to send all message for StandardFlowFileRecord[uuid=cf69651e-f562-42c6-abed-9b8b55259213,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1587248292562-1, container=default, section=1], offset=419, length=9],offset=0,name=cf69651e-f562-42c6-abed-9b8b55259213,size=9] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Batch Expired: org.apache.kafka.common.errors.TimeoutException: Batch Expiredorg.apache.kafka.common.errors.TimeoutException: Batch Expired

log报错并没有直接说明报错原因,而是报一个【批次过期】错误,幸好在网上搜这个错误的时候大部分信息都指向了kafka的server.properties中listeners的配置问题。

原因

ambari安装kafka是server.properties中listeners的配置默认ip是localhost,这样导致远程访问不到kafka的broker

解决

因为使用ambari安装的kafka,不需要到kafka目录找到配置修改,只需要在ambari的web管理界面修改配置即可。

将listeners的localhost改成真实ip。

在这里插入图片描述

注意:

我先将ip用hostname的方式,结果是不行的,最后改成真实ip才能接收到数据。
这个问题考虑过nifi所在的机器是不是配置了hosts映射,专门测试过也不行,所以还是要使用真实ip吧
在ambari里面的配置有版本管理,很方便记录

ps.这里跟网上的稍微不同的是ambari里面只改listeners就解决了,也没找到【advertised.listeners】这个配置

如果是自己手动安装的kafka就需要将下面两个参数都改了。修改前:listeners=PLAINTEXT://:9092修改后listeners=PLAINTEXT://真实ip:9092advertised.listeners=PLAINTEXT://真实ip:9092

最终效果

最终打通效果图最终打通效果图

nifi消费192.168.2.15:9092nifi消费192.168.2.15:9092】kafka的配置

nifi给【192.168.2.15:9093】kafka打数据的配置nifi给【192.168.2.15:9093】kafka打数据的配置

nifi给【192.168.2.11:6667】kafka打数据的配置nifi给【192.168.2.11:6667】kafka打数据的配置

数据流

1、通过【192.168.2.15:9092】这个kafka的生产者产生消息

2、然后nifi消费这个kafka的消息,在分别向【192.168.2.11:6667】和【192.168.2.15:9093】这两个kafka打,
3、通过console-consumer查看两个kafka是不是接收到消息了。

转载地址:http://fulwb.baihongyu.com/

你可能感兴趣的文章
深入理解JVM虚拟机1:JVM内存的结构与消失的永久代
查看>>
深入理解JVM虚拟机3:垃圾回收器详解
查看>>
深入理解JVM虚拟机4:Java class介绍与解析实践
查看>>
深入理解JVM虚拟机5:虚拟机字节码执行引擎
查看>>
深入理解JVM虚拟机6:深入理解JVM类加载机制
查看>>
深入了解JVM虚拟机8:Java的编译期优化与运行期优化
查看>>
深入理解JVM虚拟机9:JVM监控工具与诊断实践
查看>>
深入理解JVM虚拟机10:JVM常用参数以及调优实践
查看>>
深入理解JVM虚拟机11:Java内存异常原理与实践
查看>>
深入理解JVM虚拟机12:JVM性能管理神器VisualVM介绍与实战
查看>>
深入理解JVM虚拟机13:再谈四种引用及GC实践
查看>>
Spring源码剖析1:Spring概述
查看>>
Spring源码剖析2:初探Spring IOC核心流程
查看>>
Spring源码剖析3:Spring IOC容器的加载过程
查看>>
Spring源码剖析4:懒加载的单例Bean获取过程分析
查看>>
Spring源码剖析5:JDK和cglib动态代理原理详解
查看>>
Spring源码剖析6:Spring AOP概述
查看>>
Spring源码剖析7:AOP实现原理详解
查看>>
Spring源码剖析8:Spring事务概述
查看>>
Spring源码剖析9:Spring事务源码剖析
查看>>