Last active
March 6, 2018 14:33
-
-
Save ricston-git/a19fc3e2e3f1b5c0fe9a4a790d483e2e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QuerySingleFileProcessor implements Callable, Startable { | |
private SftpConnector sftpConnector; | |
private ImmutableEndpoint immutableEndpoint; | |
private SftpReceiverRequesterUtil util; | |
@Inject | |
private MuleContext muleContext; | |
@Override | |
public void start() throws MuleException { | |
// Lookup the SFTPConnector from the Mule Registry | |
sftpConnector = (SftpConnector) this.muleContext.getRegistry().lookupConnector("SftpConnector"); | |
final EndpointBuilder endpointBuilder = this.muleContext.getRegistry().lookupObject("QuerySingleFileEndpoint"); | |
// Build an inbound endpoint based on the properties supplied in the global endpoint. | |
this.immutableEndpoint = endpointBuilder.buildInboundEndpoint(); | |
this.util = new SftpReceiverRequesterUtil(immutableEndpoint); | |
} | |
@Override | |
public Object onCall(MuleEventContext eventContext) throws Exception { | |
// Create a notifier which will notify notification subscribers of this SFTP event. | |
final SftpNotifier notifier = new SftpNotifier(sftpConnector, eventContext.getMessage(), immutableEndpoint, eventContext.getFlowConstruct().getName()); | |
// Obtain the name of the file we want to download from a flow variable. | |
final String fileName = eventContext.getMessage().getProperty("fileName", PropertyScope.INVOCATION); | |
if (StringUtils.isBlank(fileName)) { | |
throw new Exception("The flow variable 'fileName' cannot be null since it indicates which file to retrieve from the SFTP server."); | |
} | |
try { | |
// Download the file | |
final InputStream inputStream = util.retrieveFile(fileName, notifier); | |
// We only do this because org.mule.transport.sftp.SftpStream is package-private, | |
// but the subclasses are public. | |
if (inputStream instanceof SftpFileArchiveInputStream) { | |
final SftpFileArchiveInputStream sftpArchiveInputStream = (SftpFileArchiveInputStream) inputStream; | |
sftpArchiveInputStream.performPostProcessingOnClose(this.shouldDeleteFile()); | |
} | |
if (inputStream instanceof SftpInputStream) { | |
final SftpInputStream sftpInputStream = (SftpInputStream) inputStream; | |
sftpInputStream.performPostProcessingOnClose(this.shouldDeleteFile()); | |
} | |
return inputStream; | |
} | |
catch (IOException e) { | |
throw new DefaultMuleException(MessageFactory.createStaticMessage("An IOException was thrown whilst attempting to download '%s'", fileName), e); | |
} | |
} | |
/** | |
* A method that simply checks whether a file should be deleted or not based on the 'autoDelete' property | |
* defined on the connector or global endpoint. If you want to dynamically resolve this based on the Mule message, | |
* you can add it as a parameter. | |
*/ | |
protected boolean shouldDeleteFile() { | |
final String endpointAutoDelete = (String) immutableEndpoint.getProperty("autoDelete"); | |
final boolean connectorAutoDelete = sftpConnector.isAutoDelete(); | |
return connectorAutoDelete || Boolean.valueOf(endpointAutoDelete); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Global SFTP connector. QuerySingleFileProcessor.start() looks it up in the registry by the name "SftpConnector". -->
<sftp:connector name="SftpConnector" archiveDir="${sftp.archiveDir}" doc:name="SFTP"/>
<!-- Global endpoint describing where single files are fetched from. QuerySingleFileProcessor.start()
     looks it up by the name "QuerySingleFileEndpoint" and reads its 'autoDelete' property. -->
<sftp:endpoint name="QuerySingleFileEndpoint" host="${sftp.host}" port="${sftp.port}" autoDelete="true"
               connector-ref="SftpConnector" user="${sftp.username}" password="${sftp.password}"
               path="${sftp.path}" doc:name="SFTP"/>
<!-- Polls the SFTP path for *.instruct files. Each line of such a file is treated as the name of
     another file, which is then fetched on demand by QuerySingleFileProcessor. -->
<flow name="read-sftp-files-flow" initialState="started">
    <sftp:inbound-endpoint host="${sftp.host}" port="${sftp.port}" doc:name="Read INSTRUCT files"
                           connector-ref="SftpConnector" user="${sftp.username}" password="${sftp.password}"
                           path="${sftp.path}" pollingFrequency="${sftp.pollingFrequency:10000}">
        <file:filename-wildcard-filter pattern="*.instruct" caseSensitive="true"/>
    </sftp:inbound-endpoint>
    <object-to-string-transformer doc:name="Transform INSTRUCT file to String" />
    <!-- Split on LF or CRLF so that file names do not keep a trailing carriage return
         when the instruction file uses Windows line endings. -->
    <splitter expression="#[payload.split('\r?\n')]" doc:name="Split file line by line"/>
    <logger level="INFO" message="File to read from SFTP is: #[payload]" doc:name="Log files to read" />
    <!-- The component reads this flow variable to decide which file to download. -->
    <set-variable variableName="fileName" value="#[payload]" doc:name="Set fileName flowVar" />
    <component doc:name="Query Single File">
        <singleton-object class="com.ricston.blogs.QuerySingleFileProcessor" />
    </component>
    <object-to-string-transformer doc:name="Transform read file to String" />
    <logger level="INFO" message="File contents: #[payload]" doc:name="Log file contents" />
</flow>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Registers QuerySingleFileProcessor as a singleton component inside a flow. -->
<component doc:name="Query Single File">
    <singleton-object class="com.ricston.blogs.QuerySingleFileProcessor" />
</component>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment