Created
June 6, 2010 11:29
-
-
Save f99aq8ove/427517 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package MyCassandra; | |
use Any::Moose; | |
use namespace::autoclean; | |
use Net::Cassandra; | |
use Data::MessagePack; | |
use Data::Dumper; | |
use Encode; | |
# Name of the Cassandra keyspace; passed as the first argument of every
# client call made by this class.
has keyspace => (
    is  => 'ro',
    isa => 'Str',
);

# Column family that all reads and writes of this object address.
has columnFamily => (
    is  => 'ro',
    isa => 'Str',
);

# Hash ref handed unchanged to Net::Cassandra->new in BUILD.
has cassandraArgs => (
    is  => 'ro',
    isa => 'HashRef',
);

# Holds the value of Net::Cassandra's ->client (stored by BUILD). Declared
# 'bare' so that only the private reader/writer below exist.
has cassandra => (
    is     => 'bare',
    reader => '_get_cassandra',
    writer => '_set_cassandra',
);
# Construction hook: creates the Net::Cassandra object from cassandraArgs
# and stores its ->client for later use by set/get/del and friends.
sub BUILD {
    my ($self) = @_;

    # cassandraArgs is optional. Passing a lone undef to a Moose-style
    # constructor is rejected, so fall back to an empty hash ref and let
    # Net::Cassandra apply whatever defaults it defines.
    my $args = $self->cassandraArgs || {};

    my $cassandra = Net::Cassandra->new($args);
    $self->_set_cassandra($cassandra->client);
    return;
}
# Returns the current time as a string of whole milliseconds since the
# epoch (fractional part truncated by the %d conversion).
sub timestamp {
    # Compile-time import: replaces the builtin time() with the
    # sub-second version for this package.
    use Time::HiRes qw/time/;

    my $milliseconds = time() * 1000;
    return sprintf('%d', $milliseconds);
}
# Writes one column.
#
#   $rowKey - row key
#   $colKey - column name inside this object's column family
#   $value  - value to store; serialised with Data::MessagePack
#
# The insert uses a fresh timestamp() and ConsistencyLevel::ZERO. Any
# exception raised while building or sending the request is re-thrown
# through confess as a Data::Dumper dump.
sub set {
    my ($self, $rowKey, $colKey, $value) = @_;

    eval {
        my $column_path = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );
        my $packed = Data::MessagePack->pack($value);

        $self->_get_cassandra->insert(
            $self->keyspace, $rowKey, $column_path, $packed, timestamp(),
            Net::Cassandra::Backend::ConsistencyLevel::ZERO
        );
    };
    confess Dumper($@) if $@;
}
# Writes several columns of one row in a single batch_insert call.
#
#   $rowKey - row key
#   $data   - hash ref of column name => value; each value is serialised
#             with Data::MessagePack
#
# All columns share one timestamp() value. The batch is sent with
# ConsistencyLevel::ZERO; an exception from the call is re-thrown through
# confess as a Data::Dumper dump.
sub set_slice {
    my ($self, $rowKey, $data) = @_;

    my $timestamp = timestamp();

    # map over keys instead of each(): no shared hash iterator is left
    # half-consumed if anything dies while the columns are being built.
    my @cols = map {
        my $name = $_;
        Net::Cassandra::Backend::ColumnOrSuperColumn->new(
            {   column => Net::Cassandra::Backend::Column->new(
                    {   name      => $name,
                        value     => Data::MessagePack->pack($data->{$name}),
                        timestamp => $timestamp,
                    }
                )
            }
        );
    } keys %{$data};

    eval {
        $self->_get_cassandra->batch_insert(
            $self->keyspace, $rowKey,
            # Reference to the column list (the original text had a
            # mis-encoded yen sign here instead of the backslash).
            { $self->columnFamily => \@cols },
            Net::Cassandra::Backend::ConsistencyLevel::ZERO
        );
    };
    confess Dumper($@) if $@;
}
# Reads one column at ConsistencyLevel::QUORUM.
#
#   $rowKey - row key
#   $colKey - column name inside this object's column family
#
# Returns undef when the error text starts with
# Net::Cassandra::Backend::NotFoundException; any other error is re-thrown
# through confess as a Data::Dumper dump. On success returns a hash ref
# with the keys 'value' (unpacked with Data::MessagePack, then passed
# through my_decode) and 'timestamp'.
sub get {
    my ($self, $rowKey, $colKey) = @_;

    my $what;
    eval {
        my $column_path = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );
        $what = $self->_get_cassandra->get(
            $self->keyspace, $rowKey, $column_path,
            Net::Cassandra::Backend::ConsistencyLevel::QUORUM
        );
    };

    if ($@) {
        # A missing column is an expected outcome, not a failure.
        return undef
            if $@ =~ /^Net::Cassandra::Backend::NotFoundException/;
        confess Dumper($@);
    }

    my $column = $what->column;
    return {
        value     => my_decode(Data::MessagePack->unpack($column->value)),
        timestamp => $column->timestamp
    };
}
# Decodes UTF-8 octets into Perl character strings.
#
#   $data - a plain scalar, a hash ref or an array ref
#
# Hash values and array elements are decoded in place, and the same
# reference is returned; hash keys are left untouched. Nested hash/array
# references are walked recursively. Previously they were handed to
# decode_utf8 directly, which stringified the reference and lost the data.
# Anything that is neither a hash nor an array ref is passed to
# Encode::decode_utf8 and the result returned.
sub my_decode {
    my $data = shift;
    my $kind = ref $data;

    if ($kind eq 'HASH') {
        foreach my $k (keys %{$data}) {
            $data->{$k} = my_decode($data->{$k});
        }
    }
    elsif ($kind eq 'ARRAY') {
        @{$data} = map { my_decode($_) } @{$data};
    }
    else {
        $data = Encode::decode_utf8($data);
    }
    return $data;
}
# Encodes Perl character strings into UTF-8 octets.
#
#   $data - a plain scalar, a hash ref or an array ref
#
# Hash values and array elements are encoded in place, and the same
# reference is returned, so the caller's structure is modified; hash keys
# are left untouched. Nested hash/array references are walked
# recursively. Previously they were handed to encode_utf8 directly, which
# stringified the reference and lost the data. Anything that is neither a
# hash nor an array ref is passed to Encode::encode_utf8 and the result
# returned.
sub my_encode {
    my $data = shift;
    my $kind = ref $data;

    if ($kind eq 'HASH') {
        foreach my $k (keys %{$data}) {
            $data->{$k} = my_encode($data->{$k});
        }
    }
    elsif ($kind eq 'ARRAY') {
        @{$data} = map { my_encode($_) } @{$data};
    }
    else {
        $data = Encode::encode_utf8($data);
    }
    return $data;
}
# Reads the columns of one row at ConsistencyLevel::QUORUM.
#
#   $rowKey - row key
#
# Returns a hash ref of column name => value, where each value is unpacked
# with Data::MessagePack and passed through my_decode. An exception from
# the call is re-thrown through confess as a Data::Dumper dump.
sub get_slice {
    my ($self, $rowKey) = @_;

    my $what;
    eval {
        $what = $self->_get_cassandra->get_slice(
            $self->keyspace,
            $rowKey,
            Net::Cassandra::Backend::ColumnParent->new(
                { column_family => $self->columnFamily }
            ),
            Net::Cassandra::Backend::SlicePredicate->new(
                {   slice_range => Net::Cassandra::Backend::SliceRange->new(
                        # XXX: default limit 100
                        { start => '', finish => '' }
                    )
                }
            ),
            Net::Cassandra::Backend::ConsistencyLevel::QUORUM
        );
    };
    confess Dumper($@) if $@;

    my %hash;
    foreach my $data (@$what) {
        # XXX: no timestamp
        $hash{ $data->column->name }
            = my_decode(Data::MessagePack->unpack($data->column->value));
    }

    # Reference to the result hash (the original text had a mis-encoded
    # yen sign here instead of the backslash).
    return \%hash;
}
# Removes one column.
#
#   $rowKey - row key
#   $colKey - column name inside this object's column family
#
# The removal carries a fresh timestamp(); no consistency level is passed.
# Any exception raised while building or sending the request is re-thrown
# through confess as a Data::Dumper dump.
sub del {
    my ($self, $rowKey, $colKey) = @_;

    eval {
        my $column_path = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey
            }
        );
        $self->_get_cassandra->remove(
            $self->keyspace, $rowKey, $column_path, timestamp()
        );
    };
    confess Dumper($@) if $@;
}
# Finalise the class definition.
__PACKAGE__->meta->make_immutable;

# Explicit true value so that loading this file with use/require succeeds
# without relying on what make_immutable happens to return.
1;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment