Skip to content

Instantly share code, notes, and snippets.

@nebhale
Last active April 18, 2019 18:33
Show Gist options
  • Save nebhale/e75f426a1406461410c8ba99f550cdfd to your computer and use it in GitHub Desktop.
Save nebhale/e75f426a1406461410c8ba99f550cdfd to your computer and use it in GitHub Desktop.
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.r2dbc.postgresql;
import com.zaxxer.hikari.HikariDataSource;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.test.Example;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.testcontainers.containers.PostgreSQLContainer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import javax.sql.DataSource;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
final class ColumnMixUpTest {
private static final int SIZE = 5_000_000;
private final PostgreSQLContainer<?> container = new PostgreSQLContainer<>("postgres:11.1");
{
this.container.start();
}
private final ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, POSTGRESQL_DRIVER)
.option(DATABASE, this.container.getDatabaseName())
.option(HOST, this.container.getContainerIpAddress())
.option(PORT, this.container.getMappedPort(5432))
.option(PASSWORD, this.container.getPassword())
.option(USER, this.container.getUsername())
.build());
private final DataSource dataSource = DataSourceBuilder.create()
.type(HikariDataSource.class)
.url(this.container.getJdbcUrl())
.username(this.container.getUsername())
.password(this.container.getPassword())
.build();
private final JdbcOperations jdbcOperations = new JdbcTemplate(this.dataSource);
private final PlatformTransactionManager transactionManager = new DataSourceTransactionManager(this.dataSource);
private final TransactionTemplate transactionTemplate = new TransactionTemplate(this.transactionManager);
@Test
void columnMixUp() {
AtomicInteger count = new AtomicInteger(1);
Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> Flux.from(connection
.createStatement("SELECT * from column_mixup")
.execute())
.flatMap(result -> result.map((row, rowMetadata) ->
String.format("%d: %s", row.get("id", Integer.class), row.get("content", String.class))))
.concatWith(Example.close(connection)))
.as(StepVerifier::create)
.expectNextSequence(IntStream.range(0, SIZE)
.mapToObj(i -> String.format("%d: value-%d", i + 1, i + 1))
.collect(Collectors.toList()))
.verifyComplete();
}
@BeforeEach
void populate() {
this.jdbcOperations.execute("CREATE TABLE column_mixup ( id SERIAL PRIMARY KEY, content VARCHAR(2000) )");
this.transactionTemplate.execute(action -> this.jdbcOperations.batchUpdate("INSERT INTO column_mixup(content) VALUES (?)", new BatchPreparedStatementSetter() {
@Override
public int getBatchSize() {
return SIZE;
}
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, String.format("value-%d", i + 1));
}
}));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment