Skip to content

Instantly share code, notes, and snippets.

@f99aq8ove
Created June 6, 2010 11:29
Show Gist options
  • Save f99aq8ove/427517 to your computer and use it in GitHub Desktop.
Save f99aq8ove/427517 to your computer and use it in GitHub Desktop.
package MyCassandra;
use Any::Moose;
use namespace::autoclean;
use Net::Cassandra;
use Data::MessagePack;
use Data::Dumper;
use Encode;
# Keyspace name handed to every Cassandra client call made by this class.
has keyspace => (
    is  => 'ro',
    isa => 'Str',
);

# Column family used when building ColumnPath / ColumnParent objects
# and as the key of the batch_insert mutation map.
has columnFamily => (
    is  => 'ro',
    isa => 'Str',
);

# Constructor arguments forwarded to Net::Cassandra->new in BUILD.
has cassandraArgs => (
    is  => 'ro',
    isa => 'HashRef',
);

# Holds the client object obtained in BUILD. Only the private
# _get_cassandra / _set_cassandra accessors are generated.
has cassandra => (
    is     => 'bare',
    reader => '_get_cassandra',
    writer => '_set_cassandra',
);
# Construction hook: creates the Net::Cassandra object from cassandraArgs
# and stores its client for use by set/get/del and the slice methods.
#
# Dies if Net::Cassandra->new or ->client dies.
sub BUILD {
    my ($self) = @_;

    # cassandraArgs is optional; when it was not supplied the reader yields
    # undef, and passing a lone undef to a Moose-style constructor is
    # rejected ("Single parameters to new() must be a HASH ref").  Fall
    # back to an empty hashref so Net::Cassandra can apply its own defaults.
    my $args      = $self->cassandraArgs || {};
    my $cassandra = Net::Cassandra->new($args);

    $self->_set_cassandra($cassandra->client);
    return;
}
# Returns the current time as whole milliseconds since the epoch,
# formatted as a string of decimal digits.
#
# Called as a plain function (timestamp()), not as a method.
sub timestamp {
    # Importing Time::HiRes's time gives sub-second resolution.
    use Time::HiRes qw/time/;

    # '%d' converts through a native integer, which overflows for a
    # millisecond timestamp on perls built with 32-bit integers.  Truncate
    # with int() and print the floating-point value without decimals.
    return sprintf '%.0f', int(time() * 1000);
}
# Stores one column value.
#
#   $rowKey - row key passed to insert
#   $colKey - column name placed in the ColumnPath
#   $value  - value to store; serialized with Data::MessagePack->pack
#
# The insert is issued with ConsistencyLevel::ZERO and a timestamp from
# timestamp().  Any exception raised inside the call is re-thrown through
# confess with a Data::Dumper rendering of the error.
sub set {
    my ($self, $rowKey, $colKey, $value) = @_;

    eval {
        my $client = $self->_get_cassandra;
        my $space  = $self->keyspace;
        my $path   = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );
        my $packed = Data::MessagePack->pack($value);

        $client->insert(
            $space, $rowKey, $path, $packed, timestamp(),
            Net::Cassandra::Backend::ConsistencyLevel::ZERO
        );
    };
    confess Dumper($@) if $@;
}
# Stores several columns of one row in a single batch_insert call.
#
#   $rowKey - row key passed to batch_insert
#   $data   - hashref of column name => value; each value is serialized
#             with Data::MessagePack->pack
#
# All columns share one timestamp taken before the batch is built.  The
# call is issued with ConsistencyLevel::ZERO.  Any exception raised by
# batch_insert is re-thrown through confess with a Data::Dumper rendering.
sub set_slice {
    my ($self, $rowKey, $data) = @_;
    my $timestamp = timestamp();

    # Iterate with keys rather than each: each resumes from wherever a
    # previous, unfinished each on the caller's hash stopped and could
    # therefore silently skip columns.
    my @cols;
    for my $name (keys %{$data}) {
        my $column = Net::Cassandra::Backend::Column->new(
            {   name      => $name,
                value     => Data::MessagePack->pack($data->{$name}),
                timestamp => $timestamp,
            }
        );
        push @cols,
            Net::Cassandra::Backend::ColumnOrSuperColumn->new(
            { column => $column });
    }

    eval {
        $self->_get_cassandra->batch_insert(
            $self->keyspace, $rowKey,
            # Reference to the column list (the original had a mis-encoded
            # backslash here, which is not valid Perl).
            { $self->columnFamily => \@cols },
            Net::Cassandra::Backend::ConsistencyLevel::ZERO
        );
    };
    confess Dumper($@) if $@;
}
# Fetches one column.
#
#   $rowKey - row key passed to get
#   $colKey - column name placed in the ColumnPath
#
# Returns a hashref { value => ..., timestamp => ... } where value is the
# MessagePack-unpacked column value run through my_decode.  Returns undef
# when the error raised by the client stringifies to something starting
# with "Net::Cassandra::Backend::NotFoundException".  Any other error is
# re-thrown through confess with a Data::Dumper rendering.
#
# The read uses ConsistencyLevel::QUORUM.
sub get {
    my ($self, $rowKey, $colKey) = @_;

    my $what;
    eval {
        my $path = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );
        $what = $self->_get_cassandra->get(
            $self->keyspace, $rowKey, $path,
            Net::Cassandra::Backend::ConsistencyLevel::QUORUM
        );
    };

    if ($@) {
        # A missing column is an expected outcome, not a failure.
        return undef
            if $@ =~ /^Net::Cassandra::Backend::NotFoundException/;
        confess Dumper($@);
    }

    my $column = $what->column;
    my $value  = my_decode(Data::MessagePack->unpack($column->value));
    return {
        value     => $value,
        timestamp => $what->column->timestamp,
    };
}
# Decodes UTF-8 octets into Perl character strings.
#
#   $data - a plain scalar, or an unblessed hash/array reference whose
#           values (at any nesting depth) should be decoded
#
# Returns the decoded scalar, or the same reference that was passed in:
# hashes and arrays are modified in place.  Hash keys are left untouched.
#
# Nested hash/array values are decoded recursively.  Handing them straight
# to decode_utf8 would stringify the reference ("ARRAY(0x...)") and lose
# the data.  Anything else, including blessed objects, still goes through
# Encode::decode_utf8.
sub my_decode {
    my $data = shift;
    my $kind = ref $data;

    if ($kind eq 'HASH') {
        # values() yields aliases, so assigning updates the hash itself.
        for my $value (values %{$data}) {
            $value = my_decode($value);
        }
    }
    elsif ($kind eq 'ARRAY') {
        for my $element (@{$data}) {
            $element = my_decode($element);
        }
    }
    else {
        $data = Encode::decode_utf8($data);
    }
    return $data;
}
# Encodes Perl character strings into UTF-8 octets.
#
#   $data - a plain scalar, or an unblessed hash/array reference whose
#           values (at any nesting depth) should be encoded
#
# Returns the encoded scalar, or the same reference that was passed in:
# hashes and arrays are modified in place.  Hash keys are left untouched.
#
# Nested hash/array values are encoded recursively.  Handing them straight
# to encode_utf8 would stringify the reference ("ARRAY(0x...)") and lose
# the data.  Anything else, including blessed objects, still goes through
# Encode::encode_utf8.
sub my_encode {
    my $data = shift;
    my $kind = ref $data;

    if ($kind eq 'HASH') {
        # values() yields aliases, so assigning updates the hash itself.
        for my $value (values %{$data}) {
            $value = my_encode($value);
        }
    }
    elsif ($kind eq 'ARRAY') {
        for my $element (@{$data}) {
            $element = my_encode($element);
        }
    }
    else {
        $data = Encode::encode_utf8($data);
    }
    return $data;
}
# Fetches the columns of one row as a hash.
#
#   $rowKey - row key passed to get_slice
#
# Returns a hashref mapping column name to the MessagePack-unpacked value
# run through my_decode.  Column timestamps are not returned.
#
# The read uses ConsistencyLevel::QUORUM and an open-ended SliceRange
# (start and finish both '').  Any exception raised by get_slice is
# re-thrown through confess with a Data::Dumper rendering.
sub get_slice {
    my ($self, $rowKey) = @_;

    my $what;
    eval {
        $what = $self->_get_cassandra->get_slice(
            $self->keyspace,
            $rowKey,
            Net::Cassandra::Backend::ColumnParent->new(
                { column_family => $self->columnFamily }
            ),
            Net::Cassandra::Backend::SlicePredicate->new(
                {   slice_range => Net::Cassandra::Backend::SliceRange->new(
                        # XXX: no count is given, so the default limit
                        # applies (100 according to the original note).
                        { start => '', finish => '' }
                    )
                }
            ),
            Net::Cassandra::Backend::ConsistencyLevel::QUORUM
        );
    };
    confess Dumper($@) if $@;

    my %value_of;
    for my $entry (@{$what}) {
        # XXX: no timestamp
        my $column = $entry->column;
        $value_of{ $column->name }
            = my_decode(Data::MessagePack->unpack($column->value));
    }

    # Return a reference to the result hash (the original had a
    # mis-encoded backslash here, which is not valid Perl).
    return \%value_of;
}
# Removes one column.
#
#   $rowKey - row key passed to remove
#   $colKey - column name placed in the ColumnPath
#
# The removal is stamped with timestamp(); no consistency level is passed.
# Any exception raised inside the call is re-thrown through confess with a
# Data::Dumper rendering of the error.
sub del {
    my ($self, $rowKey, $colKey) = @_;

    eval {
        my $client = $self->_get_cassandra;
        my $space  = $self->keyspace;
        my $path   = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );

        $client->remove($space, $rowKey, $path, timestamp());
    };
    confess Dumper($@) if $@;
}
# Make the class immutable now that all attributes and methods above have
# been declared.
__PACKAGE__->meta->make_immutable;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment