Created
July 10, 2018 10:36
-
-
Save steklopod/5caf0971b323b7b20a2d3117c9c386b8 to your computer and use it in GitHub Desktop.
JDBC methods (select, insert)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.gamble.servicebus.core.processor.ifd.manager; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.camel.Exchange; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.stereotype.Service; | |
import ru.gamble.servicebus.core.domain.Task; | |
import ru.gamble.servicebus.core.domain.TaskType; | |
import ru.gamble.servicebus.core.exception.PackageIdUpdaterExeption; | |
import ru.gamble.servicebus.core.exception.RowException; | |
import ru.gamble.servicebus.core.exception.StatusUpdaterException; | |
import ru.gamble.servicebus.core.exception.TaskTypeConfigurationException; | |
import ru.gamble.servicebus.core.model.ifd.TableRows; | |
import ru.gamble.servicebus.core.service.ServiceBusMainParamService; | |
import ru.gamble.servicebus.core.service.TaskTypeService; | |
import ru.gamble.servicebus.core.util.StringParseUtils; | |
import javax.sql.DataSource; | |
import java.sql.*; | |
import java.util.AbstractMap.SimpleEntry; | |
import java.util.*; | |
import java.util.Map.Entry; | |
import java.util.function.Function; | |
import static java.util.stream.Collectors.toList; | |
import static ru.gamble.servicebus.core.model.ifd.SignalTableColumnNames.*; | |
import static ru.gamble.servicebus.core.model.ifd.SignalTableStatus.*; | |
import static ru.gamble.servicebus.core.util.DbUtil.generateFilledInClauseForLargeBatches; | |
import static ru.gamble.servicebus.core.util.DbUtil.getColumnNamesClassesFromResultSet; | |
@Slf4j | |
@Service | |
public class TableWorker extends ParamsHelper { | |
@Autowired | |
private ServiceBusMainParamService paramService; | |
@Autowired | |
private TaskTypeService taskTypeService; | |
/** | |
* Выбор всех строк во входящей таблице для переноса с учетом игнорируемых колонок. | |
* | |
* @param ds datasource; | |
* @param tableName входящая таблица для переноса; | |
* @param ignoredColumns список игнорируемых колонок. | |
* @return объект пару: а)таблицу целиком б) список названий колонок для переноса. | |
*/ | |
public Entry<List<TableRows>, List<String>> selectAllRows(DataSource dsToReadColumnNames, String inRange, DataSource ds, String tableName, List<String> ignoredColumns) { | |
String whereClause = " WHERE " + PACKAGE_ID.getColumnName() + " IN (" + inRange + ")"; | |
try (Connection connection = dsToReadColumnNames.getConnection(); | |
Statement stmnt = connection.createStatement(); | |
ResultSet rsForColumnNames = stmnt.executeQuery("SELECT * FROM " + tableName + " WHERE 1 = 0")) { | |
Map<String, Class<?>> columnNamesAndClasses = getColumnNamesClassesFromResultSet(rsForColumnNames); | |
columnNamesAndClasses.keySet().removeAll(ignoredColumns); | |
LinkedList<String> colNamesOnly = new LinkedList<>(columnNamesAndClasses.keySet()); | |
String select = generateSelectString(tableName, colNamesOnly) + whereClause; | |
try (Connection conn = ds.getConnection(); | |
ResultSet resultSet = conn.prepareStatement(select).executeQuery()) { | |
List<TableRows> tableRows = formTable(resultSet, columnNamesAndClasses); | |
return new SimpleEntry<>(tableRows, colNamesOnly); | |
} | |
} catch (SQLException | TaskTypeConfigurationException e) { | |
log.error("Can't get data from " + tableName); | |
throw new RowException(e); | |
} | |
} | |
/** | |
* Проверка DataSource на признак Oracle для определения SQL-типов. | |
*/ | |
public static DataSource selectDataSourceToReadColumnNames(DataSource signal, DataSource buffer) { | |
DataSource dsSignal = signal; | |
DataSource dsBuffer = buffer; | |
try (Connection senderConnection = dsSignal.getConnection(); | |
Connection receiverConnection = dsBuffer.getConnection()) { | |
String senderDbName = senderConnection.getMetaData().getDatabaseProductName().toLowerCase(); | |
String receiverDbName = receiverConnection.getMetaData().getDatabaseProductName().toLowerCase(); | |
if (!senderDbName.equals(receiverDbName) && senderDbName.contains("oracle")) { | |
return dsBuffer; | |
} else { | |
return dsSignal; | |
} | |
} catch (SQLException e) { | |
throw new RowException(e); | |
} | |
} | |
/** | |
* Инициализация списка имен колонок в целевой таблице. | |
*/ | |
List<String> initColumnNamesList(ResultSet resultSet) throws SQLException { | |
ResultSetMetaData metaData = resultSet.getMetaData(); | |
List<String> columnNames = new LinkedList<>(); | |
for (int i = 1; i <= metaData.getColumnCount(); i++) { | |
columnNames.add(metaData.getColumnName(i)); | |
} | |
return columnNames; | |
} | |
/** | |
* Вставка всех строк в целевую БУФЕРНУЮ таблицу с учетом изменения `package_id`. | |
* | |
* @param ds datasource; | |
* @param tableName имя таблицы для вставки; | |
* @param rowList объект таблицы, содержащий в себе все строки {@link TableRows row} | |
* @param columnNames список названий колонок таблицы; | |
*/ | |
public void insertRows(DataSource ds, String tableName, List<TableRows> rowList, Map<Integer, Integer> idsNewIds, List<String> columnNames) { | |
String insert = generateInsertString(tableName, columnNames); | |
int packageIdIndex = columnNames.indexOf(PACKAGE_ID.getColumnName()) + 1; | |
try (Connection connection = ds.getConnection(); | |
PreparedStatement stmnt = connection.prepareStatement(insert)) { | |
for (TableRows tableWorkerCurrent: rowList) { | |
int z = 1; | |
List<Object> valuesToInsert = new LinkedList<>(tableWorkerCurrent.getRow()); | |
for (Object valueOfCurrentColumn: valuesToInsert) { | |
if (valueOfCurrentColumn != null) { | |
if (z == packageIdIndex) { | |
Optional<Integer> autoGenId = Optional.ofNullable(idsNewIds.get(valueOfCurrentColumn)); | |
if (autoGenId.isPresent()) { | |
stmnt.setObject(z++, autoGenId.get()); | |
} | |
} else { | |
stmnt.setObject(z++, valueOfCurrentColumn); | |
} | |
} else { | |
stmnt.setNull(z++, Types.NULL); | |
} | |
} | |
stmnt.executeUpdate(); | |
} | |
} catch (SQLException e) { | |
log.error("Can't insert data in " + tableName, e.getLocalizedMessage()); | |
throw new RowException(e); | |
} | |
} | |
/** | |
* Вставка всех строк в целевую СИГНАЛЬНУЮ таблицу. | |
* | |
* @param ds datasource; | |
* @param tableName имя таблицы для вставки; | |
* @param rowList объект таблицы, содержащий в себе все строки {@link TableRows row}; | |
* @param columnNames список названий колонок таблицы; | |
* @param systemMethodIdOfRequester новое значение колонки `receiver_id`. | |
* @return словарь (старый id -> автоинкр. id) | |
*/ | |
public Map<Integer, Integer> insertRowsInSignal(DataSource ds, String tableName, List<TableRows> rowList, List<String> columnNames, Optional<Integer> systemMethodIdOfRequester) { | |
String insert = generateInsertString(tableName, columnNames); | |
ArrayList<Integer> autoIncrementsIds = new ArrayList<>(); | |
Map<Integer, Integer> idsNewId = new HashMap<>(); | |
int k = 0; | |
int statusCodeIndex = columnNames.indexOf(STATUS_CODE.getColumnName()) + 1; | |
int packageIdIndex = columnNames.indexOf(PACKAGE_ID.getColumnName()) + 1; | |
int receiverIdIndex = columnNames.indexOf(RECEIVER_ID.getColumnName()) + 1; | |
try (Connection connection = ds.getConnection(); | |
PreparedStatement stmnt = connection.prepareStatement(insert, Statement.RETURN_GENERATED_KEYS)) { | |
for (TableRows listOfRows: rowList) { | |
List<Object> valuesToInsert = new LinkedList<>(listOfRows.getRow()); | |
setObjectForEachColumnOfRowForSignalTable(valuesToInsert, autoIncrementsIds, stmnt, statusCodeIndex, packageIdIndex, receiverIdIndex, systemMethodIdOfRequester); | |
stmnt.executeUpdate(); | |
try (ResultSet rs = stmnt.getGeneratedKeys()) { | |
while (rs.next()) { | |
Integer key = autoIncrementsIds.get(k++); | |
idsNewId.put(key, rs.getInt(1)); | |
} | |
} | |
} | |
} catch (SQLException e) { | |
log.error("Can't insert data in " + tableName, e.getLocalizedMessage()); | |
throw new RowException(e); | |
} | |
return idsNewId; | |
} | |
private void setObjectForEachColumnOfRowForSignalTable(List<Object> values, List<Integer> autoIncrementsIds, PreparedStatement stmnt, | |
final int statusCodeIndex, final int packageIdIndex, int receiverIdIndex, Optional<Integer> systemMethodIdOfRequester) throws SQLException { | |
int z = 1; | |
for (Object entry: values) { | |
if (entry != null) { | |
if (z == statusCodeIndex) { | |
stmnt.setObject(z++, PROCESSING_BY_SOURCE_SYS.getId()); | |
} else if (z == packageIdIndex) { | |
autoIncrementsIds.add((Integer) entry); | |
stmnt.setNull(z++, Types.NULL); | |
} else if (z == receiverIdIndex && systemMethodIdOfRequester.isPresent()) { | |
stmnt.setObject(z++, systemMethodIdOfRequester.get()); | |
} else { | |
stmnt.setObject(z++, entry); | |
} | |
} else { | |
stmnt.setNull(z++, Types.NULL); | |
} | |
} | |
} | |
/** | |
* Формирование объекта таблицы, состоящей из кортежей {@link TableRows row} | |
* | |
* @param resultSet resultSet; | |
* @param columnNames список имен колонок таблицы. | |
* @return список всех строк таблицы. | |
* @throws SQLException | |
*/ | |
List<TableRows> formTable(ResultSet resultSet, Map<String, Class<?>> columnNames) throws SQLException { | |
List<TableRows> table = new LinkedList<>(); | |
if (resultSet != null) { | |
while (resultSet.next()) { | |
TableRows tableRow = new TableRows(); | |
for (Map.Entry<String, Class<?>> colNameAndClass: columnNames.entrySet()) { | |
tableRow.add(resultSet.getObject(colNameAndClass.getKey(), colNameAndClass.getValue())); | |
} | |
table.add(tableRow); | |
} | |
} else { | |
log.debug("resultSet is empty"); | |
} | |
return table; | |
} | |
public static List<String> getColumnNames(Entry<List<TableRows>, List<String>> signalTableRows) { | |
return new LinkedList<>(signalTableRows.getValue()); | |
} | |
public static List<TableRows> getTableRows(Entry<List<TableRows>, List<String>> signalTableRows) { | |
return signalTableRows.getKey(); | |
} | |
public Entry<List<TableRows>, List<String>> selectAllRows(DataSource dsToReadColumnNames, String inRange, DataSource dataSource, String tableName, String... ignoredColumns) { | |
return selectAllRows(dsToReadColumnNames, inRange, dataSource, tableName, Arrays.asList(ignoredColumns)); | |
} | |
/** | |
* Генерация строки для вставки, имея список колонок. | |
* | |
* @param tableToWriteIn таблица исходящих сообщений; | |
* @param columnNames список имен колонок. | |
* @return подготовленнную SQL строку для вставки PreparedStatement. | |
*/ | |
public static String generateInsertString(String tableToWriteIn, List<String> columnNames) { | |
return "INSERT INTO " + tableToWriteIn + " (" | |
+ String.join(", ", columnNames) | |
+ ") VALUES (" | |
+ String.join(", ", Collections.nCopies(columnNames.size(), "?")) | |
+ ")"; | |
} | |
/** | |
* Генерация строки выборки, имея список колонок. | |
* | |
* @param tableName таблица из которой читаем данные; | |
* @param columnNames список названий колонок таблицы; | |
* @return подготовленнную SQL строку для выборки колонок из переданного списка строк. | |
*/ | |
public static String generateSelectString(String tableName, Collection<String> columnNames) { | |
return "SELECT " + String.join(", ", columnNames) + " FROM " + tableName; | |
} | |
/** | |
* @param exchange текущий; | |
* @param targetTTParamName имя параметра целевых ТТ'ов, содержие целочисленное искомое значение; | |
* @param currentTTParamName имя параметра текущего ТТ, содержащего список целевых takType'ов. | |
* @return минимальное значение параметра `targetTTParamName` из целевых takType'ов. | |
*/ | |
public Optional<Integer> getMinValueFromTargetTaskTypes(Exchange exchange, String targetTTParamName, String currentTTParamName) { | |
Task task = exchange.getIn().getBody(Task.class); | |
Function<String, TaskType> taskTypeByName = taskTypeService::getTaskTypeByName; | |
Function<TaskType, Integer> extractIntParam = taskType -> StringParseUtils.extractIntValueFromString( | |
paramService.getByNameAndType(targetTTParamName, taskType) | |
.getValue()); | |
Function<String, Integer> extractIntParamValueFromTaskType = taskTypeByName.andThen(extractIntParam); | |
return getTTParamsAllAsList(task, currentTTParamName, false).stream() | |
.map(extractIntParamValueFromTaskType) | |
.min(Integer::compare); | |
} | |
/** | |
* Обновление `status_code` в #tableName, где `status_code` = 1 и `esb_package_id` меньше #minimum. | |
* | |
* @param tableName имя целевой таблицы; | |
* @param dataSource dataSource; | |
* @param minimum минимальное значение параметра `targetTTParamName` из целевых takType'ов. | |
*/ | |
public void updateStatusCode(String tableName, DataSource dataSource, Integer minimum) { | |
String sqlUpdate = "UPDATE " + tableName + " SET " + STATUS_CODE.getColumnName() + " = " + SUCCESS_COMPLETED.getId() + | |
" WHERE " + STATUS_CODE.getColumnName() + " = " + PROCESSING_BY_ESB.getId() + " AND " + ESB_PACKAGE_ID.getColumnName() + " <= ?"; | |
try (Connection connection = dataSource.getConnection(); | |
PreparedStatement ps = connection.prepareStatement(sqlUpdate)) { | |
ps.setInt(1, minimum); | |
ps.executeUpdate(); | |
} catch (SQLException e) { | |
throw new StatusUpdaterException("Can't update `status_code` in " + tableName, e); | |
} | |
} | |
/** | |
* Обновление esb_package_id в сигнальной таблице. | |
* @param tableName наименование сигнальной таблицы; | |
* @param ps preparedStatement. | |
* @param esbPackageToBeInserted значение `esb_package_id` который будет записан; | |
* @param packageIdOfUpdatedRow id строки в таблице, у которой будет обновлено `esb_package_id`. | |
*/ | |
public void updateEsbPckgId(String tableName, PreparedStatement ps, int esbPackageToBeInserted, int packageIdOfUpdatedRow) { | |
try { | |
ps.clearParameters(); | |
ps.setInt(1, esbPackageToBeInserted); | |
ps.setInt(2, packageIdOfUpdatedRow); | |
ps.executeUpdate(); | |
} catch (SQLException e) { | |
throw new PackageIdUpdaterExeption("Can't update esb_package_id in " + tableName, e); | |
} | |
} | |
public static String getUpdateString(String tableName) { | |
return "UPDATE " + tableName + " SET " + ESB_PACKAGE_ID.getColumnName() + " = ?" + | |
" WHERE " + PACKAGE_ID.getColumnName() + " = ?"; | |
} | |
/** | |
* Обновление `status_code` в таблице, где `status_code` == 3. | |
* | |
* @param statusCodeNewValue новое значение; | |
* @param tableName таблица; | |
* @param dataSource dataSource; | |
* @param fromOldIdsToAutoIncrIds | |
*/ | |
public void updateStatusCode(int statusCodeNewValue, String tableName, DataSource dataSource, Map<Integer, Integer> fromOldIdsToAutoIncrIds) { | |
List<String> pckgIds = fromOldIdsToAutoIncrIds.values().stream().map(String::valueOf).collect(toList()); | |
String whereTerm = generateFilledInClauseForLargeBatches(pckgIds, PACKAGE_ID.getColumnName()); | |
String sqlUpdate = "UPDATE " + tableName + " SET " + STATUS_CODE.getColumnName() + " = ?" + | |
" WHERE " + STATUS_CODE.getColumnName() + " = " + PROCESSING_BY_SOURCE_SYS.getId() + | |
" AND " + whereTerm; | |
try (Connection connection = dataSource.getConnection(); | |
PreparedStatement ps = connection.prepareStatement(sqlUpdate)) { | |
ps.setInt(1, statusCodeNewValue); | |
ps.executeUpdate(); | |
} catch (SQLException e) { | |
throw new StatusUpdaterException("Can't update `status_code` in " + tableName, e); | |
} | |
} | |
/** | |
* Получение списка id, где status_code = 2. | |
* | |
* @param tableName наименование сигнальной таблицы; | |
* @param connection connection. | |
* @return список `package_id` со статусом 2. | |
*/ | |
public List<Integer> getAllEsbPackageIdsWithStatus2(String tableName, Connection connection) { | |
String sqlSelect = "SELECT " + PACKAGE_ID.getColumnName() + " FROM " + tableName + | |
" WHERE " + STATUS_CODE.getColumnName() + " = " + READY_TO_PROCESS.getId() + | |
" AND " + ESB_PACKAGE_ID.getColumnName() + " IS NULL ORDER BY 1"; | |
List<Integer> ids = new ArrayList<>(); | |
try (Statement stmt = connection.createStatement(); | |
ResultSet rs = stmt.executeQuery(sqlSelect)) { | |
while (rs.next()) { | |
ids.add(rs.getInt(1)); | |
} | |
} catch (SQLException e) { | |
throw new PackageIdUpdaterExeption("Can't get ids with status_code = " + READY_TO_PROCESS.getId() + " from " + tableName, e); | |
} | |
return ids; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment