I'm trying to write log files to HDFS on an EMR cluster in AWS using Flume, but I'm hitting an error. I have Flume (version 1.6.0) on machine X and another Flume agent running on machine Y, which is an AWS instance. I want to ship my log files from machine X through machine Y into HDFS (on the AWS EMR cluster), but the agent on machine Y fails with the error below while running.
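Both agents are started with the standard flume-ng invocation (the config file names here are just placeholders for my actual files):

# on machine X (agent name "agent" matches the config below)
bin/flume-ng agent --conf conf --conf-file machineX.conf --name agent -Dflume.root.logger=INFO,console

# on machine Y (agent name "tier1" matches the second config)
bin/flume-ng agent --conf conf --conf-file machineY.conf --name tier1 -Dflume.root.logger=INFO,console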
My machine X config:
agent.sources = localsource
agent.channels = memoryChannel
agent.sinks = avro_Sink
agent.sources.localsource.type = spooldir
agent.sources.localsource.spoolDir = /home/dwh/teja/Flumedata/
agent.sources.localsource.fileHeader = true
agent.sources.localsource.channels = memoryChannel
agent.sinks.avro_Sink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
agent.sinks.avro_Sink.type = avro
agent.sinks.avro_Sink.hostname = ec2-serverid.compute-1.amazonaws.com
agent.sinks.avro_Sink.port = 8021
agent.sinks.avro_Sink.avro.batchSize = 100
agent.sinks.avro_Sink.avro.rollCount = 0
agent.sinks.avro_Sink.avro.rollSize = 73060835
agent.sinks.avro_Sink.avro.rollInterval = 0
agent.sources.localsource.interceptors = search-replace regex-filter1
agent.sources.localsource.interceptors.search-replace.type = search_replace
agent.sources.localsource.interceptors.search-replace.searchPattern = ###|##
agent.sources.localsource.interceptors.search-replace.replaceString = |

My machine Y config:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = avro
tier1.sources.source1.bind = serverid
tier1.sources.source1.port = 8021
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = hdfs://serverid:8020/user/hadoop/flumelogs/
tier1.sinks.sink1.hdfs.filePrefix = Flumedata
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.batchSize = 10000
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 73060835
tier1.sinks.sink1.hdfs.rollInterval = 0
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
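As a sanity check (outside of Flume), the HDFS path and NameNode port can be verified from machine Y with the cluster's own client; this assumes the hadoop CLI is installed there, which it is by default on EMR:

# should list the target directory without errors if the path and port 8020 are right
hadoop fs -ls hdfs://serverid:8020/user/hadoop/flumelogs/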
The error log from machine Y:
2016-06-08 15:19:01,635 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:455)] HDFS IO error
org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy5.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
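From what I've read, "Server IPC version 9 cannot communicate with client version 4" usually means the Hadoop client jars on Flume's classpath are from Hadoop 1.x (IPC version 4) while the EMR NameNode runs Hadoop 2.x (IPC version 9). A sketch of the workaround I'm considering, assuming EMR's Hadoop 2.x client jars are in the default /usr/lib/hadoop* locations on machine Y (adjust the paths if your AMI differs):

# conf/flume-env.sh on machine Y -- put the cluster's own Hadoop 2.x
# client jars on Flume's classpath, ahead of any stale Hadoop 1.x jars
export FLUME_CLASSPATH="/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/hadoop-hdfs/*"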
If anyone is familiar with this issue, please help.