Distributed Flume Setup With an S3 Sink

By eric

I have recently spent a few days getting up to speed with Flume, Cloudera's distributed log offering. If you haven't seen this and deal with lots of logs, you are definitely missing out on a fantastic project. I'm not going to spend time talking about it because you can read more about it in the user guide or in the Quora Flume topic in ways that are better than I can describe it. What I will tell you about is my experience setting up Flume in a distributed environment to sync logs to an Amazon S3 sink.

As CTO of SimpleReach, a company that does most of its work in the cloud, I'm constantly strategizing on how we can take advantage of the cloud for auto-scaling. Depending on the time of day or how much content distribution we are dealing with, we will spawn new instances to accommodate the load. We will still need the logs from those machines for later analysis (batch jobs like the ones we run on Elastic MapReduce).

I am going to attempt to make this as step-by-step as possible, but much of the terminology I use is described in the user guide, and there is an expectation that you have at least skimmed it prior to starting this HOWTO. I am using EMR (Elastic MapReduce) on EC2 and not the Hadoop distribution provided by Cloudera. Additionally, the Cloudera version I am working with is cdh3b3.

Context
I have 3 kinds of servers all running CentOS in the Amazon cloud:

  1. a1: This is the agent which is producing all the logs
  2. c1: This is the collector which is aggregating all the logs (from a1, a2, a3, etc)
  3. u1: This is the flume master node which is sending out all the commands

There are actually n agents, but for this example, we’ll keep it simple. Also, for a complete copy of the config files, please check out the full gist available here.

Initial Setup
On both a1 and c1, you’ll have to install flume-node (flume-node contains the files necessary to run the agent or the collector).

# curl http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo > /etc/yum.repos.d/cloudera-cdh3.repo
# yum update yum
# yum install flume flume-node
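
If you want to sanity check the install before moving on, something like the following works (the package and init script names here are the ones the CDH3 RPMs above lay down):

# confirm the Flume packages are installed
rpm -q flume flume-node

# the init script we will use later, once flume-site.xml is in place
ls -l /etc/init.d/flume-node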

On u1, you’ll need to install the flume-master RPM:

# curl http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo > /etc/yum.repos.d/cloudera-cdh3.repo
# yum update yum
# yum install flume flume-master
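
The master doesn't need any file system configuration (more on that below), so you can start it right away and confirm its web UI is answering. This assumes the flume-master RPM drops an init script alongside the one flume-node does:

# start the flume master
/etc/init.d/flume-master start

# the master status/config web UI listens on port 35871 by default
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:35871/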

On each host, you need to copy the conf template file to the site-specific config file. That is to say:

cp /etc/flume/conf/flume-site.xml.template /etc/flume/conf/flume-site.xml

First, let's jump onto the agent and set it up. Change your /etc/flume/conf/flume-site.xml to look like the following, tuning the $master_IP and $collector_IP variables appropriately:

<configuration>
  <property>
    <name>flume.master.servers</name>
    <value>$master_IP</value>
    <description>This is the address for the config servers status server (http)</description>
  </property>

  <property>
    <name>flume.collector.event.host</name>
    <value>$collector_IP</value>
    <description>This is the host name of the default "remote" collector.</description>
  </property>

  <property>
    <name>flume.collector.port</name>
    <value>35853</value>
    <description>This default tcp port that the collector listens to in order to receive events it is collecting.</description>
  </property>

  <property>
    <name>flume.agent.logdir</name>
    <value>/mnt/flume-${user.name}/agent</value>
    <description> This is the directory that write-ahead logging data
      or disk-failover data is collected from applications gets
      written to. The agent watches this directory.
    </description>
  </property>
</configuration>
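
If you are baking this into an instance bootstrap script, a quick sed over the file does the trick; the IPs below are hypothetical stand-ins for your master and collector internal addresses:

# substitute the placeholders in the agent's flume-site.xml
# (10.0.0.10 = master, 10.0.0.20 = collector; made-up example IPs)
sed -i -e 's/\$master_IP/10.0.0.10/' \
       -e 's/\$collector_IP/10.0.0.20/' \
       /etc/flume/conf/flume-site.xml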

Now on to the collector. Same file, different config. Replace the $master variable with your master's IP address (you should be using Amazon's internal IPs, otherwise you will be paying the regional data transfer charge). The $account and $secret variables are your Amazon EC2/S3 access key and secret access key, respectively. The $bucket is the S3 bucket that will contain the log files. Also worth pointing out are flume.collector.roll.millis and flume.collector.dfs.compress.gzip. The millis value controls how frequently the current log file is closed and the next file begins to be written. It would be nice if this could be done by file size and not only by time, but it works for now. The other option, flume.collector.dfs.compress.gzip, ensures that the log files are compressed prior to being dumped onto S3 (saves LOTS of space).

<configuration>
  <property>
    <name>flume.master.servers</name>
    <value>$master</value>
    <description>This is the address for the config servers status server (http)</description>
  </property>


  <property>
    <name>flume.collector.event.host</name>
    <value>localhost</value>
    <description>This is the host name of the default "remote" collector.</description>
  </property>

  <property>
    <name>flume.collector.port</name>
    <value>35853</value>
    <description>This default tcp port that the collector listens to in order to receive events it is collecting.</description>
  </property>

  <property>
    <name>fs.default.name</name>
    <value>s3n://$account:$secret@$bucket</value>
  </property>

  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>

  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>$account</value>
  </property>

  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>$secret</value>
  </property>

  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>$account</value>
  </property>

  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>$secret</value>
  </property>

  <property>
    <name>flume.agent.logdir</name>
    <value>/mnt/flume-${user.name}/agent</value>
    <description> This is the directory that write-ahead logging data
      or disk-failover data is collected from applications gets
      written to. The agent watches this directory.
    </description>
  </property>

  <property>
    <name>flume.collector.dfs.dir</name>
    <value>file:///mnt/flume-${user.name}/collected</value>
    <description>This is a dfs directory that is the the final resting
    place for logs to be stored in.  This defaults to a local dir in
    /tmp but can be hadoop URI path that such as hdfs://namenode/path/
    </description>
  </property>

  <property>
    <name>flume.collector.dfs.compress.gzip</name>
    <value>true</value>
    <description>Writes compressed output in gzip format to dfs. value is
     boolean type, i.e. true/false</description>
  </property>

  <property>
    <name>flume.collector.roll.millis</name>
    <value>60000</value>
    <description>The time (in milliseconds)
    between when hdfs files are closed and a new file is opened
    (rolled).
    </description>
  </property>
</configuration>

While we are still on the collector, in order to properly write to S3, you'll need to add four jar files, all of which go into the /usr/lib/flume/lib/ directory:

  1. commons-codec-1.4.jar
  2. jets3t-0.6.1.jar
  3. commons-httpclient-3.0.1.jar
  4. emr-hadoop-core-0.20.jar

The one thing that should be noted here is that the emr-hadoop-core-0.20.jar file replaces the hadoop-core.jar symlink. The emr-hadoop-core-0.20.jar file is the hadoop-core.jar file from an EC2 Hadoop cluster instance. Note: This will break the ability to seamlessly upgrade via the RPM (which is how you installed it if you’ve been following my HOWTO). Keep these files around just in case. I have added a tarball of the files here, but they are all still available with a quick Google search.
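
Here is a rough sketch of that jar shuffle, assuming you have already downloaded the four jars into your current directory (keep the original symlink around so you can roll back if needed):

# drop the S3 support jars into Flume's lib directory
cp commons-codec-1.4.jar jets3t-0.6.1.jar commons-httpclient-3.0.1.jar /usr/lib/flume/lib/
cp emr-hadoop-core-0.20.jar /usr/lib/flume/lib/

# swap the hadoop-core.jar symlink to point at the EMR jar,
# keeping the old link in case you need to revert to the RPM version
cd /usr/lib/flume/lib/
mv hadoop-core.jar hadoop-core.jar.rpm-orig
ln -s emr-hadoop-core-0.20.jar hadoop-core.jar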

And now on to the master. There was actually no configuration that I did on the master file system to get things up and running. But if Flume is writing to a /tmp directory on an ephemeral file system, you should point those directories at persistent storage (like /mnt) via flume-site.xml.
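
A quick way to see which directories default to /tmp is to grep the shipped defaults and override anything relevant in flume-site.xml; as far as I can tell, the CDH3 packages keep the defaults in flume-conf.xml:

# show default settings that point at /tmp, along with their property names
grep -B 2 '/tmp' /etc/flume/conf/flume-conf.xml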

Web Based Setup

I chose to do the individual machine setup via the master web interface. You can get to this by pointing your web browser at http://u1:35871/ (replace u1 with the public DNS name or IP of your Flume master). Ensure that the port is accessible from the outside through your security group settings. At this point, it was easiest for me to ensure all hosts running Flume could talk to all ports on all other hosts running Flume. You can certainly lock this down to the individual ports for security once everything is up and running.

At this point, you should go to a1 and c1 and run /etc/init.d/flume-node start. If everything goes well, the master (whose IP is specified in their configs) should be notified of their existence. Now you can configure them from the web. Click on the config link and then fill in the text fields as follows:

  • Agent Node: $agent_ec2_internal_ip
  • Source: tailDir("/mnt/logs/", ".*.log")
  • Sink: agentBESink("$collector_ec2_internal_ip", 35853)

Note: I chose to use tailDir since I will control rotating the logs on my own. I am also using agentBESink because I am ok with losing log lines if the case arises.
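
As an aside, the same mapping can be pushed from the command line with the flume shell on the master instead of the web form; the syntax below is a sketch based on my reading of the 0.9.x user guide, so double-check it against your version (the node name and collector IP are the same placeholders as above):

# connect to the master (admin port, 35873 by default); lines starting
# with '>' are typed at the flume shell prompt
flume shell -c $master_IP
> exec config $agent_ec2_internal_ip 'tailDir("/mnt/logs/", ".*.log")' 'agentBESink("$collector_ec2_internal_ip", 35853)'
> quit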

Now click Submit Query and go back to the config page to setup the collector:

  • Agent Node: $collector_ec2_internal_ip
  • Source: collectorSource(35853)
  • Sink: collectorSink("s3n://$account:$secret@$bucket/logs/%Y/%m/%d/%H00", "server")

This is going to tell the collector that we are sinking to s3native with the $account key and the $secret key into the $bucket with an initial folder of 'logs'. It will then log to sub-folders with YYYY/MM/DD/HH00 (e.g. 2011/02/03/1300/server-.log). There will be 60 gzipped files in each folder since the roll time is set up to produce one file per minute. Now click Submit Query and go to the 'master' page, where you should see 2 commands listed as "SUCCEEDED" in the command history. If they have not succeeded, ensure a few things have been done (there are probably more, but this is a handy start):

  1. Always use double quotes (") since single quotes (') aren't interpreted correctly. UPDATE: Single quotes are interpreted correctly; they are just intentionally not accepted (thanks jmhsieh).
  2. In your regex, use something like ".*\\.log" since the '.' is a regex metacharacter and should be escaped.
  3. In your regex, ensure that your backslashes are properly escaped: "foo\\bar" is the correct way to match "foo\bar".
  4. Ensure any '/' characters in your Amazon account key or secret key are inserted as '%2F' (a quick way to do this is shown below).
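
A minimal way to do that encoding without reaching for a full URL-encoder (only the '/' character needs attention here; the key below is made up):

# URL-encode the slashes in a secret key before pasting it into the s3n:// URI
echo 'EXAMPLE/SECRET/KEY' | sed 's|/|%2F|g'
# -> EXAMPLE%2FSECRET%2FKEY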

There are also tables of Node Status and Node Configuration. These should match up with what you think you configured.
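
Once a roll interval or two has passed, you can also verify from outside Flume that compressed files are landing in the bucket. Here I'm assuming s3cmd is installed and configured with the same credentials, and the date path is just an example:

# list the collector output for one hourly folder (example path)
s3cmd ls s3://$bucket/logs/2011/02/03/1300/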

At this point everything should work. Admittedly, I had a lot of trouble getting to this point, but with the help of the Cloudera folks and the users on irc.freenode.net in #flume, I was able to get things going. The logs sadly aren't too helpful here in most cases (but look anyway because they might provide you with more info than they provided for me). If I missed anything in this post or there is something else I am unaware of, let me know.
