Skip to content

Instantly share code, notes, and snippets.

@kassane
Created February 23, 2020 17:34
Show Gist options
  • Save kassane/b43cde40818c35be0a23104e1f978a26 to your computer and use it in GitHub Desktop.
Save kassane/b43cde40818c35be0a23104e1f978a26 to your computer and use it in GitHub Desktop.
Libpq Async
#pragma once
#include <libpq-fe.h>
#include <experimental/string_view>
#include "PQQuery.h"
#include "PQError.h"
#include <tuple>
#include <boost/asio/io_service.hpp>
#include <boost/asio/local/stream_protocol.hpp>
class PQDBConn
{
public:
PQDBConn(const PQDBConn&) = delete;
void operator=(const PQDBConn&) = delete;
PQDBConn(PQDBConn &&_other):
m_ioService(_other.m_ioService)
{
m_conn = std::move(_other.m_conn);
_other.m_conn = nullptr;
}
PQDBConn(::boost::asio::io_service& _io_service):
m_ioService(_io_service)
{
}
~PQDBConn()
{
if (m_conn) PQfinish(m_conn);
}
PQError open(const std::experimental::string_view &_conninfo)
{
/* Make a connection to the database */
m_conn = PQconnectdb(_conninfo.data());
return checkError("Connection to database failed.");
}
PQError open(const std::experimental::string_view &_conninfo, ::boost::asio::yield_context _yield)
{
/* Make an async connection to the database */
m_conn = PQconnectStart(_conninfo.data());
if (status() == CONNECTION_BAD)
return checkError("Connection to database failed.");
auto socket = this->socket();
while(true)
{
auto ret = PQconnectPoll(m_conn);
switch(ret)
{
case PGRES_POLLING_READING:
socket.async_read_some(::boost::asio::null_buffers(), _yield);
break;
case PGRES_POLLING_WRITING:
socket.async_write_some(::boost::asio::null_buffers(), _yield);
break;
case PGRES_POLLING_OK:
return PQError();
case PGRES_POLLING_FAILED:
return checkError("Connection to database failed.");
default:
break;
}
}
return PQError();
}
ConnStatusType status() const { return PQstatus(m_conn); }
bool operator!() const { return status() != CONNECTION_OK; }
std::tuple<PQQuery, PQResult> execute(const std::experimental::string_view &_sql)
{
auto query = PQQuery(this);
auto result = query.exec(_sql);
if (!result)
throw PQError(std::string("Statement execution failed: ") + std::to_string(result.status()), PQerrorMessage(m_conn));
return std::make_tuple(std::move(query), std::move(result));
}
std::tuple<PQQuery, PQResult> execute(const std::experimental::string_view &_sql, ::boost::asio::yield_context _yield)
{
auto query = PQQuery(this);
auto result = query.exec(_sql, _yield);
if (!result)
throw PQError(std::string("Statement execution failed: ") + std::to_string(result.status()), PQerrorMessage(m_conn));
return std::make_tuple(std::move(query), std::move(result));
}
PQError checkError(const std::experimental::string_view &_errorMessage)
{
/* Check to see that the backend connection was successfully made */
if (status() != CONNECTION_OK)
{
return PQError(_errorMessage.to_string(), PQerrorMessage(m_conn));
}
return PQError();
}
::boost::asio::local::stream_protocol::socket socket()
{
::boost::asio::local::stream_protocol stream_protocol;
return ::boost::asio::local::stream_protocol::socket(m_ioService, stream_protocol, dup(PQsocket(m_conn)));
}
PGconn* conn() { return m_conn; }
private:
PGconn *m_conn = nullptr;
::boost::asio::io_service& m_ioService;
};
/// Executes _sql synchronously with PQexec and wraps whatever PGresult
/// libpq hands back (possibly null) in a PQResult.
PQResult PQQuery::exec(const std::experimental::string_view &_sql)
{
    // A string_view is not guaranteed to be NUL-terminated, while PQexec
    // reads a C string; pass a terminated copy instead of data().
    const std::string sql = _sql.to_string();
    return PQResult(PQexec(m_conn->conn(), sql.c_str()));
}
/// Executes _sql through libpq's asynchronous API, suspending the coroutine
/// behind _yield while the connection socket has nothing to read.
///
/// Mirrors PQexec: every pending result is fetched so the connection is
/// ready for the next command, and the last one is returned. An empty
/// PQResult is returned when the query cannot be sent or input cannot be
/// consumed. Errors from the asio wait are thrown by asio.
PQResult PQQuery::exec(const std::experimental::string_view &_sql, ::boost::asio::yield_context _yield)
{
    auto conn = m_conn->conn();

    // A string_view is not guaranteed to be NUL-terminated, while
    // PQsendQuery reads a C string; pass a terminated copy.
    const std::string sql = _sql.to_string();
    if (PQsendQuery(conn, sql.c_str()) != 1)
        return PQResult();

    auto socket = m_conn->socket();
    PGresult *last = nullptr;
    try
    {
        while (true)
        {
            // Wait until the next PQgetResult call would not block.
            while (PQisBusy(conn) == 1)
            {
                socket.async_read_some(::boost::asio::null_buffers(), _yield);
                if (PQconsumeInput(conn) != 1)
                {
                    if (last) PQclear(last);
                    return PQResult();
                }
            }

            // libpq requires PQgetResult to be called until it returns null
            // before another command can be sent on this connection.
            PGresult *next = PQgetResult(conn);
            if (next == nullptr)
                break;
            if (last) PQclear(last);
            last = next;

            // While a COPY is in progress PQgetResult keeps reporting the
            // same state, so hand that result to the caller right away.
            const ExecStatusType state = PQresultStatus(last);
            if (state == PGRES_COPY_IN || state == PGRES_COPY_OUT || state == PGRES_COPY_BOTH)
                break;
        }
    }
    catch (...)
    {
        // Do not leak the pending result when the wait throws.
        if (last) PQclear(last);
        throw;
    }
    return PQResult(last);
}
#pragma once
#include <exception>
#include <string>
/// Error value used both as a return code and as an exception.
///
/// A default-constructed object means "no error". Note the conversion to
/// bool yields true when there is NO error, so `if (conn.open(...))` reads
/// as "if the call succeeded".
///
/// Derives publicly from std::exception so that generic
/// `catch (const std::exception&)` handlers see it.
class PQError: public std::exception
{
public:
    /// Creates the "no error" value.
    PQError() {}

    /// Creates an error carrying a caller-supplied message and the message
    /// reported by the database driver.
    PQError(std::string _errorMessage,
            std::string _driverError):
        m_isError(true),
        m_errorMessage(std::move(_errorMessage)),
        m_driverMessage(std::move(_driverError))
    {}

    /// Returns the caller-supplied message (not the driver message).
    const char* what() const noexcept override
    {
        return m_errorMessage.c_str();
    }

    const std::string& errorMessage() const { return m_errorMessage; }
    const std::string& driverMessage() const { return m_driverMessage; }

    /// True when this object does NOT represent an error.
    operator bool() const
    {
        return !m_isError;
    }

private:
    bool m_isError = false;
    std::string m_errorMessage;
    std::string m_driverMessage;
};
#pragma once
#include "PQResult.h"
/// Handle to a single cell, produced by dereferencing a column cursor.
///
/// T is the column cursor type; it must offer col() and colIterator(), and
/// the latter must lead via rowIterator() to an object exposing row() and
/// query(). Only a reference to the cursor is stored, so the cursor has to
/// outlive this object.
template<class T>
class PQResultVal
{
public:
    PQResultVal(const T &_queryCol):
        m_queryCol(_queryCol)
    {}

    /// Fetches the cell through the result's value_as_char(row, col).
    auto value()
    {
        const auto &rowCursor = m_queryCol.colIterator().rowIterator();
        const auto &result = rowCursor.query();
        return result.value_as_char(rowCursor.row(), m_queryCol.col());
    }

