Created
March 24, 2016 11:08
-
-
Save siddontang/f993b353de45610bbca4 to your computer and use it in GitHub Desktop.
A simple raft command line tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"net" | |
"os" | |
"path" | |
"time" | |
"github.com/coreos/etcd/clientv3" | |
"github.com/golang/protobuf/proto" | |
"github.com/pingcap/kvproto/pkg/metapb" | |
"github.com/pingcap/kvproto/pkg/pdpb" | |
"github.com/pingcap/kvproto/pkg/raft_cmdpb" | |
"github.com/pingcap/kvproto/pkg/raft_serverpb" | |
"github.com/pingcap/pd/server" | |
"github.com/pingcap/pd/util" | |
"github.com/spf13/cobra" | |
"github.com/twinj/uuid" | |
) | |
// cliName is the command name shown by cobra in usage and help output.
const cliName = "raftctl"

// cliDescription is the one-line summary attached to the root command.
const cliDescription = "A simple command line client for raft server"
var ( | |
rootCmd = &cobra.Command{ | |
Use: cliName, | |
Short: cliDescription, | |
} | |
) | |
var ( | |
raftAddr string | |
etcdEndpoints []string | |
pdRootPath string | |
clusterID uint64 | |
client *clientv3.Client | |
) | |
func init() { | |
rootCmd.PersistentFlags().StringVar(&raftAddr, "raft", "127.0.0.1:20160", "set the raft server address") | |
rootCmd.PersistentFlags().StringSliceVar(&etcdEndpoints, "etcd", []string{"127.0.0.1:2379"}, "Etcd endpoints") | |
rootCmd.PersistentFlags().StringVar(&pdRootPath, "pd", "/pd", "pd root path in etcd") | |
rootCmd.PersistentFlags().Uint64Var(&clusterID, "cluster-id", 1, "default cluster id") | |
initKVCommand() | |
cobra.EnablePrefixMatching = true | |
} | |
// perror aborts the program when err is non-nil: the error is written to
// standard error and the process exits with status -1. A nil err is a no-op.
//
// The message goes to stderr rather than stdout so that it cannot be mistaken
// for (or mixed into) the command output printed by get/seek.
func perror(err error) {
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(-1)
}
func main() { | |
var err error | |
client, err = clientv3.New(clientv3.Config{ | |
Endpoints: etcdEndpoints, | |
DialTimeout: 3 * time.Second, | |
}) | |
perror(err) | |
err = rootCmd.Execute() | |
perror(err) | |
} | |
func initKVCommand() { | |
getCmd := &cobra.Command{ | |
Use: "get <key>", | |
Short: "get the value with given key", | |
Run: getCommandFunc, | |
} | |
rootCmd.AddCommand(getCmd) | |
putCmd := &cobra.Command{ | |
Use: "put <key> <value>", | |
Short: "set key with value", | |
Run: putCommandFunc, | |
} | |
rootCmd.AddCommand(putCmd) | |
seekCmd := &cobra.Command{ | |
Use: "seek <key>", | |
Short: "seek the key >= given key", | |
Run: seekCommandFunc, | |
} | |
rootCmd.AddCommand(seekCmd) | |
deleteCmd := &cobra.Command{ | |
Use: "delete <key>", | |
Short: "delete the key", | |
Run: deleteCommandFunc, | |
} | |
rootCmd.AddCommand(deleteCmd) | |
} | |
func getCommandFunc(cmd *cobra.Command, args []string) { | |
key := args[0] | |
region := getRegion(key) | |
leader := getRegionLeader(region) | |
req := &raft_cmdpb.RaftCmdRequest{ | |
Header: &raft_cmdpb.RaftRequestHeader{ | |
RegionId: region.RegionId, | |
Peer: leader, | |
}, | |
Requests: []*raft_cmdpb.Request{ | |
&raft_cmdpb.Request{ | |
CmdType: raft_cmdpb.CmdType_Get.Enum(), | |
Get: &raft_cmdpb.GetRequest{ | |
Key: []byte(key), | |
}, | |
}, | |
}, | |
} | |
resp := sendRaftCommnd(req) | |
fmt.Printf("%s\n", resp.Responses[0].Get) | |
} | |
func putCommandFunc(cmd *cobra.Command, args []string) { | |
key := args[0] | |
value := args[1] | |
region := getRegion(key) | |
leader := getRegionLeader(region) | |
req := &raft_cmdpb.RaftCmdRequest{ | |
Header: &raft_cmdpb.RaftRequestHeader{ | |
RegionId: region.RegionId, | |
Peer: leader, | |
}, | |
Requests: []*raft_cmdpb.Request{ | |
&raft_cmdpb.Request{ | |
CmdType: raft_cmdpb.CmdType_Put.Enum(), | |
Put: &raft_cmdpb.PutRequest{ | |
Key: []byte(key), | |
Value: []byte(value), | |
}, | |
}, | |
}, | |
} | |
sendRaftCommnd(req) | |
} | |
func seekCommandFunc(cmd *cobra.Command, args []string) { | |
key := args[0] | |
region := getRegion(key) | |
leader := getRegionLeader(region) | |
req := &raft_cmdpb.RaftCmdRequest{ | |
Header: &raft_cmdpb.RaftRequestHeader{ | |
RegionId: region.RegionId, | |
Peer: leader, | |
}, | |
Requests: []*raft_cmdpb.Request{ | |
&raft_cmdpb.Request{ | |
CmdType: raft_cmdpb.CmdType_Seek.Enum(), | |
Seek: &raft_cmdpb.SeekRequest{ | |
Key: []byte(key), | |
}, | |
}, | |
}, | |
} | |
resp := sendRaftCommnd(req) | |
fmt.Printf("%s\n", resp.Responses[0].Seek) | |
} | |
func deleteCommandFunc(cmd *cobra.Command, args []string) { | |
key := args[0] | |
region := getRegion(key) | |
leader := getRegionLeader(region) | |
req := &raft_cmdpb.RaftCmdRequest{ | |
Header: &raft_cmdpb.RaftRequestHeader{ | |
RegionId: region.RegionId, | |
Peer: leader, | |
}, | |
Requests: []*raft_cmdpb.Request{ | |
&raft_cmdpb.Request{ | |
CmdType: raft_cmdpb.CmdType_Delete.Enum(), | |
Delete: &raft_cmdpb.DeleteRequest{ | |
Key: []byte(key), | |
}, | |
}, | |
}, | |
} | |
sendRaftCommnd(req) | |
} | |
func getPDLeader() string { | |
leader, err := server.GetLeader(client, path.Join(pdRootPath, "leader")) | |
perror(err) | |
return *leader.Addr | |
} | |
func getRegion(key string) *metapb.Region { | |
addr := getPDLeader() | |
conn, err := net.Dial("tcp", addr) | |
perror(err) | |
defer conn.Close() | |
req := &pdpb.Request{ | |
Header: &pdpb.RequestHeader{ | |
ClusterId: proto.Uint64(clusterID), | |
}, | |
CmdType: pdpb.CommandType_GetMeta.Enum(), | |
GetMeta: &pdpb.GetMetaRequest{ | |
MetaType: pdpb.MetaType_RegionType.Enum(), | |
RegionKey: []byte(key), | |
}, | |
} | |
err = util.WriteMessage(conn, 0, req) | |
perror(err) | |
var resp pdpb.Response | |
_, err = util.ReadMessage(conn, &resp) | |
perror(err) | |
return resp.GetMeta.Region | |
} | |
func getNodeAddr(nodeID uint64) string { | |
addr := getPDLeader() | |
conn, err := net.Dial("tcp", addr) | |
perror(err) | |
defer conn.Close() | |
req := &pdpb.Request{ | |
Header: &pdpb.RequestHeader{ | |
ClusterId: proto.Uint64(clusterID), | |
}, | |
CmdType: pdpb.CommandType_GetMeta.Enum(), | |
GetMeta: &pdpb.GetMetaRequest{ | |
MetaType: pdpb.MetaType_NodeType.Enum(), | |
NodeId: proto.Uint64(nodeID), | |
}, | |
} | |
err = util.WriteMessage(conn, 0, req) | |
perror(err) | |
var resp pdpb.Response | |
_, err = util.ReadMessage(conn, &resp) | |
perror(err) | |
return resp.GetMeta.Node.GetAddress() | |
} | |
func getRegionLeader(region *metapb.Region) *metapb.Peer { | |
regionID := region.GetRegionId() | |
peers := region.GetPeers() | |
req := &raft_cmdpb.RaftCmdRequest{ | |
Header: &raft_cmdpb.RaftRequestHeader{ | |
RegionId: proto.Uint64(regionID), | |
}, | |
StatusRequest: &raft_cmdpb.StatusRequest{ | |
CmdType: raft_cmdpb.StatusCmdType_RegionLeader.Enum(), | |
RegionLeader: &raft_cmdpb.RegionLeaderRequest{}, | |
}, | |
} | |
for _, peer := range peers { | |
req.Header.Peer = peer | |
resp := sendRaftCommnd(req) | |
leader := resp.StatusResponse.RegionLeader.Leader | |
if leader != nil { | |
return leader | |
} | |
} | |
return nil | |
} | |
func sendRaftCommnd(req *raft_cmdpb.RaftCmdRequest) *raft_cmdpb.RaftCmdResponse { | |
req.Header.Uuid = uuid.NewV4().Bytes() | |
addr := getNodeAddr(req.Header.Peer.GetNodeId()) | |
conn, err := net.Dial("tcp", addr) | |
perror(err) | |
defer conn.Close() | |
err = util.WriteMessage(conn, 0, &raft_serverpb.Message{ | |
MsgType: raft_serverpb.MessageType_Cmd.Enum(), | |
CmdReq: req, | |
}) | |
perror(err) | |
var resp raft_serverpb.Message | |
_, err = util.ReadMessage(conn, &resp) | |
perror(err) | |
return resp.CmdResp | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment