Last active
September 25, 2018 23:46
-
-
Save colindean/d1191daf17c494edd3f7e032b0e1fb61 to your computer and use it in GitHub Desktop.
An attempt at an ExecuteGroovyScript script that fills in for ListDatabaseTables until NIFI-5519 is implemented
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/******** | |
* ListDatabaseTablesWithLookup | |
* | |
* by Colin Dean <[email protected]> | |
* | |
 * It is a cobbled-together attempt at a workaround for
 * ListDatabaseTables' inability to take incoming FlowFiles, which prevents
 * that processor from using DBCPConnectionPoolLookup as its controller service
 * instead of DBCPConnectionPool. This affects NiFi 1.7.0+.
* | |
* https://issues.apache.org/jira/browse/NIFI-5519 is tracking a proposal to | |
* change the behavior of ListDatabaseTables. | |
* | |
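 * Required dynamic properties:
 *
 * * databaseConnectionPoolName - the name of the controller service that is a DBCPConnectionPoolLookup
 *
 * Required incoming FlowFile attributes:
 *
 * * database.name - the lookup key passed to the DBCPConnectionPoolLookup service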
* | |
* This script borrows heavily from ListDatabaseTables, a part of Apache NiFi 1.7.1 | |
* | |
* https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java | |
* | |
* /* | |
* * Licensed to the Apache Software Foundation (ASF) under one or more | |
* * contributor license agreements. See the NOTICE file distributed with | |
* * this work for additional information regarding copyright ownership. | |
* * The ASF licenses this file to You under the Apache License, Version 2.0 | |
* * (the "License"); you may not use this file except in compliance with | |
* * the License. You may obtain a copy of the License at | |
* * | |
* * http://www.apache.org/licenses/LICENSE-2.0 | |
* * | |
* * Unless required by applicable law or agreed to in writing, software | |
* * distributed under the License is distributed on an "AS IS" BASIS, | |
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* * See the License for the specific language governing permissions and | |
* * limitations under the License. | |
* */ | |
import org.apache.nifi.controller.ControllerService | |
import org.apache.nifi.flowfile.FlowFile | |
import org.apache.nifi.logging.ComponentLog | |
import org.apache.nifi.processor.ProcessContext | |
import org.apache.nifi.processor.ProcessSession | |
import org.apache.nifi.processor.Relationship | |
import java.sql.Connection | |
import java.sql.DatabaseMetaData | |
import java.util.stream.Stream | |
import org.apache.nifi.util.StringUtils | |
import java.util.stream.Collectors | |
import java.sql.SQLException | |
import java.sql.ResultSet | |
// Key under which the lookup value is handed to the DBCPConnectionPoolLookup service.
final String dbcpLookupServiceAttribute = "database.name"

// Attribute names written onto each per-table FlowFile.
// DB_TABLE_COUNT is declared alongside the others but is not written by this script.
final String DB_TABLE_CATALOG = "db.table.catalog"
final String DB_TABLE_SCHEMA = "db.table.schema"
final String DB_TABLE_NAME = "db.table.name"
final String DB_TABLE_FULLNAME = "db.table.fullname"
final String DB_TABLE_TYPE = "db.table.type"
final String DB_TABLE_REMARKS = "db.table.remarks"
final String DB_TABLE_COUNT = "db.table.count"

// Re-declare the objects ExecuteGroovyScript binds into the script with explicit
// types, so editors can highlight and complete against them.
ComponentLog Log = log
ProcessContext thisContext = context
ProcessSession thisSession = session
Relationship FAILURE = REL_FAILURE
Relationship SUCCESS = REL_SUCCESS

// The script is driven by an incoming FlowFile; with none queued there is nothing to list.
FlowFile flowFileIn = thisSession.get()
if (!flowFileIn) {
    return
}

// Set to false when the incoming FlowFile is routed to FAILURE, so the cleanup
// step keeps it instead of removing it.
boolean flowFileInIsRemovable = true
/**
 * Obtains a JDBC Connection from the controller service whose *name* is given by the
 * databaseConnectionPoolName dynamic property (expected to be a DBCPConnectionPoolLookup).
 *
 * @param dbcpLookupServiceKey value passed to the service under dbcpLookupServiceAttribute
 * @return the Connection returned by the service's getConnection(Map); the caller closes it
 * @throws IllegalStateException if the property is unset/empty, or no controller service
 *         with that name (or id) can be found
 */
Closure<Connection> getConnection = { String dbcpLookupServiceKey ->
    // expected dynamic property: the display name of the lookup controller service
    def dbcpLookupServiceName = databaseConnectionPoolName.value
    // Validate before logging so the log never reports a lookup for a null/empty name.
    if (!dbcpLookupServiceName) {
        throw new IllegalStateException("databaseConnectionPoolName property unset")
    }
    Log.info("Controller service to lookup by name: $dbcpLookupServiceName")

    def serviceLookup = thisContext.controllerServiceLookup
    // The lookup exposes identifiers by type, so resolve the id by matching the name.
    def dbcpServiceId = serviceLookup.getControllerServiceIdentifiers(ControllerService).find { String csId ->
        serviceLookup.getControllerServiceName(csId) == dbcpLookupServiceName
    }
    if (dbcpServiceId == null) {
        throw new IllegalStateException("Unable to get controller service id for $dbcpLookupServiceName")
    }
    Log.info("$dbcpLookupServiceName is ID $dbcpServiceId")

    def connectionConfig = [(dbcpLookupServiceAttribute): dbcpLookupServiceKey]
    Log.info("Connection configuration: $connectionConfig")

    def dbcpService = serviceLookup.getControllerService(dbcpServiceId)
    if (dbcpService == null) {
        throw new IllegalStateException("Unable to get controller service for ${dbcpServiceId}")
    }
    return dbcpService.getConnection(connectionConfig)
}
// List every table visible through the looked-up connection and emit one child FlowFile
// per table (db.table.* attributes) to SUCCESS. A failure while building one child sends
// only that child to FAILURE. A connection/metadata failure sends the incoming FlowFile to
// FAILURE with a script.error attribute. Otherwise the incoming FlowFile is removed.
Connection conn = null
ResultSet rs = null
try {
    // Read the lookup key via the shared attribute-name constant instead of a repeated literal.
    String dbcpLookupServiceKey = flowFileIn.getAttribute(dbcpLookupServiceAttribute)
    if (!dbcpLookupServiceKey) {
        throw new IllegalArgumentException("FlowFile is missing required attribute '${dbcpLookupServiceAttribute}'")
    }
    conn = getConnection(dbcpLookupServiceKey)
    Log.debug("Connection open")
    DatabaseMetaData dbMetaData = conn.getMetaData()
    // https://docs.oracle.com/javase/8/docs/api/java/sql/DatabaseMetaData.html#getTables(java.lang.String,%20java.lang.String,%20java.lang.String,%20java.lang.String[])
    // No catalog, schema or type filter. NOTE(review): a null table-name pattern is
    // treated as "any table" by common drivers, but this is driver-dependent.
    rs = dbMetaData.getTables(null, null, null, null)
    while (rs.next()) {
        // One child FlowFile per table, created with the incoming FlowFile as parent.
        FlowFile flowFileOut = thisSession.create(flowFileIn)
        try {
            // getTables() columns 1-5: TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS
            final String tableCatalog = rs.getString(1)
            final String tableSchema = rs.getString(2)
            final String tableName = rs.getString(3)
            final String tableType = rs.getString(4)
            final String tableRemarks = rs.getString(5)
            Log.info("Found table $tableName for database key $dbcpLookupServiceKey")

            // Fully-qualified name: the non-empty parts of catalog.schema.table
            String fqn = Stream.of(tableCatalog, tableSchema, tableName)
                    .filter({ segment -> !StringUtils.isEmpty(segment) })
                    .collect(Collectors.joining("."))

            if (tableCatalog != null) {
                flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_CATALOG, tableCatalog)
            }
            if (tableSchema != null) {
                flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_SCHEMA, tableSchema)
            }
            flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_NAME, tableName)
            flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_FULLNAME, fqn)
            flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_TYPE, tableType)
            if (tableRemarks != null) {
                flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_REMARKS, tableRemarks)
            }
            thisSession.transfer(flowFileOut, SUCCESS)
        } catch (Exception e) {
            Log.error('Scripting error', e)
            // Exceptions without a message would otherwise produce a null attribute value.
            flowFileOut = thisSession.putAttribute(flowFileOut, 'script.error', e.message ?: e.toString())
            thisSession.transfer(flowFileOut, FAILURE)
        }
    }
} catch (Exception e) {
    flowFileInIsRemovable = false
    Log.error('Scripting error while connecting to or querying database', e)
    flowFileIn = thisSession.putAttribute(flowFileIn, 'script.error', e.message ?: e.toString())
    thisSession.transfer(flowFileIn, FAILURE)
} finally {
    if (flowFileInIsRemovable) {
        thisSession.remove(flowFileIn)
    }
    // Release JDBC resources. A close failure is logged, not thrown, so it cannot
    // escape the script after FlowFiles have already been routed.
    if (rs != null) {
        try {
            rs.close()
        } catch (SQLException closeError) {
            Log.warn('Unable to close table metadata ResultSet', closeError)
        }
    }
    if (conn != null) {
        try {
            conn.close()
        } catch (SQLException closeError) {
            Log.warn('Unable to close database connection', closeError)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment