package main

import (
	"context"
	"crypto/rand"
	"crypto/sha512"
	"encoding/hex"
	"encoding/json"
	"flag"
	"fmt"
	mrand "math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	consensus "github.com/0xPolygon/pbft-consensus"
	"github.com/hashicorp/go-hclog"
	memdb "github.com/hashicorp/go-memdb"
	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/security/noise"
	"github.com/multiformats/go-multiaddr"
)
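// This program is a minimal data-availability ("DA") store: nodes gossip
// bids over libp2p pubsub, a fixed proposer batches them into headers,
// the validators agree on each header via PBFT, and the finalized bids
// are indexed in an in-memory database.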
type valNode struct {
	addr string
	priv crypto.PrivKey
	id   string
	port uint64
}

func generateNode(id string, port uint64, addr string) valNode {
	r := mrand.New(mrand.NewSource(int64(port)))
	// Ed25519 is required here: it is the key type libp2p derives
	// deterministically from the seeded reader (the bits argument is
	// ignored for Ed25519).
	priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, r)
	if err != nil {
		panic(err)
	}
	return valNode{
		addr: addr,
		id:   id,
		priv: priv,
		port: port,
	}
}
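// valNodes pins the three simulated validators. Because each key is
// derived deterministically from the node's port, the libp2p peer IDs
// embedded in the multiaddrs below are stable across runs and can be
// hard-coded.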
var valNodes = map[string]valNode{
	"a": generateNode("a", 30303, "/ip4/127.0.0.1/tcp/30303/p2p/12D3KooWEKauHBKgW9HryEQS72cUDkWwyajF6V2p8qyW6rKinw5R"),
	"b": generateNode("b", 30304, "/ip4/127.0.0.1/tcp/30304/p2p/12D3KooWPRvetqQJYw7ASHkqDrMrsRSugn8MRwSqk7h9k9wXTKfJ"),
	"c": generateNode("c", 30305, "/ip4/127.0.0.1/tcp/30305/p2p/12D3KooWSnGnTPSDN6b9wGJZz4aWKQq6SP4DeA1XSyzTWQDpTEgB"),
}

// The validator set has 5 nodes but we simulate only 3, the minimum
// required for the protocol to make progress. Otherwise the proposer
// (which is fixed) would start on its own without waiting for the other
// nodes, which is a problem because there is no sync mechanism for the
// other nodes to catch up.
var valSet = &validatorSet{
	nodes: []consensus.NodeID{"a", "b", "c", "d", "e"},
}
func main() {
	config := &Config{}

	var emit bool
	var seal bool

	flag.StringVar(&config.ID, "id", "a", "")
	flag.BoolVar(&emit, "emit", false, "")
	flag.BoolVar(&seal, "seal", false, "")
	flag.Parse()

	node := valNodes[config.ID]
	config.PrivID = node.priv
	config.P2PPort = node.port

	logger := hclog.New(&hclog.LoggerOptions{Output: os.Stdout, Level: hclog.Info})
	da := NewDaStore(logger, config)

	if seal {
		da.run(context.Background())
	}

	for id, n := range valNodes {
		if id == config.ID {
			continue
		}
		if err := da.join(n.addr); err != nil {
			logger.Error("failed to connect", "addr", n.addr)
		}
	}

	if emit {
		if seal {
			// Wait for the consensus to have started, that is, for all
			// the nodes to be connected to each other. Otherwise, since
			// there is no sync mechanism for the bids, the leader would
			// hold bids that the other nodes do not have. An alternative
			// is to start the leader (node 'a') last.
			logger.Info("waiting for consensus to start to emit bids")
			<-da.consensusStartedCh
		}
		logger.Info("emitting bids over p2p")

		go func() {
			for {
				time.Sleep(1 * time.Second)

				data := make([]byte, 100)
				if _, err := rand.Read(data); err != nil {
					panic(err)
				}
				bid := Bid{
					Data: data,
				}
				bid.Id = bid.getHash()

				logger.Info("send bid", "id", bid.Id)
				da.sendBid(bid)
			}
		}()
	}

	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
	<-signalCh

	da.Close()
}
type Config struct {
	PrivID  crypto.PrivKey
	ID      string
	P2PPort uint64
}

type DaStore struct {
	logger hclog.Logger
	config *Config

	p2p            host.Host
	pubsub         *pubsub.PubSub
	consensusTopic *pubsub.Topic
	newPeerCh      chan struct{}

	state *memdb.MemDB

	mempool      *mempool
	mempoolTopic *pubsub.Topic

	consensusStartedCh chan struct{}
}
func NewDaStore(logger hclog.Logger, config *Config) *DaStore {
	ctx := context.Background()

	listenAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "0.0.0.0", config.P2PPort))
	if err != nil {
		panic(err)
	}

	host, err := libp2p.New(
		libp2p.Identity(config.PrivID),
		libp2p.Security(noise.ID, noise.New),
		libp2p.ListenAddrs(listenAddr),
	)
	if err != nil {
		panic(err)
	}

	peerInfo := peer.AddrInfo{
		ID:    host.ID(),
		Addrs: host.Addrs(),
	}
	addrs, err := peer.AddrInfoToP2pAddrs(&peerInfo)
	if err != nil {
		panic(err)
	}
	for _, addr := range addrs {
		logger.Info("p2p bind", "addr", addr)
	}

	ps, err := pubsub.NewGossipSub(ctx, host)
	if err != nil {
		panic(err)
	}

	state, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	da := &DaStore{
		logger:             logger,
		config:             config,
		p2p:                host,
		pubsub:             ps,
		state:              state,
		newPeerCh:          make(chan struct{}),
		mempool:            newMempool(),
		consensusStartedCh: make(chan struct{}),
	}
	da.setupMempool()

	// watch for new connected peers; log the remote peer, not our own
	// local multiaddr
	host.Network().Notify(&network.NotifyBundle{
		ConnectedF: func(net network.Network, conn network.Conn) {
			logger.Info("new peer connected", "peer", conn.RemotePeer())
			da.notifyNewPeer()
		},
	})

	return da
}
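// schema indexes bids by their Id field (the hex-encoded hash of the bid
// data), so finalized bids can be looked up by id.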
var schema = &memdb.DBSchema{
	Tables: map[string]*memdb.TableSchema{
		"bid": {
			Name: "bid",
			Indexes: map[string]*memdb.IndexSchema{
				"id": {
					Name:    "id",
					Unique:  true,
					Indexer: &memdb.StringFieldIndex{Field: "Id"},
				},
			},
		},
	},
}
func (d *DaStore) getBidById(id string) (*Bid, bool) {
	txn := d.state.Txn(false)
	defer txn.Abort()

	// First takes the table, the index name, and then the lookup value
	raw, err := txn.First("bid", "id", id)
	if err != nil {
		panic(err)
	}
	if raw == nil {
		return nil, false
	}
	return raw.(*Bid), true
}
func (d *DaStore) setupMempool() {
	topic, err := d.pubsub.Join("mempool")
	if err != nil {
		panic(err)
	}
	d.mempoolTopic = topic

	sub, err := topic.Subscribe()
	if err != nil {
		panic(err)
	}

	go func() {
		for {
			rawMsg, err := sub.Next(context.Background())
			if err != nil {
				panic(err)
			}

			var msg BidPoolMsg
			if err := json.Unmarshal(rawMsg.Data, &msg); err != nil {
				panic(err)
			}
			if msg.Sender == d.config.ID {
				continue
			}

			d.logger.Info("received new bid from pool", "id", msg.Bid.Id, "from", msg.Sender)
			d.mempool.add(msg.Bid)
		}
	}()
}
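// BidPoolMsg is the wire format for bids gossiped on the "mempool" topic.
// Sender lets a node drop its own messages when pubsub echoes them back.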
type BidPoolMsg struct {
	Bid    Bid
	Sender string
}

func (d *DaStore) sendBid(bid Bid) {
	d.mempool.add(bid)

	msg := &BidPoolMsg{
		Bid:    bid,
		Sender: d.config.ID,
	}

	// send over the p2p network
	rawMsg, err := json.Marshal(msg)
	if err != nil {
		panic(err)
	}
	if err := d.mempoolTopic.Publish(context.Background(), rawMsg); err != nil {
		panic(err)
	}
}
type mempool struct {
	bids []Bid
	lock sync.Mutex
}

func newMempool() *mempool {
	return &mempool{bids: []Bid{}}
}

func (m *mempool) add(bid Bid) {
	m.lock.Lock()
	m.bids = append(m.bids, bid)
	m.lock.Unlock()
}

// validateBundle checks that every bid referenced by the header is
// present in the local mempool.
func (m *mempool) validateBundle(header *DaHeader) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	idSet := make(map[string]bool)
	for _, bid := range m.bids {
		idSet[bid.Id] = true
	}
	for _, bidId := range header.Bids {
		if _, ok := idSet[bidId]; !ok {
			return fmt.Errorf("bid not found %s", bidId)
		}
	}
	return nil
}
// cleanBundle removes the bids referenced by the header from the mempool
// and returns them. It panics if any referenced bid is missing, since the
// `Validate` stage already guaranteed their presence.
func (m *mempool) cleanBundle(header *DaHeader) []Bid {
	if err := m.validateBundle(header); err != nil {
		panic(err)
	}

	m.lock.Lock()
	defer m.lock.Unlock()

	// set of bid ids that are being finalized and must leave the pool
	idSet := make(map[string]bool)
	for _, id := range header.Bids {
		idSet[id] = true
	}

	// split the pool into the bids to keep and the bids to return
	kept := []Bid{}
	removed := []Bid{}
	for _, bid := range m.bids {
		if idSet[bid.Id] {
			removed = append(removed, bid)
		} else {
			kept = append(kept, bid)
		}
	}
	m.bids = kept
	return removed
}
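// numHeaderBids caps how many bids the proposer packs into a single
// header.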
var numHeaderBids = uint64(10)

func (m *mempool) generateBundle() *DaHeader {
	m.lock.Lock()
	num := numHeaderBids
	if size := uint64(len(m.bids)); size < num {
		num = size
	}
	bids := m.bids[:num]
	m.lock.Unlock()

	// build the header with the ids of the bids
	bidIDs := []string{}
	for _, bid := range bids {
		bidIDs = append(bidIDs, bid.Id)
	}
	return &DaHeader{
		Bids: bidIDs,
	}
}
func (d *DaStore) join(addrRaw string) error {
	addr, err := peer.AddrInfoFromString(addrRaw)
	if err != nil {
		return err
	}
	if err := d.p2p.Connect(context.Background(), *addr); err != nil {
		return err
	}
	d.notifyNewPeer()
	return nil
}

// notifyNewPeer signals the consensus loop that a peer was added; the
// non-blocking send means the notification is simply dropped if nobody
// is waiting on it yet.
func (d *DaStore) notifyNewPeer() {
	select {
	case d.newPeerCh <- struct{}{}:
	default:
	}
}
// Gossip implements the pbft-consensus transport interface
func (d *DaStore) Gossip(msg *consensus.MessageReq) error {
	rawData, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	if err := d.consensusTopic.Publish(context.Background(), rawData); err != nil {
		return err
	}
	return nil
}

func (d *DaStore) run(ctx context.Context) {
	go d.runConsensus(ctx)
}
// runConsensus runs the PBFT consensus protocol
func (d *DaStore) runConsensus(ctx context.Context) {
	var err error
	d.consensusTopic, err = d.pubsub.Join("consensus")
	if err != nil {
		panic(err)
	}

	d.logger.Info("consensus running", "id", d.config.ID)

	// initialize the pbft-consensus library
	key := &signKey{
		id: consensus.NodeID(d.config.ID),
	}
	pbftConsensus := consensus.New(key, d)

	sub, err := d.consensusTopic.Subscribe()
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			rawMsg, err := sub.Next(ctx)
			if err != nil {
				panic(err)
			}

			var msg consensus.MessageReq
			if err := json.Unmarshal(rawMsg.Data, &msg); err != nil {
				panic(err)
			}
			pbftConsensus.PushMessage(&msg)
		}
	}()

	// wait until we are connected with the other simulated nodes
	for {
		select {
		case <-d.newPeerCh:
		case <-ctx.Done():
			return
		}
		if len(d.p2p.Network().Peers()) >= len(valNodes)-1 {
			break
		}
	}

	d.logger.Info("consensus starting")
	close(d.consensusStartedCh)

	// run the consensus protocol, one sequence per block
	blockNum := uint64(0)
	for {
		select {
		case <-time.After(1 * time.Second):
		case <-ctx.Done():
			return
		}

		d.logger.Info("Sealing block", "num", blockNum)

		instance := &daSequenceInstance{
			logger:       d.logger,
			blockNum:     blockNum,
			timestamp:    time.Now(),
			header:       d.mempool.generateBundle(),
			validateBids: d.mempool.validateBundle,
		}
		pbftConsensus.SetBackend(instance)
		pbftConsensus.Run(ctx)

		d.insertFinalizedProposal(instance.final)
		blockNum++
	}
}
func (d *DaStore) insertFinalizedProposal(proposal *consensus.SealedProposal) {
	// clean the memory pool of the finalized bids. This should not fail
	// because the `Validate` stage made sure that all the bids are in
	// the pool.
	var header DaHeader
	if err := json.Unmarshal(proposal.Proposal.Data, &header); err != nil {
		panic(err)
	}
	d.logger.Info("finalized header", "bids", len(header.Bids))

	bids := d.mempool.cleanBundle(&header)

	// insert the data in the store and the indexer. Insert a pointer to
	// a per-iteration copy so that the `raw.(*Bid)` assertion in
	// getBidById holds and the loop variable is not aliased.
	txn := d.state.Txn(true)
	for _, bid := range bids {
		bid := bid
		if err := txn.Insert("bid", &bid); err != nil {
			panic(err)
		}
	}
	txn.Commit()
}
func (d *DaStore) Close() {
}
type DaHeader struct {
	Number uint64
	Bids   []string
}

type DaBlock struct {
	Header *DaHeader
	Bids   []Bid
}

type Bid struct {
	Id   string
	Data []byte
}

func (b *Bid) getHash() string {
	hash := sha512.Sum512_224(b.Data)
	return hex.EncodeToString(hash[:])
}
type daSequenceInstance struct {
	logger    hclog.Logger
	header    *DaHeader
	blockNum  uint64
	final     *consensus.SealedProposal
	timestamp time.Time

	validateBids func(bundle *DaHeader) error
}

// BuildProposal builds a proposal for the current round (used if proposer)
func (d *daSequenceInstance) BuildProposal() (*consensus.Proposal, error) {
	d.header.Number = d.blockNum

	data, err := json.Marshal(d.header)
	if err != nil {
		return nil, err
	}
	hash := sha512.Sum512(data)
	return &consensus.Proposal{Data: data, Hash: hash[:], Time: d.timestamp}, nil
}

// Height returns the height for the current round
func (d *daSequenceInstance) Height() uint64 {
	return d.blockNum
}

// Init is used to signal the backend that a new round is going to start.
func (d *daSequenceInstance) Init(*consensus.RoundInfo) {
}

// Insert inserts the sealed proposal
func (d *daSequenceInstance) Insert(p *consensus.SealedProposal) error {
	d.final = p
	return nil
}

// IsStuck returns whether the pbft protocol is stuck
func (d *daSequenceInstance) IsStuck(num uint64) (uint64, bool) {
	return 0, false
}

// Validate validates a raw proposal (used if non-proposer)
func (d *daSequenceInstance) Validate(proposal *consensus.Proposal) error {
	var header DaHeader
	if err := json.Unmarshal(proposal.Data, &header); err != nil {
		// a malformed proposal from a peer should be rejected,
		// not crash the node
		return err
	}
	if err := d.validateBids(&header); err != nil {
		d.logger.Error("failed to validate bid header", "err", err)
		return err
	}
	return nil
}

// ValidatorSet returns the validator set for the current round
func (d *daSequenceInstance) ValidatorSet() consensus.ValidatorSet {
	return valSet
}

// ValidateCommit is used to validate that a given commit is valid
func (d *daSequenceInstance) ValidateCommit(from consensus.NodeID, seal []byte) error {
	return nil
}
type validatorSet struct {
	nodes []consensus.NodeID
}

func (v *validatorSet) CalcProposer(round uint64) consensus.NodeID {
	// the proposer is always the first node
	return v.nodes[0]
}

func (v *validatorSet) Includes(id consensus.NodeID) bool {
	for _, n := range v.nodes {
		if n == id {
			return true
		}
	}
	return false
}

func (v *validatorSet) Len() int {
	return len(v.nodes)
}

func (v *validatorSet) VotingPower() map[consensus.NodeID]uint64 {
	votes := map[consensus.NodeID]uint64{}
	for _, n := range v.nodes {
		votes[n] = 1
	}
	return votes
}

// signKey is a mock signing key for consensus messages
type signKey struct {
	id consensus.NodeID
}

func (s *signKey) NodeID() consensus.NodeID {
	return s.id
}

func (s *signKey) Sign(b []byte) ([]byte, error) {
	return []byte(s.id), nil
}
Start three instances of the DA store, one for each validator key (a, b, c).
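A minimal sketch of how that might look, assuming the program is built as a `da-store` binary (the binary name and the choice of which node emits bids are illustrative, not part of the gist): every node needs -seal to take part in consensus, and at least one node should pass -emit to produce bids. Since node 'a' is the fixed proposer, giving it -emit exercises the wait-for-consensus path in main.

  ./da-store -id a -seal -emit
  ./da-store -id b -seal
  ./da-store -id c -seal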