    /// The column cursor this value was created from.
    const T &col() const
    {
        return m_queryCol;
    }

private:
    const T &m_queryCol;
};
/// Forward cursor over the columns of one row, usable in a range-for.
///
/// T is the per-row object the cursor was created from; only a reference
/// to it is kept. Two cursors compare by column index alone.
template<class T>
class PQResultCol
{
public:
    PQResultCol(const T &_colIterator, int _col):
        m_colIterator(_colIterator),
        m_col(_col)
    {}

    bool operator!=(const PQResultCol<T> &_other) const
    {
        return !(m_col == _other.m_col);
    }

    /// Moves on to the next column.
    PQResultCol<T>& operator++()
    {
        ++m_col;
        return *this;
    }

    /// Yields a value handle that refers back to this cursor.
    PQResultVal<PQResultCol<T>> operator*()
    {
        return PQResultVal<PQResultCol<T>>(*this);
    }

    int col() const
    {
        return m_col;
    }

    const T& colIterator() const
    {
        return m_colIterator;
    }

private:
    const T &m_colIterator;
    int m_col;
};
/// Per-row object obtained by dereferencing a row cursor; it is the range
/// whose begin()/end() walk the columns of that row.
///
/// T is the row cursor type; only a reference to it is kept. Comparison
/// looks at the stored column index alone.
template<class T>
class PQResultColIterator
{
public:
    PQResultColIterator(const T &_queryIterator, int _col):
        m_queryIterator(_queryIterator),
        m_col(_col)
    {}

    bool operator!=(const PQResultColIterator<T> &_other) const
    {
        return !(m_col == _other.m_col);
    }

    PQResultColIterator<T>& operator++()
    {
        ++m_col;
        return *this;
    }

    /// The row cursor this object was created from.
    const T& rowIterator() const
    {
        return m_queryIterator;
    }

    int col() const
    {
        return m_col;
    }

private:
    const T &m_queryIterator;
    int m_col;
};
/// Forward cursor over the rows of a PQResult, usable in a range-for.
///
/// Keeps only a reference to the result, which therefore has to outlive
/// the cursor. Two cursors compare by row index alone.
class PQResultIterator
{
public:
    PQResultIterator(const PQResult &_query, int _row):
        m_query(_query),
        m_row(_row)
    {}

    bool operator!=(const PQResultIterator &_other) const
    {
        return !(m_row == _other.m_row);
    }

    /// Moves on to the next row.
    PQResultIterator& operator++()
    {
        ++m_row;
        return *this;
    }

    /// Yields the column range of the current row, positioned at column 0.
    PQResultColIterator<PQResultIterator> operator*() const
    {
        return PQResultColIterator<PQResultIterator>(*this, 0);
    }

    int numFields() const
    {
        return m_query.numFields();
    }

    int row() const
    {
        return m_row;
    }

    const PQResult& query() const
    {
        return m_query;
    }

private:
    const PQResult &m_query;
    int m_row;
};
/// Row cursor positioned at the first row of query (range-for support).
/// Marked inline because it is defined in a header; without it, including
/// the header from two translation units is a multiple-definition error.
inline PQResultIterator begin(const PQResult &query)
{
    return PQResultIterator(query, 0);
}
/// Row cursor positioned one past the last row of query (range-for support).
/// Marked inline because it is defined in a header; without it, including
/// the header from two translation units is a multiple-definition error.
inline PQResultIterator end(const PQResult &query)
{
    return PQResultIterator(query, query.numRows());
}
/// Column cursor positioned at the first column of the given row
/// (range-for support).
/// Marked inline because it is defined in a header; without it, including
/// the header from two translation units is a multiple-definition error.
inline PQResultCol<PQResultColIterator<PQResultIterator>> begin(const PQResultColIterator<PQResultIterator> &query)
{
    return PQResultCol<PQResultColIterator<PQResultIterator>>(query, 0);
}
/// Column cursor positioned one past the last column of the given row
/// (range-for support); the column count comes from the underlying result.
/// Marked inline because it is defined in a header; without it, including
/// the header from two translation units is a multiple-definition error.
inline PQResultCol<PQResultColIterator<PQResultIterator>> end(const PQResultColIterator<PQResultIterator> &query)
{
    return PQResultCol<PQResultColIterator<PQResultIterator>>(query, query.rowIterator().query().numFields());
}
#pragma once
#include <libpq-fe.h>
#include <experimental/string_view>
#include <future>
#include "PQResult.h"
#include <boost/asio/spawn.hpp>
class PQDBConn;
class PQQuery
{
public:
PQQuery(const PQQuery&) = delete;
void operator=(const PQQuery&) = delete;
PQQuery(PQQuery &&_other)
{
m_conn = _other.m_conn;
}
PQQuery(PQDBConn *_conn): m_conn(_conn)
{
//auto res = PQsendQuery(_conn, _sql.data());
}
inline PQResult exec(const std::experimental::string_view &_sql);
inline PQResult exec(const std::experimental::string_view &_sql, ::boost::asio::yield_context _yield);
private:
PQDBConn *m_conn = nullptr;
};
#pragma once
#include <libpq-fe.h>
#include <netinet/in.h>
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>
class PQResult
{
public:
PQResult(const PQResult&) = delete;
void operator=(const PQResult&) = delete;
PQResult(PQResult &&_other)
{
m_res = std::move(_other.m_res);
m_numRows = _other.m_numRows;
m_numFields = _other.m_numFields;
_other.m_res = nullptr;
}
PQResult& operator=(PQResult &&_other)
{
m_res = std::move(_other.m_res);
m_numRows = _other.m_numRows;
m_numFields = _other.m_numFields;
_other.m_res = nullptr;
return *this;
}
PQResult(PGresult *_res = nullptr): m_res(_res)
{
if (m_res)
{
m_numRows = PQntuples(m_res);
m_numFields = PQnfields(m_res);
}
}
~PQResult() { if (m_res) PQclear(m_res); }
ExecStatusType status() const { return PQresultStatus(m_res); }
bool operator!() const
{
if (m_res)
{
switch(status())
{
case PGRES_EMPTY_QUERY: /* empty query string was executed */
case PGRES_COMMAND_OK: /* a query command that doesn't return
* anything was executed properly by the
* backend */
case PGRES_TUPLES_OK: /* a query command that returns tuples was
* executed properly by the backend, PGresult
* contains the result tuples */
case PGRES_COPY_OUT: /* Copy Out data transfer in progress */
case PGRES_COPY_IN: /* Copy In data transfer in progress */
case PGRES_COPY_BOTH: /* Copy In/Out data transfer in progress */
case PGRES_NONFATAL_ERROR: /* notice or warning message */
case PGRES_SINGLE_TUPLE: /* single tuple from larger resultset */
return false;
case PGRES_BAD_RESPONSE: /* an unexpected response was recv'd from the
* backend */
case PGRES_FATAL_ERROR: /* query failed */
return true;
}
}
return true;
}
int numRows() const { return m_numRows; }
int numFields() const { return m_numFields; }
PGresult* res() const { return m_res; }
const char* value_as_char(int row, int col) const { return PQgetvalue(m_res, row, col); }
int32_t value_as_int32(int row, int col) const
{
void *ptr = PQgetvalue(m_res, row, col);
return ntohl(*static_cast<int32_t*>(ptr));
}
std::vector<char> value_as_vector(int row, int col) const
{
const auto len = PQgetlength(m_res, row, col);
auto ptr = PQgetvalue(m_res, row, col);
std::vector<char> result;
result.reserve(len);
result.assign(ptr, ptr + len);
return result;
}
std::string value_as_string(int row, int col) const
{
const auto len = PQgetlength(m_res, row, col);
auto ptr = PQgetvalue(m_res, row, col);
return std::string(ptr, len);
}
double value_as_double(int row, int col) const
{
double ptr = *reinterpret_cast<double*>(PQgetvalue(m_res, row, col));
return swap_endian(ptr);
}
private:
PGresult *m_res = nullptr;
int m_numRows = 0;
int m_numFields = 0;
#if BIG_ENDIAN
template <typename T>
constexpr static T& swap_endian(T& pX)
{
return pX;
}
#else
template <typename T>
constexpr static T& swap_endian(T& pX)
{
char& raw = reinterpret_cast<char&>(pX);
std::reverse(&raw, &raw + sizeof(T));
return pX;
}
#endif
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment