We started consolidating event log data from our applications by publishing messages to a Kafka topic. Although we could write directly from the application to Kafka, we chose to treat it as a generic problem and use a Flume agent. This provides some flexibility: if we wanted to capture something else from a server, we could just tail a different source and publish to a different Kafka topic.

We created a Flume agent conf file to tail a log and publish to a Kafka topic:

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F /var/log/some_log.log
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = some_log
tier1.sinks.sink1.brokerList = hadoop01:9092,hadoop02.com:9092,hadoop03.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20

Unfortunately, the messages themselves don't specify the host that generated them. If we have an application running on multiple hosts and an error occurs, we have no way to figure out which host generated the message.

I noticed that, if Flume wrote directly to HDFS, we could use a Flume interceptor to write to a host-specific HDFS location. Although we could probably do something similar with Kafka, i.e. create a new topic for each server, this could become unwieldy: we'd end up with thousands of topics.

Can Flume append or include the hostname of the originating host when it publishes to a Kafka topic?

Alex Woolford

2 Answers

You can create a custom TCP source which reads the client address and adds it to the header.

@Override
public void configure(Context context) {
    // "port" and "buffer" come from the agent's properties file.
    port = context.getInteger("port");
    buffer = context.getInteger("buffer");

    try {
        serverSocket = new ServerSocket(port);
        logger.info("FlumeTCP source initialized");
    } catch (Exception e) {
        logger.error("FlumeTCP source failed to initialize", e);
    }
}

@Override
public void start() {
    try {
        // accept() blocks until a client connects; only one connection is served.
        clientSocket = serverSocket.accept();
        receiveBuffer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        logger.info("Connection established with client : " + clientSocket.getRemoteSocketAddress());
        final ChannelProcessor channel = getChannelProcessor();

        // Every event from this connection carries the client's address as a header.
        final Map<String, String> headers = new HashMap<String, String>();
        headers.put("hostname", clientSocket.getRemoteSocketAddress().toString());
        String line = "";
        List<Event> events = new ArrayList<Event>();

        while ((line = receiveBuffer.readLine()) != null) {
            Event event = EventBuilder.withBody(
                    line, Charset.defaultCharset(), headers);

            logger.info("Event created");
            events.add(event);
            if (events.size() >= buffer) {
                channel.processEventBatch(events);
                // Start a fresh batch; otherwise the size check never matches again.
                events.clear();
            }
        }
        // Flush whatever is left when the client closes the connection.
        if (!events.isEmpty()) {
            channel.processEventBatch(events);
        }
    } catch (Exception e) {
        logger.error("FlumeTCP source failed while reading from client", e);
    }
    super.start();
}
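Note that getRemoteSocketAddress().toString() typically yields the address together with a leading slash and the client's port. If only the bare address is wanted in the header, something along these lines might help. This is a minimal sketch using only the JDK; the class name ClientHost and the method hostOf are hypothetical and not part of the project.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

// Hypothetical helper: extracts a plain host string from a socket address.
final class ClientHost {
    static String hostOf(SocketAddress remote) {
        if (remote instanceof InetSocketAddress) {
            InetSocketAddress inet = (InetSocketAddress) remote;
            InetAddress addr = inet.getAddress();
            // Resolved address: return the textual IP, without slash or port.
            if (addr != null) {
                return addr.getHostAddress();
            }
            // Unresolved address: fall back to the host string as given.
            return inet.getHostString();
        }
        // Non-IP socket address types: use their string form unchanged.
        return String.valueOf(remote);
    }
}
```

In the source above, the header line could then read headers.put("hostname", ClientHost.hostOf(clientSocket.getRemoteSocketAddress())).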

The flume-conf.properties can be configured as:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = CustomTcpSource
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
agent.sources.CustomTcpSource.type = com.vishnu.flume.source.CustomFlumeTCPSource
agent.sources.CustomTcpSource.port = 4443
agent.sources.CustomTcpSource.buffer = 1


# The channel can be defined as follows.
agent.sources.CustomTcpSource.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

I sent a test message, and the resulting event looked like this:

Event: { headers:{hostname=/127.0.0.1:50999} body: 74 65 73 74 20 6D 65 73 73 61 67 65             test message }
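A test message like this one could be sent with a small client. The following is only a sketch, assuming the source listens on localhost at the port from the configuration (4443); the class name TcpTestClient is hypothetical and not part of the project.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test client for a line-oriented TCP source.
final class TcpTestClient {
    // Opens a connection, writes one newline-terminated line, then closes.
    static void sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            // The source reads with readLine(), so each message must end in a newline.
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed host and port; adjust to match the agent configuration.
        sendLine("localhost", 4443, "test message");
    }
}
```

Because the source reads line by line, a message without a trailing newline would only be delivered once the client closes the connection.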

I have uploaded the project to my GitHub.

vishnu viswanath

If you're using the exec source, nothing prevents you from running a smart command to prefix the hostname to the log file content.

Note: if the command uses things like pipes, you'll also need to specify the shell like this:

tier1.sources.source1.type = exec
tier1.sources.source1.shell = /bin/sh -c
tier1.sources.source1.command =  tail -F /var/log/auth.log | sed --unbuffered "s/^/$(hostname) /"

The messages look like this:

frb.hi.inet 2015-11-17 08:39:39.432 INFO [...]

... where frb.hi.inet is the name of my host.
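On the consuming side, the hostname can be split back out of each message. The following is a rough sketch, assuming the hostname is everything up to the first space, as in the sed command above; the class HostPrefixedLine is hypothetical.

```java
// Hypothetical helper: splits "<hostname> <original log line>" into its two parts.
final class HostPrefixedLine {
    final String host;
    final String message;

    private HostPrefixedLine(String host, String message) {
        this.host = host;
        this.message = message;
    }

    static HostPrefixedLine parse(String line) {
        // The sed expression inserts "<hostname> " at the start of every line,
        // so the first space marks the end of the hostname.
        int space = line.indexOf(' ');
        if (space < 0) {
            throw new IllegalArgumentException("no hostname prefix in: " + line);
        }
        return new HostPrefixedLine(line.substring(0, space), line.substring(space + 1));
    }

    public static void main(String[] args) {
        HostPrefixedLine parsed = parse("frb.hi.inet 2015-11-17 08:39:39.432 INFO [...]");
        System.out.println(parsed.host + " -> " + parsed.message);
    }
}
```

This relies on hostnames never containing spaces; a different delimiter in the sed expression would need a matching change here.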

Alex Woolford
frb