Skip to content

Instantly share code, notes, and snippets.

@sandvige
Last active August 29, 2015 14:11
Show Gist options
  • Save sandvige/db7456d340e6b46fd7aa to your computer and use it in GitHub Desktop.
Save sandvige/db7456d340e6b46fd7aa to your computer and use it in GitHub Desktop.
Doctrine PDOCassandra Driver
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Platforms\AbstractPlatform;
class CassandraPlatform extends AbstractPlatform
{
public function getBooleanTypeDeclarationSQL(array $columnDef)
{
throw DBALException::notSupported(__METHOD__);
}
public function getIntegerTypeDeclarationSQL(array $columnDef)
{
throw DBALException::notSupported(__METHOD__);
}
public function getBigIntTypeDeclarationSQL(array $columnDef)
{
throw DBALException::notSupported(__METHOD__);
}
public function getSmallIntTypeDeclarationSQL(array $columnDef)
{
throw DBALException::notSupported(__METHOD__);
}
protected function _getCommonIntegerTypeDeclarationSQL(array $columnDef)
{
throw DBALException::notSupported(__METHOD__);
}
protected function initializeDoctrineTypeMappings()
{
throw DBALException::notSupported(__METHOD__);
}
public function getClobTypeDeclarationSQL(array $field)
{
throw DBALException::notSupported(__METHOD__);
}
public function getBlobTypeDeclarationSQL(array $field)
{
throw DBALException::notSupported(__METHOD__);
}
public function getName()
{
return "cassandra";
}
public function supportsTransactions()
{
return false;
}
public function convertBooleans($item)
{
if (is_array($item)) {
foreach ($item as $k => $value) {
if (is_bool($value)) {
$item[$k] = (string)$value;
}
}
} else if (is_bool($item)) {
$item = (string)$item;
}
return $item;
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra;
use Doctrine\DBAL\Driver\PDOConnection;
use Doctrine\DBAL\Driver\Connection as BaseConnection;
class Connection extends PDOConnection implements BaseConnection
{
public function __construct($dsn, $user = null, $password = null, array $options = null)
{
parent::__construct($dsn, $user, $password, $options);
}
public function prepare($prepareString, $driverOptions = array())
{
$sqlConverter = new SqlToCqlConverter;
return parent::prepare($sqlConverter->removeTableAliases($prepareString), $driverOptions);
}
public function query()
{
$args = func_get_args();
$sqlConverter = new SqlToCqlConverter;
$args[0] = $sqlConverter->removeTableAliases($args[0]);
return call_user_func_array(array('parent', 'query'), $args);
}
public function exec($query)
{
$sqlConverter = new SqlToCqlConverter;
return parent::exec($sqlConverter->removeTableAliases($query));
}
public function beginTransaction()
{
return true;
}
public function commit()
{
return true;
}
public function rollback()
{
return true;
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra;
use Doctrine\DBAL\Driver as BaseDriver;
use Doctrine\DBAL\Connection as BaseConnection;
use Doctrine\DBAL\Types\Type;
class Driver implements BaseDriver
{
public static function ensureTypeLoaded($type, $class)
{
if (Type::hasType($type)) {
Type::overrideType($type, $class);
} else {
Type::addType($type, $class);
}
}
public function __construct()
{
static::ensureTypeLoaded('map', 'RG\CassandraBundle\Driver\PDOCassandra\Type\MapType');
static::ensureTypeLoaded('timeuuid', 'RG\CassandraBundle\Driver\PDOCassandra\Type\TimeUUIDType');
static::ensureTypeLoaded('timestamp', 'RG\CassandraBundle\Driver\PDOCassandra\Type\TimestampType');
}
public function connect(array $params, $username = null, $password = null, array $driverOptions = array())
{
return new Connection(
$this->_constructPdoDsn($params),
$username,
$password,
$driverOptions
);
}
private static function generatePermutations($size)
{
$permutations = range(0, $size - 1);
shuffle($permutations);
return $permutations;
}
private static function applyPermutation($array, $permutations)
{
if (is_null($permutations))
return $array;
$permuted = array();
foreach ($permutations as $key => $value)
$permuted[$key] = $array[$value];
return $permuted;
}
private function _constructPdoDsn(array $params)
{
$dsn = 'cassandra:';
if (!isset($params['host']))
throw new \Exception("No hosts provided");
if (!isset($params['port']))
throw new \Exception("No ports provided");
$hosts = explode(';', $params['host']);
$ports = explode(';', $params['port']);
if (count($hosts) != count($ports))
throw new \Exception("Port count is not corresponding with the host count");
$permutations = null;
if (!isset($params['disable_shuffle']) || !$params['disable_shuffle'])
if (count($hosts) > 1)
$permutations = self::generatePermutations(count($hosts));
foreach (self::applyPermutation($hosts, $permutations) as $host)
$dsn .= 'host=' . $host . ';';
foreach (self::applyPermutation($ports, $permutations) as $port)
$dsn .= 'port=' . $port . ';';
if (isset($params['dbname']))
$dsn .= 'dbname=' . $params['dbname'] . ';';
return $dsn;
}
public function getDatabasePlatform()
{
return new CassandraPlatform();
}
public function getSchemaManager(BaseConnection $conn)
{
throw new \Exception("SchemaManager is not supported for Cassandra");
}
public function getName()
{
return 'pdo_cassandra';
}
public function getDatabase(BaseConnection $conn)
{
$params = $conn->getParams();
return $params['dbname'];
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra;
use Doctrine\DBAL\Driver\PDOSqlite\Driver as SqliteDriver;
class DriverMock extends SqliteDriver
{
public function __construct()
{
Driver::ensureTypeLoaded('map', 'RG\CassandraBundle\Driver\PDOCassandra\Type\MapMockType');
Driver::ensureTypeLoaded('timeuuid', 'RG\CassandraBundle\Driver\PDOCassandra\Type\TimeUUIDMockType');
Driver::ensureTypeLoaded('timestamp', 'RG\CassandraBundle\Driver\PDOCassandra\Type\TimestampMockType');
}
public function getName()
{
return 'pdo_mock_cassandra';
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra\Type;
use Doctrine\DBAL\Types\JsonArrayType;
class MapMockType extends JsonArrayType
{
public function getName()
{
return 'map';
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra\Type;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use RG\CassandraBundle\Driver\PDOCassandra\SqlToCqlConverter;
class MapType extends Type
{
public function getSqlDeclaration(array $fieldDeclaration, AbstractPlatform $platform)
{
throw new \Exception("Not implemented");
}
public function convertToPHPValue($map, AbstractPlatform $platform)
{
// PDO is converting correctly maps
return $map;
}
public function convertToDatabaseValue($map, AbstractPlatform $platform)
{
return SqlToCqlConverter::toCqlMap($map);
}
public function getName()
{
return 'map';
}
public function getBindingType()
{
return \PDO::CASSANDRA_COLLECTION;
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra;
class SqlToCqlConverter extends \SqlFormatter
{
public function removeTableAliases($query)
{
$tokens = $this->tokenize($query);
if (!$tokens)
throw new \Exception("Invalid SQL format");
switch($tokens[0][1]) {
case 'SELECT':
$fromAlias = '';
$nextTokenIsTableName = false;
$nextTokenIsTableAlias = false;
foreach ($tokens as $token) {
if ($token[0] == \SqlFormatter::TOKEN_TYPE_WHITESPACE)
continue;
if ($token[1] == 'FROM') {
$nextTokenIsTableName = true;
continue;
}
if ($nextTokenIsTableName) {
$nextTokenIsTableAlias = true;
$nextTokenIsTableName = false;
continue;
}
if ($nextTokenIsTableAlias) {
$fromAlias = $token[1];
break;
}
}
if ($fromAlias) {
$removeNext = false;
$filteredTokens = array_filter($tokens, function($token) use($fromAlias, & $removeNext) {
if ($removeNext) {
$removeNext = false;
return false;
}
if ($token[1] == $fromAlias) {
$removeNext = true;
return false;
}
return true;
});
$query = array_reduce($filteredTokens, function($query, $token) {
return $query .= $token[1];
}, '');
}
break;
case 'UPDATE':
case 'INSERT':
case 'DELETE FROM':
break;
default:
throw new \Exception("Unhandled query type: " . $tokens[0][1]);
}
return $query;
}
public static function toCqlMap($map)
{
$cqlChunk = '';
$first = true;
foreach ($map as $key => $value) {
if (!$first)
$cqlChunk .= ',';
$cqlChunk .= "'" . trim($key) . "':" . $value;
$first = false;
}
return '{' . $cqlChunk . '}';
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra\Type;
use Doctrine\DBAL\Types\DateTimeType;
class TimestampMockType extends DateTimeType
{
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra\Type;
use Doctrine\DBAL\Types\DateTimeType;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use DateTime;
class TimestampType extends DateTimeType
{
public function getSqlDeclaration(array $fieldDeclaration, AbstractPlatform $platform)
{
throw new \Exception("Not implemented");
}
public function convertToPHPValue($raw, AbstractPlatform $platform)
{
if (is_null($raw) || $raw === 'null')
return null;
if (!is_int($raw)) {
$date = unpack('H*', $raw);
$raw = hexdec(array_shift($date));
}
$dt = new DateTime("now");
$dt->setTimestamp($raw / 1000);
return $dt;
}
public function convertToDatabaseValue($dt, AbstractPlatform $platform)
{
return $dt->format(DateTime::ISO8601);
}
public function getBindingType()
{
return \PDO::CASSANDRA_INT;
}
}
<?php
namespace RG\CassandraBundle\Utils;
use DateTime;
use Exception;
class TimeUUID extends DateTime
{
// Interval between Julian and Gregorian calendar in 100nanoseconds
const Interval = "122192928000000000";
const Time = 1;
const DCE = 2;
/**
* Returns true if this UUID is parsable
*
* @param string $uuid
* @return boolean
*/
public static function isUUID($uuid, $extendCheck = true)
{
if (!preg_match('/([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})/', $uuid))
return false;
if ($extendCheck) {
list(, , $data3, $data4) = TimeUUID::extractFields($uuid);
if (!TimeUUID::checkVariantDCE($data4))
return false;
if (!TimeUUID::checkVersionTime($data3))
return false;
}
return true;
}
/**
* Ensure the value is not a signed extracted value
*
* @param char $value
* @return char
*/
private static function ensureNotSigned8bits($value)
{
return (256 + $value) % 256;
}
/**
* Create data according to the two given raw values
*
* @param integer $raw1
* @param integer $raw2
* @return char array[8]
*/
private static function createData($raw1, $raw2)
{
$b1 = ($raw1 & 0xFF000000) >> 24;
$b2 = ($raw1 & 0x00FF0000) >> 16;
$b3 = ($raw1 & 0x0000FF00) >> 8;
$b4 = ($raw1 & 0x000000FF);
$b5 = ($raw2 & 0xFF000000) >> 24;
$b6 = ($raw2 & 0x00FF0000) >> 16;
$b7 = ($raw2 & 0x0000FF00) >> 8;
$b8 = ($raw2 & 0x000000FF);
return array(
TimeUUID::ensureNotSigned8bits($b1), // 8bits
$b2, // 8bits
$b3, // 8bits
$b4, // 8bits
TimeUUID::ensureNotSigned8bits($b5), // 8bits
$b6, // 8bits
$b7, // 8bits
$b8, // 8bits
);
}
/**
* Extract fields from the given UUID
*
* @param string $uuid
* @return array
*/
private static function extractFields($uuid)
{
// remove dashes
$uuid = preg_replace("/[^a-f0-9]/is", "", $uuid);
$data1 = hexdec(substr($uuid, 0, 8));
$data2 = hexdec(substr($uuid, 8, 4));
$data3 = hexdec(substr($uuid, 12, 4));
$uuid332 = hexdec(substr($uuid, 16, 8));
$uuid432 = hexdec(substr($uuid, 24, 8));
return array(
$data1, // 32 bits
$data2, // 16 bits
$data3, // 16 bits
TimeUUID::createData($uuid332, $uuid432)
);
}
/**
* Build the UUID using the raw values
*
* @param integer $data1
* @param integer $data2
* @param integer $data3
* @param char array[8] $data4
* @return string
*/
private static function composeFields($data1, $data2, $data3, $data4)
{
$data1Str = str_pad(dechex($data1), 8, '0', STR_PAD_LEFT);
$data2Str = str_pad(dechex($data2), 4, '0', STR_PAD_LEFT);
$data3Str = str_pad(dechex($data3), 4, '0', STR_PAD_LEFT);
$data4Str0 = str_pad(dechex($data4[0]), 2, '0', STR_PAD_LEFT);
$data4Str1 = str_pad(dechex($data4[1]), 2, '0', STR_PAD_LEFT);
$result = "$data1Str-$data2Str-$data3Str-$data4Str0$data4Str1-";
foreach (range(2, 7) as $index)
$result .= str_pad(dechex($data4[$index]), 2, '0', STR_PAD_LEFT);
return $result;
}
/**
* Check if the variant of this UUID 4rd node is a DCE
*
* @param integer $data4
* @return boolean
*/
private static function checkVariantDCE($data4)
{
return ($data4[0] & 0xC0) == (TimeUUID::DCE << 6);
}
/**
* Check if the version of this UUID 3rd node is a Time
*
* @param integer $data3
* @return boolean
*/
private static function checkVersionTime($data3)
{
return ($data3 >> 12) == (TimeUUID::Time);
}
/**
* Parse the given uuid, check if it's a TimeUUID (RFC4144), and returns parsed data
*
* @param type $uuid
* @return array(
* Unix Timestamp,
* Data,
* 100nanoseconds
* )
*/
private static function parse($uuid)
{
list($data1, $data2, $data3, $data4) = TimeUUID::extractFields($uuid);
if (!TimeUUID::checkVariantDCE($data4))
throw new Exception("Not a valid TimeUUID: DCE not found (Variant)");
if (!TimeUUID::checkVersionTime($data3))
throw new Exception("Not a valid TimeUUID: Time not found (Version)");
// $time = ($data1 | ($data2 << 32) | (($data3 & 0xFFF) << 48)) - TimeUUID::Interval;
$time =
bcsub(
bcadd(
bcadd(
bcmul(
$data2,
"4294967296", // 0xFFFFFFFF
0
),
$data1
),
bcmul(
$data3 & 0xFFF,
"281474976710656", // 0xFFFFFFFFFFFF
0
)
),
TimeUUID::Interval
);
return array(
// intval($time / 10000000),
intval(bcdiv($time, 10000000, 0)),
$data4,
// intval($time % 10000000),
intval(bcmod($time, 10000000))
);
}
private $data = array(
0,
0,
0,
0,
0,
0,
0,
0
);
private $ticks = 0;
/**
* Returns $this data
*
* @return char array[8]
*/
public function getData()
{
return $this->data;
}
/**
* Specify the data this TimeUUID will holds. Check TimeUUIDMax for an example
*
* @param char array[8] $data
*/
public function setData($data)
{
$this->data = $data;
}
/**
* Returns $this 100nanoseconds
*
* @return integer
*/
public function getTicks()
{
return $this->ticks;
}
/**
* Since DateTime doesn't support 100nanoseconds, you can give here the
* missing accuracy of DateTime to build a complete TimeUUID
*
* @param integer $ticks
*/
public function setTicks($ticks)
{
$this->ticks = $ticks;
}
/**
* Attempt to construct a DateTime based on the given TimeUUID, fallback on
* default DateTime constructor if the given $time is not a TimeUUID
*
* @param type $time
* @param DateTimeZone $timezone (ignored when passing a TimeUUID in $time)
*/
public function __construct($time, $timezone = null)
{
if (TimeUUID::isUUID($time, false)) {
// TimeUUID are always UTC, and timezone parameter is ignored
// when ctoring with @ syntax
list($time, $this->data, $this->ticks) = TimeUUID::parse($time);
parent::__construct('@' . $time);
} else {
parent::__construct($time, $timezone);
// Build random stuff when it's a generated one
if (!$time || $time == "now") {
$crc = crc32(gethostname()) & 0x7FFFFFFF;
$fixedValue =
(($crc & 0xFF000000) >> 24) ^
(($crc & 0xFF0000) >> 16) ^
(($crc & 0xFF00) >> 8) ^
(($crc & 0xFF))
;
$this->setData(array(
rand(0, 255),
rand(0, 255),
rand(0, 255),
rand(0, 255),
rand(0, 255),
rand(0, 255),
rand(0, 255),
$fixedValue
));
$this->setTicks(rand(0, 99999999));
}
}
}
/**
* Return $this TimeUUID
*
* @return string
*/
public function __toString()
{
// ((Timesteamp() * 10000000) + Interval) + Ticks
$time =
bcadd(
bcadd(
bcmul(
$this->getTimestamp(),
10000000
),
TimeUUID::Interval
),
$this->getTicks()
);
// $time & 32 (0xFFFFFFFF => 2^32)
$data1 = bcmod($time, "4294967296");
// $time >> 32
$timeHigh = bcdiv($time, "4294967296", 0);
$data2 = $timeHigh & 0xFFFF;
$data3 = (($timeHigh >> 16) & 0xFFF) | TimeUUID::Time << 12;
$data = $this->data;
$data[0] = ($data[0] & 0x3F) | TimeUUID::DCE << 6;
return TimeUUID::composeFields($data1, $data2, $data3, $data);
}
/**
* Create a TimeUUID from a DateTime
*
* Note: This class can be called this way: TimeUUIDMax::fromDateTime()
*
* @param DateTime $dt
* @return TimeUUID (depends on which class has been called)
*/
public static function fromDateTime(DateTime $dt)
{
return new static($dt->format(DateTime::ISO8601));
}
}
<?php
namespace RG\CassandraBundle\Utils;
class TimeUUIDMax extends TimeUUID
{
public function __construct($time, $timezone = null)
{
parent::__construct($time, $timezone);
$this->setData(array(
0xFF,
0xFF,
0xFF,
0xFF,
0xFF,
0xFF,
0xFF,
0xFF
));
}
}
<?php
namespace RG\CassandraBundle\Utils;
class TimeUUIDMin extends TimeUUID
{
public function __construct($time, $timezone = null)
{
parent::__construct($time, $timezone);
$this->setData(array(
0,
0,
0,
0,
0,
0,
0,
0
));
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra\Type;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Types\StringType;
use RG\CassandraBundle\Utils\TimeUUID;
class TimeUUIDMockType extends StringType
{
public function getName()
{
return 'timeuuid';
}
public function convertToPHPValue($timeUUID, AbstractPlatform $platform)
{
return new TimeUUID($timeUUID);
}
public function convertToDatabaseValue($dt, AbstractPlatform $platform)
{
if (! $dt instanceof TimeUUID)
throw new \Exception("Not implemented (Cassandra is awaiting a TimeUUID)");
// Not using valid timestamp here may lead the value to not being correctely ordered
return (string)$dt;
}
}
<?php
namespace RG\CassandraBundle\Driver\PDOCassandra\Type;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use RG\CassandraBundle\Utils\TimeUUID;
class TimeUUIDType extends Type
{
public function getSqlDeclaration(array $fieldDeclaration, AbstractPlatform $platform)
{
throw new \Exception("Not implemented");
}
public function convertToPHPValue($timeUUID, AbstractPlatform $platform)
{
return new TimeUUID($timeUUID);
}
public function convertToDatabaseValue($dt, AbstractPlatform $platform)
{
if (! $dt instanceof TimeUUID)
throw new \Exception("Not implemented (Cassandra is awaiting a TimeUUID)");
return (string)$dt;
}
public function getName()
{
return 'timeuuid';
}
public function getBindingType()
{
return \PDO::CASSANDRA_UUID;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment