-
-
Save arajkumar/638e78002b65f14b82957c21f0ce8124 to your computer and use it in GitHub Desktop.
pipeline benchmarking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <stdbool.h> | |
#include <stdint.h> | |
#include <libpq-fe.h> | |
bool exit_pipe(PGconn *connection ) | |
{ | |
PGpipelineStatus status = PQpipelineStatus(connection); | |
if (status != PQ_PIPELINE_ON) | |
{ | |
return true; | |
} | |
int ok = PQpipelineSync(connection); | |
if (!ok) | |
{ | |
return false; | |
} | |
int results = 0; | |
while (PQconsumeInput(connection) != 0) | |
{ | |
PGresult *res = PQgetResult(connection); | |
if (res == NULL) | |
{ | |
continue; | |
} | |
results++; | |
ExecStatusType resultStatus = PQresultStatus(res); | |
fprintf(stderr, "resultStatus: %d count %d\n", resultStatus, results); | |
PQclear(res); | |
if (resultStatus == PGRES_PIPELINE_SYNC) | |
{ | |
break; | |
} | |
bool ok = | |
resultStatus == PGRES_SINGLE_TUPLE || | |
resultStatus == PGRES_TUPLES_OK || | |
resultStatus == PGRES_COPY_BOTH || | |
resultStatus == PGRES_COMMAND_OK; | |
if (!ok) | |
{ | |
return false; | |
} | |
} | |
return true; | |
} | |
/* PostgreSQL ("Grand Unified Configuration") setting */ | |
typedef struct GUC | |
{ | |
char *name; | |
char *value; | |
} GUC; | |
#define COMMON_GUC_SETTINGS \ | |
{ "client_encoding", "'UTF-8'" }, \ | |
{ "extra_float_digits", "3" }, \ | |
{ "statement_timeout", "0" }, \ | |
{ "default_transaction_read_only", "off" } | |
GUC applySettingsSync[] = { | |
COMMON_GUC_SETTINGS, | |
{ "synchronous_commit", "on" }, | |
{ "session_replication_role", "'replica'" }, | |
{ NULL, NULL }, | |
}; | |
GUC applySettings[] = { | |
COMMON_GUC_SETTINGS, | |
{ "synchronous_commit", "off" }, | |
{ "session_replication_role", "'replica'" }, | |
{ NULL, NULL }, | |
}; | |
int main() { | |
const char *conninfo = getenv("PGCOPYDB_TARGET_PGURI"); | |
PGconn *conn; | |
PGresult *res; | |
/* Connect to the database */ | |
conn = PQconnectdb(conninfo); | |
if (PQstatus(conn) != CONNECTION_OK) { | |
fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(conn)); | |
PQfinish(conn); | |
exit(1); | |
} | |
int status = PQsetnonblocking(conn, 1 /* 1-non blocking, 0-blocking */); | |
if (status != 0) | |
{ | |
fprintf(stderr, "PQsetnonblocking failed: %s", PQerrorMessage(conn)); | |
return false; | |
} | |
{ | |
const char* paramValues[1] = {"pgcopydb"}; | |
if (!PQsendQueryParams(conn, "select pg_replication_origin_session_setup($1)", 1, NULL, paramValues, NULL, NULL, 0)) { | |
fprintf(stderr, "replication origin session setup: %s", PQerrorMessage(conn)); | |
} | |
res = PQgetResult(conn); | |
PQclear(res); | |
res = PQgetResult(conn); | |
PQclear(res); | |
} | |
/* Start pipeline mode */ | |
if (!PQenterPipelineMode(conn)) { | |
fprintf(stderr, "Entering pipeline mode failed: %s", PQerrorMessage(conn)); | |
PQfinish(conn); | |
exit(1); | |
} | |
int i = 0; | |
int syncBatch = 100; | |
for (i = 0; i < 50000; i++) | |
{ | |
// apply applySettingsSync | |
// | |
GUC *guc; | |
bool sync = i % syncBatch == 0; | |
if (sync) { | |
guc = applySettingsSync; | |
} else { | |
guc = applySettings; | |
} | |
for (int j = 0; guc[j].name != NULL; j++) | |
{ | |
char sql[1024] = { 0 }; | |
snprintf(sql, sizeof(sql), "SET %s TO %s", | |
guc[j].name, guc[j].value); | |
/* Begin transaction */ | |
if (!PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 0)) { | |
fprintf(stderr, "Sending BEGIN failed: %s", PQerrorMessage(conn)); | |
} | |
} | |
/* Begin transaction */ | |
if (!PQsendQueryParams(conn, "BEGIN", 0, NULL, NULL, NULL, NULL, 0)) { | |
fprintf(stderr, "Sending BEGIN failed: %s", PQerrorMessage(conn)); | |
} | |
/* Insert statement */ | |
const char* paramValues[3] = {"now()", "hello", "123"}; | |
if (!PQsendQueryParams(conn, "INSERT INTO metrics(\"time\", name, value) VALUES ($1, $2, $3)", 3, NULL, paramValues, NULL, NULL, 0)) { | |
fprintf(stderr, "Sending INSERT failed: %s", PQerrorMessage(conn)); | |
} | |
if (!PQsendQueryParams(conn, "select pg_replication_origin_xact_setup('0/0', now())", 0, NULL, NULL, NULL, NULL, 0)) { | |
fprintf(stderr, "Sending COMMIT failed: %s", PQerrorMessage(conn)); | |
} | |
/* Commit transaction */ | |
if (!PQsendQueryParams(conn, "COMMIT", 0, NULL, NULL, NULL, NULL, 0)) { | |
fprintf(stderr, "Sending COMMIT failed: %s", PQerrorMessage(conn)); | |
} | |
// PQflush(conn); | |
if (sync) { | |
/* Process pipeline results */ | |
uint32_t now = time(NULL); | |
if (!exit_pipe(conn)) { | |
fprintf(stderr, "Exiting pipeline mode failed: %s", PQerrorMessage(conn)); | |
} | |
fprintf(stderr, "Sync took %d sec\n", time(NULL) - now); | |
} | |
} | |
if (!exit_pipe(conn)) { | |
fprintf(stderr, "Exiting pipeline mode failed: %s", PQerrorMessage(conn)); | |
} | |
fprintf(stderr, "Exiting pipeline mode success\n"); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
To compile,
To run