Requirements
First conf file
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=master
a1.sources.r1.port=9999

a1.sinks.k1.type=avro
a1.sinks.k1.hostname=slave1
a1.sinks.k1.port=7777

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Second conf file
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=slave1
a1.sources.r1.port=7777

a1.sinks.k1.type=logger

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
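The two files are linked by a single hop: the avro sink k1 of the first agent sends to the host and port where the avro source r1 of the second agent listens. A minimal sketch of checking that the two ends agree is given here, assuming both files use the agent name a1 and the component names k1 and r1 as above. The helper names parse_properties and sink_matches_source are hypothetical, and comparing hostname to bind as plain strings is a simplification (a source bound to 0.0.0.0 would need different handling).

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def sink_matches_source(upstream_conf, downstream_conf,
                        agent="a1", sink="k1", source="r1"):
    """Return True if the upstream avro sink targets the downstream avro source.

    Assumption: both agents share the same agent name, as in the two
    conf files above. Missing keys count as a mismatch.
    """
    up = parse_properties(upstream_conf)
    down = parse_properties(downstream_conf)
    sink_host = up.get(f"{agent}.sinks.{sink}.hostname")
    sink_port = up.get(f"{agent}.sinks.{sink}.port")
    src_host = down.get(f"{agent}.sources.{source}.bind")
    src_port = down.get(f"{agent}.sources.{source}.port")
    if None in (sink_host, sink_port, src_host, src_port):
        return False
    return sink_host == src_host and sink_port == src_port
```

Passing the contents of the first and second conf file as the two arguments should return True when the sink's hostname and port equal the source's bind and port.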
When the first conf is run,
the following error appears:
org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave1, port: 8888 }: RPC connection error
	at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:181)
	at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:120)
	at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
	at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:90)
	at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
	at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:210)
	at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:290)
	at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
	at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
	at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error connecting to slave1/192.168.111.199:8888
	at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
	at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
	at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
	at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:169)
	... 16 more
Caused by: java.net.ConnectException: Connection refused: slave1/192.168.111.199:8888
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	... 3 more
17/10/06 21:52:54 INFO sink.AbstractRpcSink: Rpc sink s1 started.
17/10/06 21:52:54 INFO sink.AbstractRpcSink: Rpc sink s1: Building RpcClient with hostname: slave1, port: 8888
17/10/06 21:52:54 INFO sink.AvroSink: Attempting to create Avro Rpc client.
17/10/06 21:52:54 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
17/10/06 21:52:54 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave1, port: 8888 }: RPC connection error
	at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:181)
	at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:120)
	at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
	at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:90)
	at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
	at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:210)
	at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:270)
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:346)
	... 3 more
Caused by: java.io.IOException: Error connecting to slave1/192.168.111.199:8888
	at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
	at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
	at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
	at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:169)
	... 10 more
Caused by: java.net.ConnectException: Connection refused: slave1/192.168.111.199:8888
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
	at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
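The cause is the startup order. The agent using the second conf has to be started first, so that the avro sink of the first agent has a receiver listening when it tries to connect.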
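One way to enforce that order in a startup script is to wait until the downstream port accepts TCP connections before launching the first agent. The following is only a sketch under that assumption: wait_for_port is a hypothetical helper, not part of Flume, and it only confirms that something is listening on the port, not that it is an Avro source.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection.

    Retries every `interval` seconds and returns False if no
    connection succeeded within roughly `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means a listener is up; close it right away.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Covers "connection refused", timeouts and name resolution errors.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the configs above, a script could call wait_for_port("slave1", 7777) after starting the second agent and start the first agent only when it returns True.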