Created
March 18, 2016 02:46
-
-
Save matteobertozzi/c04b5ec15da6ddba56be to your computer and use it in GitHub Desktop.
FreeFormReplicationEndpoint
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| $ cat cell.proto | |
/**
 * The type of the key in a Cell.
 *
 * The numbering is sparse (0, 4, 8, 12, 14, 255) rather than sequential.
 * NOTE(review): the values presumably mirror the type codes HBase uses for
 * its KeyValue types - confirm before adding or renumbering entries.
 */
enum CellType {
  MINIMUM = 0;
  PUT = 4;

  DELETE = 8;
  DELETE_COLUMN = 12;
  DELETE_FAMILY = 14;

  // MAXIMUM is used when searching; you look from maximum on down.
  MAXIMUM = 255;
}
/**
 * Protocol buffer version of Cell.
 *
 * Every field is optional, so a receiver must be prepared for any of them
 * to be absent.
 */
message Cell {
  optional bytes row = 1;
  optional bytes family = 2;
  optional bytes qualifier = 3;
  // NOTE(review): unit/epoch of the timestamp is not stated here - confirm.
  optional uint64 timestamp = 4;
  optional CellType cell_type = 5;
  optional bytes value = 6;
  // NOTE(review): tags are carried as one opaque byte string; the internal
  // layout is not defined in this file.
  optional bytes tags = 7;
}
/**
 * Name of a table, split into its namespace and its qualifier.
 * Both parts are required, so a TableName missing either one fails to parse.
 */
message TableName {
  required bytes namespace = 1;
  required bytes qualifier = 2;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Generated by the protocol buffer compiler. DO NOT EDIT! | |
| # source: cell.proto | |
| from google.protobuf.internal import enum_type_wrapper | |
| from google.protobuf import descriptor as _descriptor | |
| from google.protobuf import message as _message | |
| from google.protobuf import reflection as _reflection | |
| from google.protobuf import descriptor_pb2 | |
| # @@protoc_insertion_point(imports) | |
| DESCRIPTOR = _descriptor.FileDescriptor( | |
| name='cell.proto', | |
| package='', | |
| serialized_pb='\n\ncell.proto\"\x84\x01\n\x04\x43\x65ll\x12\x0b\n\x03row\x18\x01 \x01(\x0c\x12\x0e\n\x06\x66\x61mily\x18\x02 \x01(\x0c\x12\x11\n\tqualifier\x18\x03 \x01(\x0c\x12\x11\n\ttimestamp\x18\x04 \x01(\x04\x12\x1c\n\tcell_type\x18\x05 \x01(\x0e\x32\t.CellType\x12\r\n\x05value\x18\x06 \x01(\x0c\x12\x0c\n\x04tags\x18\x07 \x01(\x0c\"1\n\tTableName\x12\x11\n\tnamespace\x18\x01 \x02(\x0c\x12\x11\n\tqualifier\x18\x02 \x02(\x0c*`\n\x08\x43\x65llType\x12\x0b\n\x07MINIMUM\x10\x00\x12\x07\n\x03PUT\x10\x04\x12\n\n\x06\x44\x45LETE\x10\x08\x12\x11\n\rDELETE_COLUMN\x10\x0c\x12\x11\n\rDELETE_FAMILY\x10\x0e\x12\x0c\n\x07MAXIMUM\x10\xff\x01') | |
| _CELLTYPE = _descriptor.EnumDescriptor( | |
| name='CellType', | |
| full_name='CellType', | |
| filename=None, | |
| file=DESCRIPTOR, | |
| values=[ | |
| _descriptor.EnumValueDescriptor( | |
| name='MINIMUM', index=0, number=0, | |
| options=None, | |
| type=None), | |
| _descriptor.EnumValueDescriptor( | |
| name='PUT', index=1, number=4, | |
| options=None, | |
| type=None), | |
| _descriptor.EnumValueDescriptor( | |
| name='DELETE', index=2, number=8, | |
| options=None, | |
| type=None), | |
| _descriptor.EnumValueDescriptor( | |
| name='DELETE_COLUMN', index=3, number=12, | |
| options=None, | |
| type=None), | |
| _descriptor.EnumValueDescriptor( | |
| name='DELETE_FAMILY', index=4, number=14, | |
| options=None, | |
| type=None), | |
| _descriptor.EnumValueDescriptor( | |
| name='MAXIMUM', index=5, number=255, | |
| options=None, | |
| type=None), | |
| ], | |
| containing_type=None, | |
| options=None, | |
| serialized_start=200, | |
| serialized_end=296, | |
| ) | |
# Wrapper object built around the CellType enum descriptor defined above.
CellType = enum_type_wrapper.EnumTypeWrapper(_CELLTYPE)
# Module-level aliases for the enum values; the numbers are the same ones
# passed to the EnumValueDescriptor entries of _CELLTYPE.
MINIMUM = 0
PUT = 4
DELETE = 8
DELETE_COLUMN = 12
DELETE_FAMILY = 14
MAXIMUM = 255
| _CELL = _descriptor.Descriptor( | |
| name='Cell', | |
| full_name='Cell', | |
| filename=None, | |
| file=DESCRIPTOR, | |
| containing_type=None, | |
| fields=[ | |
| _descriptor.FieldDescriptor( | |
| name='row', full_name='Cell.row', index=0, | |
| number=1, type=12, cpp_type=9, label=1, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='family', full_name='Cell.family', index=1, | |
| number=2, type=12, cpp_type=9, label=1, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='qualifier', full_name='Cell.qualifier', index=2, | |
| number=3, type=12, cpp_type=9, label=1, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='timestamp', full_name='Cell.timestamp', index=3, | |
| number=4, type=4, cpp_type=4, label=1, | |
| has_default_value=False, default_value=0, | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='cell_type', full_name='Cell.cell_type', index=4, | |
| number=5, type=14, cpp_type=8, label=1, | |
| has_default_value=False, default_value=0, | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='value', full_name='Cell.value', index=5, | |
| number=6, type=12, cpp_type=9, label=1, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='tags', full_name='Cell.tags', index=6, | |
| number=7, type=12, cpp_type=9, label=1, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| ], | |
| extensions=[ | |
| ], | |
| nested_types=[], | |
| enum_types=[ | |
| ], | |
| options=None, | |
| is_extendable=False, | |
| extension_ranges=[], | |
| serialized_start=15, | |
| serialized_end=147, | |
| ) | |
| _TABLENAME = _descriptor.Descriptor( | |
| name='TableName', | |
| full_name='TableName', | |
| filename=None, | |
| file=DESCRIPTOR, | |
| containing_type=None, | |
| fields=[ | |
| _descriptor.FieldDescriptor( | |
| name='namespace', full_name='TableName.namespace', index=0, | |
| number=1, type=12, cpp_type=9, label=2, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| _descriptor.FieldDescriptor( | |
| name='qualifier', full_name='TableName.qualifier', index=1, | |
| number=2, type=12, cpp_type=9, label=2, | |
| has_default_value=False, default_value="", | |
| message_type=None, enum_type=None, containing_type=None, | |
| is_extension=False, extension_scope=None, | |
| options=None), | |
| ], | |
| extensions=[ | |
| ], | |
| nested_types=[], | |
| enum_types=[ | |
| ], | |
| options=None, | |
| is_extendable=False, | |
| extension_ranges=[], | |
| serialized_start=149, | |
| serialized_end=198, | |
| ) | |
# Link the enum-typed field to its enum descriptor; it was created with
# enum_type=None in the field list of _CELL.
_CELL.fields_by_name['cell_type'].enum_type = _CELLTYPE
# Register both message descriptors on the file descriptor by name.
DESCRIPTOR.message_types_by_name['Cell'] = _CELL
DESCRIPTOR.message_types_by_name['TableName'] = _TABLENAME
# Message class for the `Cell` descriptor.
class Cell(_message.Message):
    # `__metaclass__` is the Python 2 metaclass hook; Python 3 ignores this
    # attribute.
    # NOTE(review): regenerate this module with a newer protoc rather than
    # hand-editing it if Python 3 support is needed.
    __metaclass__ = _reflection.GeneratedProtocolMessageType
    DESCRIPTOR = _CELL

    # @@protoc_insertion_point(class_scope:Cell)
# Message class for the `TableName` descriptor.
class TableName(_message.Message):
    # `__metaclass__` is the Python 2 metaclass hook; Python 3 ignores this
    # attribute.
    # NOTE(review): regenerate this module with a newer protoc rather than
    # hand-editing it if Python 3 support is needed.
    __metaclass__ = _reflection.GeneratedProtocolMessageType
    DESCRIPTOR = _TABLENAME

    # @@protoc_insertion_point(class_scope:TableName)
| # @@protoc_insertion_point(module_scope) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.apache.hadoop.hbase.replication.regionserver; | |
| import java.io.IOException; | |
| import java.io.InputStream; | |
| import java.io.OutputStream; | |
| import java.net.Socket; | |
| import org.apache.commons.logging.Log; | |
| import org.apache.commons.logging.LogFactory; | |
| import org.apache.hadoop.io.IOUtils; | |
| import org.apache.hadoop.hbase.Abortable; | |
| import org.apache.hadoop.hbase.Cell; | |
| import org.apache.hadoop.hbase.CellUtil; | |
| import org.apache.hadoop.hbase.TableDescriptors; | |
| import org.apache.hadoop.hbase.TableName; | |
| import org.apache.hadoop.hbase.io.util.StreamUtils; | |
| import org.apache.hadoop.hbase.protobuf.ProtobufUtil; | |
| import org.apache.hadoop.hbase.regionserver.wal.WALEdit; | |
| import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; | |
| import org.apache.hadoop.hbase.wal.WAL; | |
| import java.nio.ByteBuffer; | |
| import java.util.ArrayList; | |
| import java.util.HashMap; | |
| import java.util.List; | |
| import java.util.UUID; | |
| /* | |
| * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.regionserver.FreeReplicationEndpoint', CONFIG => {'master_addresses' => 'localhost:18080'} | |
| * create 'testtb', {NAME => 'f', REPLICATION_SCOPE => 1} | |
| * put 'testtb', 'row0', 'f:q', 'value0' | |
| */ | |
| public class FreeReplicationEndpoint extends BaseReplicationEndpoint | |
| implements Abortable { | |
| private static final Log LOG = LogFactory.getLog(FreeReplicationEndpoint.class); | |
| // Always using the same UUID for now. | |
| private static final UUID STATIC_UUID = UUID.fromString("9ccee075-1c3c-47c0-835b-6dc0f657b875"); | |
| private String hostName; | |
| private int port; | |
| private Socket socket; | |
| private InputStream inputStream; | |
| private OutputStream outputStream; | |
| @Override | |
| public void init(Context context) throws IOException { | |
| super.init(context); | |
| } | |
| @Override | |
| public void abort(String why, Throwable e) { | |
| LOG.fatal("The FreeReplicationEndpoint corresponding to peer " + ctx.getPeerId() | |
| + " was aborted for the following reason(s):" + why, e); | |
| } | |
| @Override | |
| public boolean isAborted() { | |
| // Currently this is never "Aborted", we just log when the abort method is called. | |
| return false; | |
| } | |
| @Override | |
| protected void doStart() { | |
| String masterAddresses = ctx.getPeerConfig().getConfiguration().get("master_addresses"); | |
| String[] addr = masterAddresses.split(":"); | |
| hostName = addr[0]; | |
| port = Integer.parseInt(addr[1]); | |
| notifyStarted(); | |
| } | |
| @Override | |
| protected void doStop() { | |
| disconnect(); | |
| notifyStopped(); | |
| } | |
| private void connect(String hostName, int port) { | |
| try { | |
| socket = new Socket(hostName, port); | |
| inputStream = socket.getInputStream(); | |
| outputStream = socket.getOutputStream(); | |
| } catch (IOException e) { | |
| LOG.warn("Unable to connect", e); | |
| } | |
| LOG.info("connected to " + hostName + ":" + port); | |
| } | |
| private void disconnect() { | |
| IOUtils.closeStream(inputStream); | |
| IOUtils.closeStream(outputStream); | |
| IOUtils.closeStream(socket); | |
| } | |
| @Override | |
| public UUID getPeerUUID() { | |
| return STATIC_UUID; | |
| } | |
| @Override | |
| public boolean replicate(ReplicateContext replicateContext) { | |
| List<WAL.Entry> entries = new ArrayList<WAL.Entry>(replicateContext.getEntries()); | |
| String walGroupId = replicateContext.getWalGroupId(); | |
| while (this.isRunning()) { | |
| try { | |
| while (entries.size() > 0) { | |
| connect(hostName, port); | |
| WAL.Entry entry = entries.get(0); | |
| if (!writeEntry(entry)) { | |
| LOG.warn("Unable to replicate " + entry); | |
| } else { | |
| LOG.info("Entry replicated " + entry); | |
| } | |
| disconnect(); | |
| entries.remove(0); | |
| Thread.sleep(10000); | |
| } | |
| return true; | |
| } catch (Exception e) { | |
| LOG.warn("Unable to replicate", e); | |
| } finally { | |
| disconnect(); | |
| } | |
| } | |
| return false; | |
| } | |
| private boolean writeEntry(final WAL.Entry entry) throws IOException { | |
| // TODO: Replace with a protobuf message (WAL.Entry, something simpler) | |
| TableName tableName = entry.getKey().getTablename(); | |
| ArrayList<Cell> cells = entry.getEdit().getCells(); | |
| byte[] buf = ProtobufUtil.toProtoTableName(tableName).toByteArray(); | |
| StreamUtils.writeInt(outputStream, buf.length); | |
| outputStream.write(buf); | |
| StreamUtils.writeInt(outputStream, cells.size()); | |
| for (Cell cell: cells) { | |
| buf = ProtobufUtil.toCell(cell).toByteArray(); | |
| StreamUtils.writeInt(outputStream, buf.length); | |
| outputStream.write(buf); | |
| } | |
| int ack = inputStream.read(); | |
| return ack >= 0 ? ack == 1 : false; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import select | |
| import socket | |
| from cell_pb2 import Cell, TableName | |
| import cell_pb2 | |
def decode_uint32(buf):
    """Decode a big-endian unsigned 32-bit integer from the start of *buf*.

    Args:
        buf: A bytes-like object (``bytes``, ``bytearray``, ``memoryview``,
            or a Python 2 ``str``) holding at least four bytes. Only the
            first four are used.

    Returns:
        The decoded integer.

    Raises:
        IndexError: If *buf* holds fewer than four bytes.
    """
    # bytearray() turns every supported input into a sequence of ints, so
    # callers no longer have to wrap the buffer themselves.
    octets = bytearray(buf[:4])
    return (octets[0] << 24) | (octets[1] << 16) | (octets[2] << 8) | octets[3]
class ReplicationSource(object):
    """Incrementally parse replication entries arriving on one client socket.

    Wire format of an entry (every integer is 4 bytes, big-endian)::

        <length> <TableName protobuf>
        <cell count> ( <length> <Cell protobuf> ) * count

    Once a complete entry has been parsed, ``handler(table_name, cells)`` is
    called and its integer result is sent back to the peer as one ack byte.
    The socket is expected to be non-blocking.
    """

    # Upper bound for a single recv() call. Lengths come from the peer, and
    # recv(n) allocates an n-byte buffer up front.
    _MAX_CHUNK = 65536

    def __init__(self, sock, addr, handler):
        self._sock = sock
        self._addr = addr
        self._buffer = b''
        self._parser = None
        self._entry = None
        self._handler = handler
        # Set once the peer closed the connection or a fatal error occurred.
        self._eof = False

    def close(self):
        """Close the client socket; calling it again is a no-op."""
        if self._sock is not None:
            self._sock.close()
            self._sock = None

    def recv(self):
        """Advance the parser with whatever data is currently readable.

        Returns:
            True while the connection is still usable, False once the peer
            has closed it (or a fatal socket error occurred) and the caller
            should drop this client.
        """
        try:
            if not self._parser:
                self._parser = iter(self._parse_data())
            next(self._parser)
        except StopIteration:
            # The generator finished: a complete entry is in self._entry.
            table_name, cells = self._entry
            ack = self._handler(table_name, cells)
            # bytearray([ack]) yields a single byte on Python 2 and 3 alike.
            self._sock.sendall(bytes(bytearray([ack])))
            self._parser = None
            self._entry = None
        return not self._eof

    def _parse_data(self):
        """Parse exactly one entry, yielding whenever more data is needed.

        On completion the ``(table_name, cells)`` pair is stored in
        ``self._entry``.
        """
        while not self._recv(4): yield
        length = decode_uint32(bytearray(self._take(4)))
        while not self._recv(length): yield
        table_name = TableName.FromString(self._take(length))

        while not self._recv(4): yield
        remaining = decode_uint32(bytearray(self._take(4)))

        cells = []
        # Counting down avoids materialising a peer-controlled range.
        while remaining > 0:
            while not self._recv(4): yield
            length = decode_uint32(bytearray(self._take(4)))
            while not self._recv(length): yield
            cells.append(Cell.FromString(self._take(length)))
            remaining -= 1

        self._entry = (table_name, cells)

    def _take(self, n):
        """Remove and return the first *n* buffered bytes."""
        head, self._buffer = self._buffer[:n], self._buffer[n:]
        return head

    def _recv(self, n):
        """Try, without blocking, to have at least *n* bytes buffered.

        Returns:
            True if *n* bytes are available in the buffer, False otherwise
            (no data yet, peer closed, or socket error).
        """
        import errno

        missing = n - len(self._buffer)
        if missing <= 0:
            # Also covers n == 0, where recv(0) would look like end-of-file.
            return True
        if self._eof:
            return False
        try:
            chunk = self._sock.recv(min(missing, self._MAX_CHUNK))
        except socket.error as exc:
            # "No data right now" is expected on a non-blocking socket;
            # anything else means the connection is unusable.
            if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
                self._eof = True
            return False
        if not chunk:
            # An empty read is how a closed peer shows up.
            self._eof = True
            return False
        self._buffer += chunk
        return len(self._buffer) >= n
class ReplicationServer(object):
    """Single-threaded TCP server that multiplexes clients with epoll.

    Every accepted connection is wrapped in a ``ReplicationSource`` that
    receives the ``handler`` passed to the constructor.
    """

    def __init__(self, handler):
        self._sock = None
        self._epoll = None
        self._client = {}
        self._handler = handler

    def bind(self, host, port):
        """Create the non-blocking listening socket and register it with epoll."""
        self._sock = sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(16)
        sock.setblocking(0)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        self._epoll = epoll = select.epoll()
        epoll.register(sock.fileno(), select.EPOLLIN)

    def close(self):
        """Disconnect all clients and release the epoll object and the listener.

        Safe to call more than once, and also when ``bind`` never ran or
        failed part-way.
        """
        for fileno in list(self._client):
            self._drop_client(fileno)

        if self._epoll:
            if self._sock:
                self._epoll.unregister(self._sock.fileno())
            self._epoll.close()
            self._epoll = None

        if self._sock:
            self._sock.close()
            self._sock = None

    def poll(self, timeout=16):
        """Wait up to *timeout* seconds for events and dispatch them.

        Args:
            timeout: Maximum time to block in ``epoll.poll``, in seconds.
        """
        for fileno, event in self._epoll.poll(timeout):
            if fileno == self._sock.fileno():
                sock, addr = self._sock.accept()
                sock.setblocking(0)
                self._epoll.register(sock.fileno(), select.EPOLLIN)
                self._client[sock.fileno()] = ReplicationSource(sock, addr, self._handler)
                print('connected client %s' % (addr,))
            elif event & select.EPOLLIN:
                # A client whose recv() reports False has reached end-of-file.
                # Comparing with `is False` keeps clients that return nothing
                # from recv() registered.
                if self._client[fileno].recv() is False:
                    self._drop_client(fileno)
            elif event & (select.EPOLLHUP | select.EPOLLERR):
                self._drop_client(fileno)

    def _drop_client(self, fileno):
        """Unregister, forget and close the client with the given descriptor."""
        client = self._client.pop(fileno)
        # Unregister while the descriptor is still open.
        self._epoll.unregister(fileno)
        print('disconnected client %s' % (client._addr,))
        client.close()
def encode_uint32(value):
    """Encode *value* as four bytes in big-endian order.

    Args:
        value: The integer to encode. Only its low 32 bits are kept, so
            larger values are silently truncated.

    Returns:
        A ``bytearray`` of length four, most significant byte first.
    """
    return bytearray((value >> shift) & 0xff for shift in (24, 16, 8, 0))
def write(host='localhost', port=18080):
    """Send one hard-coded test entry to a running server and print its ack.

    Args:
        host: Host name of the server to connect to.
        port: TCP port of the server; the default is the port the
            ``__main__`` block of this script listens on.
    """
    table_name = TableName()
    table_name.namespace = b'default'
    table_name.qualifier = b'testtb'

    cell = Cell()
    cell.row = b'row-key'
    cell.family = b'f'
    cell.qualifier = b'q'
    cell.timestamp = 1
    cell.cell_type = cell_pb2.PUT
    cell.value = b'value'

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))

        buf = table_name.SerializeToString()
        s.sendall(encode_uint32(len(buf)))
        s.sendall(buf)

        # Number of cells that follow.
        s.sendall(encode_uint32(1))

        buf = cell.SerializeToString()
        s.sendall(encode_uint32(len(buf)))
        s.sendall(buf)

        data = s.recv(1)
    finally:
        s.close()

    if data:
        print('ack %d' % ord(data))
    else:
        # The server closed the connection without answering.
        print('no ack received')
def handle_entry(table_name, cells):
    """Print a received entry and acknowledge it.

    Args:
        table_name: Object with ``namespace`` and ``qualifier`` attributes.
        cells: Iterable of objects with ``row``, ``family``, ``qualifier``
            and ``value`` attributes.

    Returns:
        Always 1, the ack value reported back to the sender.
    """
    def cell_to_string(cell):
        return '%s %s:%s %s' % (cell.row, cell.family, cell.qualifier, cell.value)

    def table_to_string(name):
        return '%s:%s' % (name.namespace, name.qualifier)

    # A single pre-formatted argument prints the same on Python 2 and 3.
    print('HANDLE %s %s' % (table_to_string(table_name),
                            [cell_to_string(cell) for cell in cells]))
    return 1
if __name__ == '__main__':
    HOST, PORT = '0.0.0.0', 18080

    server = ReplicationServer(handle_entry)
    try:
        # bind() sits inside the try so a failure part-way through still
        # reaches server.close().
        server.bind(HOST, PORT)

        # To exercise the server with the built-in test client, run it from
        # a second thread before entering the loop:
        #   import threading
        #   threading.Thread(target=write).start()

        print('listening on %s %d' % (HOST, PORT))
        while True:
            server.poll()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without a traceback.
        pass
    finally:
        server.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment