My goal was to set up Flume on my web instances and write all events into S3, so I could easily use other tools like Amazon Elastic MapReduce and Amazon Redshift.
I didn't want to have to deal with log rotation myself, so I set up Flume to read from a syslog UDP source. In this case, Flume NG acts as a syslog server: as long as Flume is running, my web application can simply write to it in syslog format on the specified port. Most languages have plugins for this.
At the time of this writing, I've been able to get Flume NG up and running on three EC2 instances, all writing to the same S3 bucket.
Flume NG is different from Flume OG (original generation) in that it doesn't use a master in any way. So if you see any articles referring to collectors or masters, you're probably looking at Flume OG documentation or guides.
First, go to http://flume.apache.org/download.html and find the link to the latest binary .tar.gz. At the time of writing, it's 1.3.1, and the URL to download is http://apache.mirrors.tds.net/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
SSH into your EC2 instances:
ssh -i ~/.ssh/XXXXXX.pem YOUR-USER@YOUR-EC2-HOSTNAME
On your EC2 instances, wget the .tar.gz, like so:
cd ~
wget http://apache.mirrors.tds.net/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
Once it's done, untar it, like so:
cd ~
tar xvzf apache-flume-1.3.1-bin.tar.gz
rm apache-flume-1.3.1-bin.tar.gz
To prepare for configuration, let's copy the template flume.conf:
cp ~/apache-flume-1.3.1-bin/conf/flume-conf.properties.template ~/apache-flume-1.3.1-bin/conf/flume.conf
I ran into an issue that generated the first error shown at the bottom of this post. It seems to be caused by the Hadoop jar not being included with Flume. So, download the Hadoop binary tarball as follows (look for the most recent version, which is 1.0.4 at the time of writing) and copy the needed jars into Flume's lib directory:
cd ~
wget http://mirror.symnds.com/software/Apache/hadoop/common/hadoop-1.0.4/hadoop-1.0.4-bin.tar.gz
tar xvzf hadoop-1.0.4-bin.tar.gz
rm hadoop-1.0.4-bin.tar.gz
cp ~/hadoop-1.0.4/hadoop-core-1.0.4.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/commons-configuration-1.6.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/commons-httpclient-3.0.1.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/jets3t-0.6.1.jar ~/apache-flume-1.3.1-bin/lib/
cp ~/hadoop-1.0.4/lib/commons-codec-1.4.jar ~/apache-flume-1.3.1-bin/lib/
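As a quick sanity check (optional, just for peace of mind), you can confirm the jars actually landed in Flume's lib directory before moving on:
ls ~/apache-flume-1.3.1-bin/lib/ | grep -E 'hadoop-core|jets3t|commons-(configuration|httpclient|codec)'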
Let's change the config file to write to S3:
vim ~/apache-flume-1.3.1-bin/conf/flume.conf
Replace the existing contents with the contents below.
Note: I originally considered the recoverable memory channel, but the Flume documentation has this warning: "The Recoverable Memory Channel has been deprecated in favor of the FileChannel. FileChannel is durable channel and performs better than the Recoverable Memory Channel." So I went with the file channel.
Another issue I ran into was that my AWS secret key had a / in it, which broke the S3 path in the sink configuration. I generated a new key pair on Amazon at https://portal.aws.amazon.com/gp/aws/securityCredentials that didn't have any problematic characters, and it worked without a hitch.
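If you want to check a key before wiring it into the config, a tiny shell test like the one below works; AWS_SECRET_ACCESS_KEY here is just a placeholder for however you store the key:
# Placeholder variable; substitute wherever you keep the secret key
case "$AWS_SECRET_ACCESS_KEY" in
  */*) echo "secret key contains a '/': generate a new key pair before embedding it in hdfs.path" ;;
  *)   echo "secret key looks safe to embed in hdfs.path" ;;
esac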
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = seqGenSrc
agent.channels = fileChannel
agent.sinks = s3Sink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = syslogudp
agent.sources.seqGenSrc.port = 5140
agent.sources.seqGenSrc.host = localhost

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = fileChannel

# Each sink's type must be defined
agent.sinks.s3Sink.type = hdfs

# Specify the channel the sink should use
agent.sinks.s3Sink.channel = fileChannel
agent.sinks.s3Sink.hdfs.path = s3://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@BUCKET-NAME/

# Each channel's type is defined.
agent.channels.fileChannel.type = file

# Other config values specific to each type of channel (sink or source)
# can be defined as well.
# In this case, it specifies the capacity of the file channel,
# which can depend on the size of your filesystem
agent.channels.fileChannel.capacity = 1000000
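One optional tweak, since we're using the file channel: by default it keeps its checkpoint and data files under ~/.flume/file-channel/ in the home directory of whichever user runs the agent. If you'd rather pin them to explicit locations, you can append something like the following to flume.conf (the /var/flume paths are just example paths of mine; the directories must exist and be writable by the user running Flume):
cat >> ~/apache-flume-1.3.1-bin/conf/flume.conf <<'EOF'
# Optional: explicit file channel directories (example paths)
agent.channels.fileChannel.checkpointDir = /var/flume/checkpoint
agent.channels.fileChannel.dataDirs = /var/flume/data
EOF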
To run Flume with the configuration you just set up, run:
cd ~/apache-flume-1.3.1-bin
bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent
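That runs the agent in the foreground with console logging, which is handy for debugging. If you want to leave it running while you test from another terminal, a quick-and-dirty option (until we set up a proper init script below) is to background it with nohup; the log file location here is just my choice:
cd ~/apache-flume-1.3.1-bin
nohup bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -n agent > ~/flume-console.log 2>&1 &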
Let's try writing to the UDP syslog source on port 5140 and see if the events end up in our S3 bucket! To do so, we'll use netcat, since logger can't write to different ports.
The syslog implementation in Flume expects a format as follows: <14>Mmm DD HH:MM:SS ip-NN-NNN-NNN-NNN YOUR MESSAGE HERE, otherwise it'll get confused. Using that format, we run this (but please replace the timestamp):
nc -w0 -u localhost 5140 <<< "<14>Mar 29 19:25:55 ip-10-204-131-184 Test Message 1"
In a few seconds (give the buffering a chance ;-), you'll be able to see some files show up in your S3 bucket. Go to https://console.aws.amazon.com/s3/home?region=us-east-1 and see them!
The filenames will be in the format FlumeData.NNNNNNNNNNNN, and the contents are written as text, since we didn't configure any compression in the conf file. Each one should contain the portion of the message after the ip-NN-NNN-NNN-NNN hostname, followed by a newline:
Test Message 1
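If you prefer the command line to the web console, a tool like s3cmd (assuming you have it installed and configured with the same credentials) can list and fetch the files:
s3cmd ls s3://BUCKET-NAME/
s3cmd get s3://BUCKET-NAME/FlumeData.NNNNNNNNNNNN   # then cat the downloaded file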
You can configure all your instances the same way and try netcatting on each of them. You should see all of the writes show up in your S3 bucket, as I did.
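For example, here's a rough loop for smoke-testing several instances at once; the hostnames, key path, and ubuntu user are placeholders for your own setup:
# Hypothetical instance names; substitute your EC2 public DNS names
for host in ec2-host-1 ec2-host-2 ec2-host-3; do
  ssh -i ~/.ssh/XXXXXX.pem "ubuntu@$host" \
    'nc -w0 -u localhost 5140 <<< "<14>Mar 29 19:25:55 ip-10-204-131-184 Test Message from $(hostname)"'
done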
Of course, I don't want to have to log in and start Flume NG on all my instances manually.
I didn't see anything in the binary tarball for this, so I started with (and revised) the init.d script from here: https://git-wip-us.apache.org/repos/asf?p=bigtop.git;a=blob;f=bigtop-packages/src/common/flume/flume-agent.init;hb=HEAD
My revision is posted at https://github.com/crowdmob/flume-1.3.1-agent.init/blob/master/flume-agent.init
First, let's put that in a script in init.d:
sudo touch /etc/init.d/flume-agent
sudo chmod a+x /etc/init.d/flume-agent
sudo vim /etc/init.d/flume-agent
Simply copy and paste the contents from https://raw.github.com/crowdmob/flume-1.3.1-agent.init/master/flume-agent.init into that file.
The next thing you have to do is set up the runtime directories. I simply did the following:
sudo mkdir /var/run/flume
sudo mkdir /var/log/flume
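If your copy of the init script runs the agent as a dedicated non-root user (many packaged init scripts do), that user also needs to own these directories; for example, assuming a hypothetical flume user:
sudo chown -R flume:flume /var/run/flume /var/log/flume   # 'flume' is a placeholder; match the user in your init script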
Finally, run chkconfig to make sure it starts on boot:
sudo chkconfig flume-agent on
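At this point you can also start the agent through the init script right away and check that it stays up; assuming the script keeps the standard start/status actions, that's just:
sudo service flume-agent start
sudo service flume-agent status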
Here are the errors that were generated, for completeness.
The first error, fixed by including the hadoop-core jar:
2013-03-29 16:53:01,756 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:207)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
    at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:214)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:373)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:223)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:123)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:202)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    ... 15 more
The second error, fixed by including the JetS3t HTTP libraries:
java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpHead
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:54)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy1.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:207)
    at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
    at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.apache.http.client.methods.HttpHead
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    ... 28 more
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpHead
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:54)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy1.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:207)
    at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
    at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
    at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: org.apache.http.client.methods.HttpHead
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    ... 28 more