-
-
Save sundy-li/71d21ccd2f0dd7ac4685231efa1e2296 to your computer and use it in GitHub Desktop.
test-jdbc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* | |
*/ | |
public class SparkJdbcMain { | |
public static void main(String[] args) throws SQLException, ClassNotFoundException { | |
SparkSession spark = SparkSession | |
.builder() | |
.appName("spark-jdbc-test") | |
.enableHiveSupport() | |
.getOrCreate(); | |
// mocked 170 rows data | |
Dataset<Row> rdd = spark.sql("SELECT count(1) from test_table where day = '2020-10-10'"); | |
rdd.foreachPartition((ForeachPartitionFunction<Row>) iterator -> { | |
int columnNum = 34; | |
String params = Strings.repeat("?, ", columnNum); | |
Enumeration<Driver> drivers = DriverManager.getDrivers(); | |
while (drivers.hasMoreElements()) { | |
DriverManager.deregisterDriver(drivers.nextElement()); | |
} | |
DriverManager.registerDriver(new ClickHouseDriver()); | |
Connection connection = DriverManager.getConnection("jdbc:clickhouse://127.0.0.1:9000"); | |
Statement statement = connection.createStatement(); | |
statement.execute("DROP TABLE IF EXISTS xm_user"); | |
statement.execute("CREATE TABLE xm_user (\n" | |
+ " create_time DateTime('Asia/Shanghai'),\n" | |
+ " update_time DateTime('Asia/Shanghai') COMMENT 'keep the larger record when merge',\n" | |
+ " org_id String COMMENT 'pk_1',\n" | |
+ " user_id String COMMENT 'pk_2',\n" | |
+ " cf001 Nullable(Float64),\n" | |
+ " cf002 Nullable(Float64),\n" | |
+ " cf003 Nullable(Float64),\n" | |
+ " cf004 Nullable(Float64),\n" | |
+ " cf005 Nullable(Float64),\n" | |
+ " cs001 Nullable(String),\n" | |
+ " cs002 Nullable(String),\n" | |
+ " cs003 Nullable(String),\n" | |
+ " cs004 Nullable(String),\n" | |
+ " cs005 Nullable(String),\n" | |
+ " cs006 Nullable(String),\n" | |
+ " cs007 Nullable(String),\n" | |
+ " cs008 Nullable(String),\n" | |
+ " cs009 Nullable(String),\n" | |
+ " cs010 Nullable(String),\n" | |
+ " cs011 Nullable(String),\n" | |
+ " cs012 Nullable(String),\n" | |
+ " cs013 Nullable(String),\n" | |
+ " cs014 Nullable(String),\n" | |
+ " cs015 Nullable(String),\n" | |
+ " ct001 Nullable(DateTime('Asia/Shanghai')),\n" | |
+ " ct002 Nullable(DateTime('Asia/Shanghai')),\n" | |
+ " ct003 Nullable(DateTime('Asia/Shanghai')),\n" | |
+ " ct004 Nullable(DateTime('Asia/Shanghai')),\n" | |
+ " ct005 Nullable(DateTime('Asia/Shanghai')),\n" | |
+ " cl001 Nullable(Int64),\n" | |
+ " cl002 Nullable(Int64),\n" | |
+ " cl003 Nullable(Int64),\n" | |
+ " cl004 Nullable(Int64),\n" | |
+ " cl005 Nullable(Int64)\n" | |
+ ") ENGINE = ReplacingMergeTree(\"update_time\")\n" | |
+ "ORDER BY (org_id, user_id);"); | |
PreparedStatement | |
pstmt = connection.prepareStatement("INSERT INTO xm_user(create_time, update_time, org_id, user_id, cf001,cf002,cf003,cf004,cf005," | |
+ "cs001,cs002,cs003,cs004,cs005,cs006,cs007,cs008,cs009,cs010,cs011,cs012,cs013,cs014,cs015," | |
+ "ct001,ct002,ct003,ct004,ct005,cl001,cl002,cl003,cl004,cl005) VALUES(" | |
+ params.substring(0, params.length() - 2) + ")"); | |
while(iterator.hasNext()) { | |
iterator.next(); | |
pstmt.setTimestamp(1, new Timestamp(System.currentTimeMillis())); | |
pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis())); | |
pstmt.setString(3, "4444"); | |
pstmt.setString(4, "5555"); | |
pstmt.setLong(34, 333); | |
pstmt.addBatch(); | |
} | |
pstmt.executeBatch(); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment