Skip to content

Instantly share code, notes, and snippets.

@matteobertozzi
Created March 18, 2016 02:46
Show Gist options
  • Select an option

  • Save matteobertozzi/c04b5ec15da6ddba56be to your computer and use it in GitHub Desktop.

Select an option

Save matteobertozzi/c04b5ec15da6ddba56be to your computer and use it in GitHub Desktop.
FreeFormReplicationEndpoint
$ cat cell.proto
/**
 * The type of the key in a Cell
 */
// NOTE(review): these codes look like they mirror HBase's KeyValue.Type values
// (Put=4, Delete=8, DeleteColumn=12, DeleteFamily=14) - confirm before relying on it.
// Never renumber: enum values travel on the wire by number and are also baked into
// the generated cell_pb2.py descriptor.
enum CellType {
  MINIMUM = 0;
  PUT = 4;
  DELETE = 8;
  DELETE_COLUMN = 12;
  DELETE_FAMILY = 14;
  // MAXIMUM is used when searching; you look from maximum on down.
  MAXIMUM = 255;
}
/**
 * Protocol buffer version of Cell.
 *
 * The Java replication endpoint sends one of these per cell of a WAL edit
 * (built with ProtobufUtil.toCell); the Python server decodes it with
 * Cell.FromString. All fields are optional (proto2).
 */
message Cell {
  optional bytes row = 1;        // row key
  optional bytes family = 2;     // column family
  optional bytes qualifier = 3;  // column qualifier
  // NOTE(review): units are not stated here; presumably epoch milliseconds
  // (HBase convention) - confirm.
  optional uint64 timestamp = 4;
  optional CellType cell_type = 5;
  optional bytes value = 6;
  // presumably the cell's serialized HBase tags; may be empty - verify.
  optional bytes tags = 7;
}
// Table identity sent ahead of each replicated entry: namespace plus table
// qualifier (e.g. "default" / "testtb"). Both fields are proto2 `required`.
message TableName {
  required bytes namespace = 1;
  required bytes qualifier = 2;
}
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: cell.proto
from google.protobuf.internal import enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
# File-level descriptor for cell.proto (no package). serialized_pb is the
# binary FileDescriptorProto emitted by protoc; the serialized_start /
# serialized_end values in the descriptors below are byte offsets into it.
# Generated code: do not hand-edit, regenerate from cell.proto instead.
DESCRIPTOR = _descriptor.FileDescriptor(
  name='cell.proto',
  package='',
  serialized_pb='\n\ncell.proto\"\x84\x01\n\x04\x43\x65ll\x12\x0b\n\x03row\x18\x01 \x01(\x0c\x12\x0e\n\x06\x66\x61mily\x18\x02 \x01(\x0c\x12\x11\n\tqualifier\x18\x03 \x01(\x0c\x12\x11\n\ttimestamp\x18\x04 \x01(\x04\x12\x1c\n\tcell_type\x18\x05 \x01(\x0e\x32\t.CellType\x12\r\n\x05value\x18\x06 \x01(\x0c\x12\x0c\n\x04tags\x18\x07 \x01(\x0c\"1\n\tTableName\x12\x11\n\tnamespace\x18\x01 \x02(\x0c\x12\x11\n\tqualifier\x18\x02 \x02(\x0c*`\n\x08\x43\x65llType\x12\x0b\n\x07MINIMUM\x10\x00\x12\x07\n\x03PUT\x10\x04\x12\n\n\x06\x44\x45LETE\x10\x08\x12\x11\n\rDELETE_COLUMN\x10\x0c\x12\x11\n\rDELETE_FAMILY\x10\x0e\x12\x0c\n\x07MAXIMUM\x10\xff\x01')
# Descriptor for the top-level CellType enum. `index` is each value's position
# in the declaration; `number` is its wire value (note the gaps: 0, 4, 8, 12,
# 14, 255), matching cell.proto.
_CELLTYPE = _descriptor.EnumDescriptor(
  name='CellType',
  full_name='CellType',
  filename=None,
  file=DESCRIPTOR,
  values=[
    _descriptor.EnumValueDescriptor(
      name='MINIMUM', index=0, number=0,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='PUT', index=1, number=4,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='DELETE', index=2, number=8,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='DELETE_COLUMN', index=3, number=12,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='DELETE_FAMILY', index=4, number=14,
      options=None,
      type=None),
    _descriptor.EnumValueDescriptor(
      name='MAXIMUM', index=5, number=255,
      options=None,
      type=None),
  ],
  containing_type=None,
  options=None,
  # Byte range of the enum definition inside DESCRIPTOR's serialized_pb.
  serialized_start=200,
  serialized_end=296,
)
# Module-level enum type object wrapping the descriptor above.
CellType = enum_type_wrapper.EnumTypeWrapper(_CELLTYPE)
# Plain module-level aliases for the CellType values (e.g. cell_pb2.PUT);
# they must equal the `number`s in _CELLTYPE.
MINIMUM = 0
PUT = 4
DELETE = 8
DELETE_COLUMN = 12
DELETE_FAMILY = 14
MAXIMUM = 255
# Descriptor for message Cell. Numeric codes are protobuf FieldDescriptor
# constants: type 12 = bytes, 4 = uint64, 14 = enum; cpp_type 9 = string,
# 4 = uint64, 8 = enum; label 1 = optional.
_CELL = _descriptor.Descriptor(
  name='Cell',
  full_name='Cell',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='row', full_name='Cell.row', index=0,
      number=1, type=12, cpp_type=9, label=1,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='family', full_name='Cell.family', index=1,
      number=2, type=12, cpp_type=9, label=1,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='qualifier', full_name='Cell.qualifier', index=2,
      number=3, type=12, cpp_type=9, label=1,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='timestamp', full_name='Cell.timestamp', index=3,
      number=4, type=4, cpp_type=4, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    # enum_type is left None here and linked to _CELLTYPE after both
    # descriptors exist (see the fields_by_name assignment below).
    _descriptor.FieldDescriptor(
      name='cell_type', full_name='Cell.cell_type', index=4,
      number=5, type=14, cpp_type=8, label=1,
      has_default_value=False, default_value=0,
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='value', full_name='Cell.value', index=5,
      number=6, type=12, cpp_type=9, label=1,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='tags', full_name='Cell.tags', index=6,
      number=7, type=12, cpp_type=9, label=1,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  options=None,
  is_extendable=False,
  extension_ranges=[],
  # Byte range of the Cell definition inside DESCRIPTOR's serialized_pb.
  serialized_start=15,
  serialized_end=147,
)
# Descriptor for message TableName: two bytes fields (type 12, cpp_type 9),
# both with label 2 = required.
_TABLENAME = _descriptor.Descriptor(
  name='TableName',
  full_name='TableName',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  fields=[
    _descriptor.FieldDescriptor(
      name='namespace', full_name='TableName.namespace', index=0,
      number=1, type=12, cpp_type=9, label=2,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
    _descriptor.FieldDescriptor(
      name='qualifier', full_name='TableName.qualifier', index=1,
      number=2, type=12, cpp_type=9, label=2,
      has_default_value=False, default_value="",
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      options=None),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  options=None,
  is_extendable=False,
  extension_ranges=[],
  # Byte range of the TableName definition inside DESCRIPTOR's serialized_pb.
  serialized_start=149,
  serialized_end=198,
)
# Resolve the forward reference from Cell.cell_type to the CellType enum, then
# register both message descriptors on the file descriptor.
_CELL.fields_by_name['cell_type'].enum_type = _CELLTYPE
DESCRIPTOR.message_types_by_name['Cell'] = _CELL
DESCRIPTOR.message_types_by_name['TableName'] = _TABLENAME
class Cell(_message.Message):
  # Generated message class for Cell; fields come from _CELL via the metaclass.
  # NOTE: a `__metaclass__` class attribute only takes effect on Python 2; on
  # Python 3 this class gets no generated fields - regenerate with a current
  # protoc to use this module there.
  __metaclass__ = _reflection.GeneratedProtocolMessageType
  DESCRIPTOR = _CELL
  # @@protoc_insertion_point(class_scope:Cell)
class TableName(_message.Message):
  # Generated message class for TableName; fields come from _TABLENAME.
  # NOTE: like Cell, relies on the Python-2-only `__metaclass__` attribute.
  __metaclass__ = _reflection.GeneratedProtocolMessageType
  DESCRIPTOR = _TABLENAME
  # @@protoc_insertion_point(class_scope:TableName)
# @@protoc_insertion_point(module_scope)
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
/*
* add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.regionserver.FreeReplicationEndpoint', CONFIG => {'master_addresses' => 'localhost:18080'}
* create 'testtb', {NAME => 'f', REPLICATION_SCOPE => 1}
* put 'testtb', 'row0', 'f:q', 'value0'
*/
public class FreeReplicationEndpoint extends BaseReplicationEndpoint
implements Abortable {
private static final Log LOG = LogFactory.getLog(FreeReplicationEndpoint.class);
// Always using the same UUID for now.
private static final UUID STATIC_UUID = UUID.fromString("9ccee075-1c3c-47c0-835b-6dc0f657b875");
private String hostName;
private int port;
private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
@Override
public void init(Context context) throws IOException {
super.init(context);
}
@Override
public void abort(String why, Throwable e) {
LOG.fatal("The FreeReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+ " was aborted for the following reason(s):" + why, e);
}
@Override
public boolean isAborted() {
// Currently this is never "Aborted", we just log when the abort method is called.
return false;
}
@Override
protected void doStart() {
String masterAddresses = ctx.getPeerConfig().getConfiguration().get("master_addresses");
String[] addr = masterAddresses.split(":");
hostName = addr[0];
port = Integer.parseInt(addr[1]);
notifyStarted();
}
@Override
protected void doStop() {
disconnect();
notifyStopped();
}
private void connect(String hostName, int port) {
try {
socket = new Socket(hostName, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
} catch (IOException e) {
LOG.warn("Unable to connect", e);
}
LOG.info("connected to " + hostName + ":" + port);
}
private void disconnect() {
IOUtils.closeStream(inputStream);
IOUtils.closeStream(outputStream);
IOUtils.closeStream(socket);
}
@Override
public UUID getPeerUUID() {
return STATIC_UUID;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(replicateContext.getEntries());
String walGroupId = replicateContext.getWalGroupId();
while (this.isRunning()) {
try {
while (entries.size() > 0) {
connect(hostName, port);
WAL.Entry entry = entries.get(0);
if (!writeEntry(entry)) {
LOG.warn("Unable to replicate " + entry);
} else {
LOG.info("Entry replicated " + entry);
}
disconnect();
entries.remove(0);
Thread.sleep(10000);
}
return true;
} catch (Exception e) {
LOG.warn("Unable to replicate", e);
} finally {
disconnect();
}
}
return false;
}
private boolean writeEntry(final WAL.Entry entry) throws IOException {
// TODO: Replace with a protobuf message (WAL.Entry, something simpler)
TableName tableName = entry.getKey().getTablename();
ArrayList<Cell> cells = entry.getEdit().getCells();
byte[] buf = ProtobufUtil.toProtoTableName(tableName).toByteArray();
StreamUtils.writeInt(outputStream, buf.length);
outputStream.write(buf);
StreamUtils.writeInt(outputStream, cells.size());
for (Cell cell: cells) {
buf = ProtobufUtil.toCell(cell).toByteArray();
StreamUtils.writeInt(outputStream, buf.length);
outputStream.write(buf);
}
int ack = inputStream.read();
return ack >= 0 ? ack == 1 : false;
}
}
import errno
import select
import socket

import cell_pb2
from cell_pb2 import Cell, TableName
def decode_uint32(buf):
    """Return the big-endian unsigned 32-bit value held in buf[0:4].

    ``buf`` is indexed as a sequence of byte values (callers pass a
    bytearray). Items past the fourth are ignored; fewer than four items
    raise IndexError.
    """
    value = 0
    for pos in range(4):
        value = (value << 8) + buf[pos]
    return value
class ReplicationSource(object):
    """Server-side state for one connected replication client.

    Incrementally decodes entries from a non-blocking socket. The wire format is:
    a 4-byte length plus a serialized TableName, a 4-byte cell count, then for
    each cell a 4-byte length plus a serialized Cell (integers decoded by
    decode_uint32). When an entry is complete, it is passed to ``handler`` and
    the handler's return value (0-255) is sent back as a single ack byte.
    """

    def __init__(self, sock, addr, handler):
        self._sock = sock
        self._addr = addr
        self._buffer = b''
        self._parser = None
        self._entry = None
        self._handler = handler
        # Set once the peer closed the connection or a fatal socket error occurred.
        self._closed = False

    def close(self):
        """Close the client socket; safe to call more than once."""
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        self._closed = True

    def recv(self):
        """Consume newly readable data, acking any entry that completes.

        Returns:
            True while the connection is usable; False once the peer has closed
            it (EOF) or a fatal socket error occurred, in which case the caller
            should drop and close this client.

        Raises:
            Whatever the protobuf decoder, the handler or ``sendall`` raise.
        """
        try:
            if not self._parser:
                self._parser = self._parse_data()
            next(self._parser)
        except StopIteration:
            table_name, cells = self._entry
            # Reset first so a failing handler cannot leave a spent parser behind.
            self._parser = None
            self._entry = None
            ack = self._handler(table_name, cells)
            # bytes(bytearray([...])) is a one-byte string on both Python 2 and 3.
            self._sock.sendall(bytes(bytearray([ack])))
        return not self._closed

    def _take(self, n):
        """Remove and return the first n bytes of the receive buffer."""
        data = self._buffer[:n]
        self._buffer = self._buffer[n:]
        return data

    def _parse_data(self):
        """Parse one entry, yielding whenever more socket data is needed.

        On completion stores ``(TableName, [Cell, ...])`` in ``self._entry``.
        """
        while not self._recv(4): yield
        length = decode_uint32(bytearray(self._take(4)))
        while not self._recv(length): yield
        table_name = TableName.FromString(self._take(length))
        while not self._recv(4): yield
        count = decode_uint32(bytearray(self._take(4)))
        cells = []
        # A while loop avoids materializing a range of an untrusted count.
        while len(cells) < count:
            while not self._recv(4): yield
            length = decode_uint32(bytearray(self._take(4)))
            while not self._recv(length): yield
            cells.append(Cell.FromString(self._take(length)))
        self._entry = (table_name, cells)

    def _recv(self, n):
        """Try to have at least n bytes buffered; return True if they are.

        Reads only the missing bytes, so the buffer never holds data belonging
        to a later field. EOF and socket errors other than would-block or
        interrupted mark the connection closed.
        """
        missing = n - len(self._buffer)
        if missing <= 0:
            return True
        if self._closed:
            return False
        try:
            chunk = self._sock.recv(missing)
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
                self._closed = True
            return False
        if not chunk:
            # A readable socket returning no data means the peer closed it.
            self._closed = True
            return False
        self._buffer += chunk
        return len(self._buffer) >= n


class ReplicationServer(object):
    """Single-threaded TCP server for replication clients, driven by epoll.

    Linux only (select.epoll). Each accepted connection is served by a
    ReplicationSource that passes decoded entries to ``handler``.
    """

    def __init__(self, handler):
        self._sock = None
        self._epoll = None
        self._client = {}
        self._handler = handler

    def bind(self, host, port):
        """Create the non-blocking listening socket and register it with epoll."""
        self._sock = sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(16)
        sock.setblocking(0)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._epoll = epoll = select.epoll()
        epoll.register(sock.fileno(), select.EPOLLIN)

    def close(self):
        """Close every client, the epoll object and the listening socket."""
        for fileno in list(self._client):
            self._drop_client(fileno)
        if self._epoll:
            if self._sock:
                self._epoll.unregister(self._sock.fileno())
            self._epoll.close()
            self._epoll = None
        if self._sock:
            self._sock.close()
            self._sock = None

    def poll(self, timeout=16):
        """Wait for socket events once and service them.

        Args:
            timeout: maximum number of *seconds* to block in epoll.poll()
                (default 16, as before).
        """
        listen_fd = self._sock.fileno()
        for fileno, event in self._epoll.poll(timeout):
            if fileno == listen_fd:
                self._accept()
                continue
            client = self._client.get(fileno)
            if client is None:
                continue
            alive = True
            if event & select.EPOLLIN:
                try:
                    alive = client.recv()
                except Exception as e:
                    # A malformed entry, failing handler or socket error on one
                    # client must not take the whole server down.
                    print('error serving client', client._addr, e)
                    alive = False
            # Check hang-up/error even when EPOLLIN is also set; EOF shows up as
            # a readable socket, which recv() reports by returning False.
            if not alive or event & (select.EPOLLHUP | select.EPOLLERR):
                self._drop_client(fileno)

    def _accept(self):
        """Accept one pending connection and start watching it."""
        try:
            sock, addr = self._sock.accept()
        except socket.error as e:
            print('accept failed', e)
            return
        sock.setblocking(0)
        self._epoll.register(sock.fileno(), select.EPOLLIN)
        self._client[sock.fileno()] = ReplicationSource(sock, addr, self._handler)
        print('connected client', addr)

    def _drop_client(self, fileno):
        """Stop watching a client's fd and close its socket."""
        client = self._client.pop(fileno)
        if self._epoll:
            self._epoll.unregister(fileno)
        print('disconnected client', client._addr)
        client.close()
def encode_uint32(value):
    """Return the low 32 bits of value as a 4-byte big-endian bytearray."""
    shifts = (24, 16, 8, 0)
    encoded = bytearray(len(shifts))
    for pos, shift in enumerate(shifts):
        encoded[pos] = (value >> shift) & 0xff
    return encoded
def write(host='localhost', port=18080):
    """Send one hand-built test entry to a replication server and print the ack.

    Manual smoke test using the same framing as the Java endpoint: a
    length-prefixed TableName, a cell count, then one length-prefixed Cell.

    Args:
        host: server host name.
        port: server port. Defaults to 18080, the port the ``__main__`` block
            listens on (the old hard-coded 8080 never matched it).
    """
    table_name = TableName()
    table_name.namespace = 'default'
    table_name.qualifier = 'testtb'

    cell = Cell()
    cell.row = 'row-key'
    cell.family = 'f'
    cell.qualifier = 'q'
    cell.timestamp = 1
    cell.cell_type = cell_pb2.PUT
    cell.value = 'value'

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))
        buf = table_name.SerializeToString()
        s.sendall(encode_uint32(len(buf)))
        s.sendall(buf)
        s.sendall(encode_uint32(1))  # number of cells that follow
        buf = cell.SerializeToString()
        s.sendall(encode_uint32(len(buf)))
        s.sendall(buf)
        data = s.recv(1)
    finally:
        s.close()
    if data:
        print('ack %d' % ord(data))
    else:
        print('no ack: server closed the connection')
def handle_entry(table_name, cells):
    """Default entry handler: print the entry and acknowledge it.

    Args:
        table_name: decoded TableName message (namespace, qualifier).
        cells: list of decoded Cell messages.

    Returns:
        1, the ack value the Java endpoint treats as success.
    """
    def cell_to_string(cell):
        return '%s %s:%s %s' % (cell.row, cell.family, cell.qualifier, cell.value)

    def table_to_string(name):
        return '%s:%s' % (name.namespace, name.qualifier)

    # Single formatted string: same output as the old Python 2 print statement,
    # and valid print() syntax like the server's other prints.
    print('HANDLE %s %s' % (table_to_string(table_name),
                            [cell_to_string(cell) for cell in cells]))
    return 1
if __name__ == '__main__':
    HOST, PORT = '0.0.0.0', 18080
    server = ReplicationServer(handle_entry)
    server.bind(HOST, PORT)
    try:
        # Manual smoke test: uncomment to send one test entry to this server
        # from a background thread.
        #import threading
        #threading.Thread(target=write).start()
        print('listening on %s %d' % (HOST, PORT))
        while True:
            server.poll()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without a traceback.
        pass
    finally:
        server.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment