Created
February 3, 2012 05:23
-
-
Save jteso/1728295 to your computer and use it in GitHub Desktop.
Spring Integration Outbound adapter to write files into HDFS (Hadoop file System).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package directlabs.experimental; | |
import org.apache.commons.lang.exception.ExceptionUtils; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FSDataOutputStream; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.springframework.integration.Message; | |
import org.springframework.integration.handler.AbstractMessageHandler; | |
import org.springframework.util.Assert; | |
/** | |
* Spring Integration Outbound adapter to write files into HDFS (Hadoop file System). | |
* @author jtedilla | |
* | |
*/ | |
public class HdfsOutboundEndpoint extends AbstractMessageHandler{ | |
private Configuration configuration; | |
private String fileName = "si-default.data"; | |
public void setFileName(String fileName) { | |
this.fileName = fileName; | |
} | |
/* Constructors */ | |
public HdfsOutboundEndpoint() { | |
super(); | |
} | |
public HdfsOutboundEndpoint(Configuration configuration) { | |
super(); | |
this.configuration = configuration; | |
} | |
@Override | |
protected void handleMessageInternal(Message<?> message) throws Exception { | |
Assert.isInstanceOf(String.class, message.getPayload(), "Payload must be a String"); | |
String msg = (String) message.getPayload(); | |
writeInHdfs(msg); | |
} | |
protected void writeInHdfs(String msg) { | |
Configuration conf = this.configuration == null ? new Configuration(true) : new Configuration(this.configuration); | |
try{ | |
FileSystem hdfs = FileSystem.get(conf); | |
Path hdfsFile = new Path(this.fileName); | |
FSDataOutputStream out = null; | |
out = hdfs.exists(hdfsFile)? hdfs.append(hdfsFile):hdfs.create(hdfsFile); | |
out.writeUTF(msg); | |
}catch(Throwable t) { | |
throw new RuntimeException("Received exception:" + ExceptionUtils.getFullStackTrace(t) + " while trying to write in HDFS."); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment