Skip to content

Instantly share code, notes, and snippets.

@ferranbt
Created September 26, 2023 13:35
Show Gist options
  • Save ferranbt/052b871224cc370829eeb698ee2a5674 to your computer and use it in GitHub Desktop.
Save ferranbt/052b871224cc370829eeb698ee2a5674 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"crypto/rand"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
mrand "math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
consensus "github.com/0xPolygon/pbft-consensus"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/multiformats/go-multiaddr"
)
// valNode is the static description of one demo validator.
type valNode struct {
	// addr is the full multiaddr other nodes dial to reach this validator.
	addr string
	// priv is the libp2p identity key produced by generateNode.
	priv crypto.PrivKey
	// id is the short validator name (the key used in valNodes).
	id string
	// port is the TCP port the node listens on; generateNode also uses it
	// as the seed for the identity key.
	port uint64
}
// generateNode builds the valNode for a demo validator.
//
// The identity key is derived from a math/rand source seeded with the port,
// so the same port yields the same key on every run. This is not secure and
// is only meant for the local demo. It panics if key generation fails.
func generateNode(id string, port uint64, addr string) valNode {
	seeded := mrand.New(mrand.NewSource(int64(port)))

	// Ed25519 is required here to get deterministic keys out of the
	// seeded reader.
	key, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, seeded)
	if err != nil {
		panic(err)
	}

	node := valNode{id: id, port: port, addr: addr}
	node.priv = key
	return node
}
// valNodes lists the validators that are actually run in the demo, keyed by
// their short id. Each entry pairs a listen port with the multiaddr the other
// nodes dial.
// NOTE(review): the peer ids embedded in the addresses presumably correspond
// to the keys generateNode derives from each port — confirm before changing
// a port.
var valNodes = map[string]valNode{
	"a": generateNode("a", 30303, "/ip4/127.0.0.1/tcp/30303/p2p/12D3KooWEKauHBKgW9HryEQS72cUDkWwyajF6V2p8qyW6rKinw5R"),
	"b": generateNode("b", 30304, "/ip4/127.0.0.1/tcp/30304/p2p/12D3KooWPRvetqQJYw7ASHkqDrMrsRSugn8MRwSqk7h9k9wXTKfJ"),
	"c": generateNode("c", 30305, "/ip4/127.0.0.1/tcp/30305/p2p/12D3KooWSnGnTPSDN6b9wGJZz4aWKQq6SP4DeA1XSyzTWQDpTEgB"),
}
// valSet is the validator set handed to the consensus library.
//
// It declares five validators although only three are simulated, three being
// the minimum required for the protocol to work. With a smaller set the
// (fixed) proposer would start on its own without waiting for the other
// nodes, which is a problem because there is no sync mechanism that lets the
// other nodes catch up.
var valSet = &validatorSet{
	nodes: []consensus.NodeID{
		"a",
		"b",
		"c",
		"d",
		"e",
	},
}
// main starts a single DA-store node.
//
// Flags:
//
//	--id    which entry of valNodes this process runs as (default "a")
//	--seal  take part in the consensus protocol
//	--emit  publish a random bid to the mempool topic every second
//
// The process runs until it receives SIGINT, SIGTERM or SIGHUP.
func main() {
	config := &Config{}
	var (
		emit bool
		seal bool
	)
	flag.StringVar(&config.ID, "id", "a", "")
	flag.BoolVar(&emit, "emit", false, "")
	flag.BoolVar(&seal, "seal", false, "")
	flag.Parse()

	logger := hclog.New(&hclog.LoggerOptions{Output: os.Stdout, Level: hclog.Info})

	// Reject unknown ids up front: a missing map entry would otherwise yield
	// a zero valNode with a nil private key and port 0.
	node, ok := valNodes[config.ID]
	if !ok {
		logger.Error("unknown node id", "id", config.ID)
		os.Exit(1)
	}
	config.PrivID = node.priv
	config.P2PPort = node.port

	da := NewDaStore(logger, config)
	if seal {
		da.run(context.Background())
	}

	// Dial every other known validator. Failures are expected for nodes
	// that have not been started yet, so they are only logged.
	for id, n := range valNodes {
		if id == config.ID {
			continue
		}
		if err := da.join(n.addr); err != nil {
			logger.Error("failed to connect", "addr", n.addr, "err", err)
		}
	}

	if emit {
		if seal {
			// Wait for the consensus to have started, that is, for all the
			// nodes to be connected to each other. Otherwise, since there is
			// no sync mechanism for the bids, the leader would hold bids that
			// the other nodes do not have. An alternative is to start the
			// leader (node 'a') last.
			logger.Info("waiting for consensus to start to emit bids")
			<-da.consensusStartedCh
		}
		logger.Info("emitting bids over p2p")
		go func() {
			for {
				time.Sleep(1 * time.Second)

				data := make([]byte, 100)
				if _, err := rand.Read(data); err != nil {
					// Never publish a bid built from an unfilled buffer.
					logger.Error("failed to generate bid data", "err", err)
					continue
				}
				bid := Bid{Data: data}
				bid.Id = bid.getHash()

				logger.Info("send bid", "id", bid.Id)
				da.sendBid(bid)
			}
		}()
	}

	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
	<-signalCh

	da.Close()
}
// Config holds the per-node settings of a DaStore.
type Config struct {
	// PrivID is the private key used as the libp2p identity.
	PrivID crypto.PrivKey
	// ID is the short validator name of this node.
	ID string
	// P2PPort is the TCP port the libp2p host listens on.
	P2PPort uint64
}
// DaStore is a single node of the demo: it owns the libp2p host, the pubsub
// topics, the pool of pending bids and the in-memory store of finalized bids.
type DaStore struct {
	logger hclog.Logger
	config *Config

	// p2p is the libp2p host and pubsub the gossipsub instance built on it.
	p2p    host.Host
	pubsub *pubsub.PubSub

	// consensusTopic is joined by runConsensus and used by Gossip.
	consensusTopic *pubsub.Topic

	// newPeerCh receives a (non-blocking) signal whenever a peer connects.
	newPeerCh chan struct{}

	// state stores the bids of finalized proposals.
	state *memdb.MemDB

	// mempool holds the bids that have not been finalized yet; new bids are
	// exchanged over mempoolTopic.
	mempool      *mempool
	mempoolTopic *pubsub.Topic

	// consensusStartedCh is closed by runConsensus once enough peers are
	// connected and the sealing loop is about to start.
	consensusStartedCh chan struct{}
}
// NewDaStore creates the libp2p host, the gossipsub router and the in-memory
// state for a node, subscribes it to the mempool topic and registers a
// notifier for newly connected peers.
//
// The host listens on 0.0.0.0:<config.P2PPort> with config.PrivID as its
// identity. Any setup failure panics.
func NewDaStore(logger hclog.Logger, config *Config) *DaStore {
	ctx := context.Background()

	listenAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "0.0.0.0", config.P2PPort))
	if err != nil {
		panic(err)
	}

	// Named h (not host) so the imported host package is not shadowed.
	h, err := libp2p.New(
		libp2p.Identity(config.PrivID),
		libp2p.Security(noise.ID, noise.New),
		libp2p.ListenAddrs(listenAddr),
	)
	if err != nil {
		panic(err)
	}

	peerInfo := peer.AddrInfo{
		ID:    h.ID(),
		Addrs: h.Addrs(),
	}
	addrs, err := peer.AddrInfoToP2pAddrs(&peerInfo)
	if err != nil {
		panic(err)
	}
	for _, addr := range addrs {
		logger.Info("p2p bind", "addr", addr)
	}

	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		panic(err)
	}

	state, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	da := &DaStore{
		logger: logger,
		config: config,
		p2p:    h,
		pubsub: ps,
		state:  state,
		// One slot of buffer: notifyNewPeer sends without blocking, so with
		// an unbuffered channel a notification is dropped whenever the
		// consumer is not parked in its receive at that instant. The buffer
		// keeps one pending wake-up instead.
		newPeerCh:          make(chan struct{}, 1),
		mempool:            newMempool(),
		consensusStartedCh: make(chan struct{}),
	}
	da.setupMempool()

	// Watch for newly connected peers.
	h.Network().Notify(&network.NotifyBundle{
		ConnectedF: func(_ network.Network, conn network.Conn) {
			// Log the remote side of the connection; the local multiaddr
			// says nothing about which peer connected.
			logger.Info("new peer connected", "id", conn.RemotePeer(), "addr", conn.RemoteMultiaddr())
			da.notifyNewPeer()
		},
	})
	return da
}
// schema describes the in-memory database: a single "bid" table whose
// unique "id" index is built from the Id field of the stored objects.
var schema = &memdb.DBSchema{
	Tables: map[string]*memdb.TableSchema{
		"bid": {
			Name: "bid",
			Indexes: map[string]*memdb.IndexSchema{
				"id": {
					Name:   "id",
					Unique: true,
					Indexer: &memdb.StringFieldIndex{
						Field: "Id",
					},
				},
			},
		},
	},
}
// getBidById looks up a finalized bid by id in the state database.
//
// It returns the bid and true when found, or nil and false otherwise. It
// panics if the lookup itself fails or the table holds an unexpected type.
func (d *DaStore) getBidById(id string) (*Bid, bool) {
	txn := d.state.Txn(false)
	defer txn.Abort()

	raw, err := txn.First("bid", id)
	if err != nil {
		panic(err)
	}
	if raw == nil {
		return nil, false
	}

	// The table may hold either Bid values or *Bid pointers depending on how
	// the rows were inserted; a bare raw.(*Bid) assertion panics on values.
	switch bid := raw.(type) {
	case *Bid:
		return bid, true
	case Bid:
		return &bid, true
	default:
		panic(fmt.Sprintf("unexpected type %T in bid table", raw))
	}
}
// setupMempool joins and subscribes to the "mempool" topic and starts a
// goroutine that adds every bid published by other nodes to the local pool.
//
// Failing to join or subscribe panics. Errors while the subscription is
// running are logged: a malformed message is dropped, and a failed read ends
// the goroutine.
func (d *DaStore) setupMempool() {
	topic, err := d.pubsub.Join("mempool")
	if err != nil {
		panic(err)
	}
	d.mempoolTopic = topic

	sub, err := topic.Subscribe()
	if err != nil {
		panic(err)
	}

	go func() {
		for {
			rawMsg, err := sub.Next(context.Background())
			if err != nil {
				// Nothing more can be read from the subscription.
				d.logger.Error("mempool subscription closed", "err", err)
				return
			}

			var msg BidPoolMsg
			if err := json.Unmarshal(rawMsg.Data, &msg); err != nil {
				// The payload comes from the network; one bad message from a
				// peer must not take the node down.
				d.logger.Error("dropping malformed mempool message", "err", err)
				continue
			}

			// Bids sent by this node were already added by sendBid.
			if msg.Sender == d.config.ID {
				continue
			}

			d.logger.Info("received new bid from pool", "id", msg.Bid.Id, "from", msg.Sender)
			d.mempool.add(msg.Bid)
		}
	}()
}
// BidPoolMsg is the JSON envelope published on the "mempool" topic.
type BidPoolMsg struct {
	// Bid is the bid being announced.
	Bid Bid
	// Sender is the config ID of the node that published the message;
	// receivers use it to skip their own messages.
	Sender string
}
// sendBid adds the bid to the local pool and publishes it on the mempool
// topic, tagged with this node's id. It panics if encoding or publishing
// fails.
func (d *DaStore) sendBid(bid Bid) {
	d.mempool.add(bid)

	// Announce the bid to the rest of the p2p network.
	payload, err := json.Marshal(&BidPoolMsg{Bid: bid, Sender: d.config.ID})
	if err != nil {
		panic(err)
	}

	err = d.mempoolTopic.Publish(context.Background(), payload)
	if err != nil {
		panic(err)
	}
}
// mempool is the list of bids that have not been finalized yet.
type mempool struct {
	// bids is kept in insertion order.
	bids []Bid
	// lock guards bids.
	lock sync.Mutex
}
// newMempool returns an empty pool.
func newMempool() *mempool {
	pool := &mempool{}
	pool.bids = []Bid{}
	return pool
}
// add appends a bid to the end of the pool.
func (m *mempool) add(bid Bid) {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.bids = append(m.bids, bid)
}
// validateBundle checks that every bid id listed in the header is currently
// in the pool. It returns an error naming the first missing id, or nil.
func (m *mempool) validateBundle(header *DaHeader) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	// Index the pool by id so each header entry is a single lookup.
	known := make(map[string]struct{}, len(m.bids))
	for i := range m.bids {
		known[m.bids[i].Id] = struct{}{}
	}

	for _, id := range header.Bids {
		if _, ok := known[id]; ok {
			continue
		}
		return fmt.Errorf("bid not found %s", id)
	}
	return nil
}
// cleanBundle removes the bids listed in the header from the pool and
// returns the removed bids in pool order. It panics if any listed bid is not
// in the pool.
func (m *mempool) cleanBundle(header *DaHeader) []Bid {
	if err := m.validateBundle(header); err != nil {
		panic(err)
	}

	m.lock.Lock()
	defer m.lock.Unlock()

	// Set of ids that must leave the pool.
	finalized := make(map[string]struct{}, len(header.Bids))
	for _, id := range header.Bids {
		finalized[id] = struct{}{}
	}

	// Split the pool into the bids that stay and the bids that go.
	var remaining []Bid
	removed := []Bid{}
	for _, bid := range m.bids {
		if _, done := finalized[bid.Id]; done {
			removed = append(removed, bid)
			continue
		}
		remaining = append(remaining, bid)
	}

	m.bids = remaining
	return removed
}
// numHeaderBids is the maximum number of bids generateBundle puts in a header.
var numHeaderBids uint64 = 10
// generateBundle builds a header listing the ids of the oldest bids in the
// pool, at most numHeaderBids of them. The bids stay in the pool; the
// header's Number is left at zero for the caller to fill in.
func (m *mempool) generateBundle() *DaHeader {
	m.lock.Lock()
	defer m.lock.Unlock()

	num := numHeaderBids
	if size := uint64(len(m.bids)); size < num {
		num = size
	}

	// Copy the ids out while the lock is held, rather than walking a
	// sub-slice of the shared backing array after unlocking. The slice is
	// non-nil even when empty so it encodes as a JSON array, not null.
	bidIDs := make([]string, 0, num)
	for i := range m.bids[:num] {
		bidIDs = append(bidIDs, m.bids[i].Id)
	}

	return &DaHeader{
		Bids: bidIDs,
	}
}
// join parses addrRaw as a p2p multiaddr, connects the host to that peer and
// signals that a new peer is available. It returns the parse or connect
// error, if any.
func (d *DaStore) join(addrRaw string) error {
	info, err := peer.AddrInfoFromString(addrRaw)
	if err != nil {
		return err
	}

	err = d.p2p.Connect(context.Background(), *info)
	if err != nil {
		return err
	}

	d.notifyNewPeer()
	return nil
}
// notifyNewPeer signals newPeerCh without blocking; the signal is dropped
// when the channel cannot accept it right now.
func (d *DaStore) notifyNewPeer() {
	ch := d.newPeerCh
	select {
	case ch <- struct{}{}:
		// delivered
	default:
		// nobody can take the signal at the moment
	}
}
// Gossip implements the pbft-consensus transport interface: it JSON-encodes
// the message and publishes it on the consensus topic, returning any encode
// or publish error.
func (d *DaStore) Gossip(msg *consensus.MessageReq) error {
	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	return d.consensusTopic.Publish(context.Background(), payload)
}
// run starts the consensus loop in a background goroutine and returns
// immediately.
func (d *DaStore) run(ctx context.Context) {
	go func() {
		d.runConsensus(ctx)
	}()
}
// runConsensus runs the PBFT consensus protocol until ctx is cancelled.
//
// It joins the "consensus" topic, feeds every message received on it into
// the pbft-consensus instance, waits until the host is connected to the
// other nodes of valNodes, closes consensusStartedCh and then seals one
// block per iteration from the current contents of the mempool.
//
// Failing to join or subscribe to the topic panics.
func (d *DaStore) runConsensus(ctx context.Context) {
	var err error
	d.consensusTopic, err = d.pubsub.Join("consensus")
	if err != nil {
		panic(err)
	}
	d.logger.Info("consensus running", "id", d.config.ID)

	// Initialize the pbft-consensus library.
	key := &signKey{
		id: consensus.NodeID(d.config.ID),
	}
	pbftConsensus := consensus.New(key, d)

	sub, err := d.consensusTopic.Subscribe()
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			rawMsg, err := sub.Next(ctx)
			if err != nil {
				// A cancelled context is the normal way to stop; anything
				// else is logged. Either way there is nothing left to read.
				if ctx.Err() == nil {
					d.logger.Error("consensus subscription closed", "err", err)
				}
				return
			}

			var msg consensus.MessageReq
			if err := json.Unmarshal(rawMsg.Data, &msg); err != nil {
				// The payload comes from the network; drop bad messages
				// instead of crashing the node.
				d.logger.Error("dropping malformed consensus message", "err", err)
				continue
			}
			pbftConsensus.PushMessage(&msg)
		}
	}()

	// Wait until the host is connected to the other nodes in the network.
	// The count is checked before blocking, so peers that connected before
	// this point do not depend on a notification still being pending.
	for len(d.p2p.Network().Peers()) < len(valNodes)-1 {
		select {
		case <-d.newPeerCh:
		case <-ctx.Done():
			return
		}
	}

	d.logger.Info("consensus starting")
	close(d.consensusStartedCh)

	// Start the consensus protocol.
	blockNum := uint64(0)
	for {
		select {
		case <-time.After(1 * time.Second):
		case <-ctx.Done():
			return
		}

		d.logger.Info("Sealing block", "num", blockNum)

		instance := &daSequenceInstance{
			logger:       d.logger,
			blockNum:     blockNum,
			timestamp:    time.Now(),
			header:       d.mempool.generateBundle(),
			validateBids: d.mempool.validateBundle,
		}
		pbftConsensus.SetBackend(instance)
		pbftConsensus.Run(ctx)

		// instance.final is only set by Insert. If Run returned without
		// sealing anything (for example because ctx was cancelled) there is
		// no proposal to store.
		if instance.final == nil {
			if ctx.Err() != nil {
				return
			}
			// NOTE(review): retrying the same height is the conservative
			// choice here; confirm against how the library reports
			// unfinished sequences.
			d.logger.Warn("consensus returned without a finalized proposal", "num", blockNum)
			continue
		}

		d.insertFinalizedProposal(instance.final)
		blockNum++
	}
}
// insertFinalizedProposal applies a sealed proposal: the bids listed in its
// header are removed from the mempool and written to the state database.
//
// A nil proposal is logged and ignored. It panics if the proposal data cannot
// be decoded, if a listed bid is missing from the pool, or if the insert
// fails.
func (d *DaStore) insertFinalizedProposal(proposal *consensus.SealedProposal) {
	if proposal == nil {
		d.logger.Warn("no finalized proposal to insert")
		return
	}

	var header DaHeader
	if err := json.Unmarshal(proposal.Proposal.Data, &header); err != nil {
		panic(err)
	}
	d.logger.Info("finalized header", "bids", len(header.Bids))

	// Clean the memory pool from the finalized bids. This is not expected to
	// fail because the Validate stage made sure that all the bids are in the
	// pool.
	bids := d.mempool.cleanBundle(&header)

	// Insert the data in the store and the indexer.
	txn := d.state.Txn(true)
	// Releases the write transaction if an insert panics.
	// NOTE(review): go-memdb documents Abort as a no-op after Commit —
	// confirm for the vendored version.
	defer txn.Abort()

	for i := range bids {
		// Store pointers: getBidById reads the rows back as *Bid.
		if err := txn.Insert("bid", &bids[i]); err != nil {
			panic(err)
		}
	}
	txn.Commit()
}
// Close shuts down the libp2p host owned by the store. A failure to close
// is logged rather than returned, since Close has no error result.
func (d *DaStore) Close() {
	if d.p2p == nil {
		return
	}
	if err := d.p2p.Close(); err != nil {
		d.logger.Error("failed to close p2p host", "err", err)
	}
}
// DaHeader is the payload of a consensus proposal: a block number plus the
// ids of the bids included in that block. It is exchanged as JSON, so the
// field names and their order are part of the encoding.
type DaHeader struct {
	// Number is the block number the header belongs to.
	Number uint64
	// Bids lists the ids of the included bids.
	Bids []string
}
// DaBlock pairs a header with the full bids it refers to.
type DaBlock struct {
	// Header is the block header.
	Header *DaHeader
	// Bids holds the bid objects themselves.
	Bids []Bid
}
// Bid is an opaque blob of data together with its identifier.
type Bid struct {
	// Id identifies the bid; it is also the key of the "bid" table index.
	Id string
	// Data is the raw bid payload.
	Data []byte
}

// getHash returns the hex-encoded SHA-512/224 digest of the bid's Data.
// It does not modify the bid.
func (b *Bid) getHash() string {
	digest := sha512.New512_224()
	digest.Write(b.Data)
	return hex.EncodeToString(digest.Sum(nil))
}
// daSequenceInstance is the consensus backend for a single block: it carries
// the candidate header for that block and records the sealed result.
type daSequenceInstance struct {
	logger hclog.Logger

	// header is the candidate header proposed when this node is proposer.
	header *DaHeader
	// blockNum is the height of the block being sealed.
	blockNum uint64
	// final is set by Insert once a proposal has been sealed.
	final *consensus.SealedProposal
	// timestamp is used as the proposal time in BuildProposal.
	timestamp time.Time
	// validateBids checks a received header; Validate delegates to it.
	validateBids func(bundle *DaHeader) error
}
// BuildProposal builds the proposal for the current round (used when this
// node is the proposer). It stamps the header with the block number and
// returns its JSON encoding, the SHA-512 hash of that encoding and the
// instance timestamp.
func (d *daSequenceInstance) BuildProposal() (*consensus.Proposal, error) {
	d.header.Number = d.blockNum

	encoded, err := json.Marshal(d.header)
	if err != nil {
		return nil, err
	}

	digest := sha512.Sum512(encoded)
	proposal := &consensus.Proposal{
		Data: encoded,
		Hash: digest[:],
		Time: d.timestamp,
	}
	return proposal, nil
}
// Height reports the height for the current round, which is the block
// number this instance was created for.
func (d *daSequenceInstance) Height() uint64 {
	height := d.blockNum
	return height
}
// Init is called to signal the backend that a new round is about to start.
// This backend has nothing to prepare, so it does nothing.
func (d *daSequenceInstance) Init(*consensus.RoundInfo) {
	// intentionally empty
}
// Insert records the sealed proposal on the instance so the caller can pick
// it up after the consensus run. It never fails.
func (d *daSequenceInstance) Insert(p *consensus.SealedProposal) error {
	d.final = p
	return nil
}
// IsStuck reports whether the PBFT instance is stuck. This backend always
// answers (0, false), regardless of num.
func (d *daSequenceInstance) IsStuck(num uint64) (uint64, bool) {
	const (
		height = uint64(0)
		stuck  = false
	)
	return height, stuck
}
// Validate validates a raw proposal (used when this node is not the
// proposer). The proposal data must decode as a DaHeader and every bid it
// lists must pass validateBids; otherwise an error is returned.
func (d *daSequenceInstance) Validate(proposal *consensus.Proposal) error {
	var header DaHeader
	if err := json.Unmarshal(proposal.Data, &header); err != nil {
		// The data comes from another node: reject the proposal instead of
		// panicking on undecodable input.
		d.logger.Error("failed to decode proposal header", "err", err)
		return fmt.Errorf("decoding proposal header: %w", err)
	}

	if err := d.validateBids(&header); err != nil {
		d.logger.Error("failed to validate bid header", "err", err)
		return err
	}
	return nil
}
// ValidatorSet returns the validator set for the current round, which is
// always the package-level valSet.
func (d *daSequenceInstance) ValidatorSet() consensus.ValidatorSet {
	var set consensus.ValidatorSet = valSet
	return set
}
// ValidateCommit is used to validate that a given commit is valid. This
// demo backend accepts every commit without inspecting the sender or seal.
func (d *daSequenceInstance) ValidateCommit(from consensus.NodeID, seal []byte) error {
	var err error
	return err
}
// validatorSet is a fixed, ordered list of validator ids.
type validatorSet struct {
	// nodes holds the validator ids; the first entry is the proposer.
	nodes []consensus.NodeID
}
// CalcProposer returns the proposer for the given round. The proposer is
// fixed: it is always the first node of the set, whatever the round.
func (v *validatorSet) CalcProposer(round uint64) consensus.NodeID {
	first := v.nodes[0]
	return first
}
// Includes reports whether id is one of the validators in the set.
func (v *validatorSet) Includes(id consensus.NodeID) bool {
	for i := 0; i < len(v.nodes); i++ {
		if v.nodes[i] == id {
			return true
		}
	}
	return false
}
// Len returns the number of validators in the set.
func (v *validatorSet) Len() int {
	count := len(v.nodes)
	return count
}
// VotingPower returns the voting power of each validator; every validator
// in the set has a power of 1.
func (v *validatorSet) VotingPower() map[consensus.NodeID]uint64 {
	power := make(map[consensus.NodeID]uint64, len(v.nodes))
	for i := range v.nodes {
		power[v.nodes[i]] = 1
	}
	return power
}
// signKey is a mock signing key for consensus messages; it only carries the
// id of the node it belongs to.
type signKey struct {
	// id is the consensus id of the owning node.
	id consensus.NodeID
}
// NodeID returns the consensus id of the node that owns the key.
func (s *signKey) NodeID() consensus.NodeID {
	id := s.id
	return id
}
// Sign is a mock: instead of a signature over b it returns the bytes of the
// node id, and it never fails.
func (s *signKey) Sign(b []byte) ([]byte, error) {
	mockSig := []byte(s.id)
	return mockSig, nil
}
@ferranbt
Copy link
Author

Start three instances of the DA store, each in its own terminal:

$ go run main.go --id a --emit --seal
$ go run main.go --id b --emit --seal
$ go run main.go --id c --emit --seal

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment