Skip to content

Instantly share code, notes, and snippets.

Created September 26, 2023 13:35
Show Gist options
  • Save ferranbt/052b871224cc370829eeb698ee2a5674 to your computer and use it in GitHub Desktop.
Save ferranbt/052b871224cc370829eeb698ee2a5674 to your computer and use it in GitHub Desktop.
package main
import (
mrand "math/rand"
consensus ""
memdb ""
pubsub ""
type valNode struct {
addr string
priv crypto.PrivKey
id string
port uint64
// generateNode builds the valNode entry for the validator with the given id,
// port and address. The key pair is generated from a math/rand source seeded
// with the port number, which is meant to make the key reproducible across
// runs (see the Ed25519 remark below).
func generateNode(id string, port uint64, addr string) valNode {
// Seed the pseudo-random reader with the port number.
r := mrand.New(mrand.NewSource(int64(port)))
// You need to use Ed25519 for deterministic keys!
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, r)
// NOTE(review): the body of this error branch is not visible in this copy of
// the file; confirm how a key generation failure is handled.
if err != nil {
return valNode{
addr: addr,
id: id,
priv: priv,
port: port,
var valNodes = map[string]valNode{
"a": generateNode("a", 30303, "/ip4/"),
"b": generateNode("b", 30304, "/ip4/"),
"c": generateNode("c", 30305, "/ip4/"),
// the validator set is 5 nodes such that we simulate only 3, the minimum required to work.
// otherwise, the proposer (since it is fixed) would start on its own and not wait for the other
// nodes. That is not good because there is no sync mechanism for the other nodes to catch up.
var valSet = &validatorSet{
nodes: []consensus.NodeID{"a", "b", "c", "d", "e"},
// main is the program entry point. It registers the "id", "emit" and "seal"
// flags, looks up the selected validator in valNodes, builds a DaStore for it
// and, depending on the flags, dials the other validators and periodically
// creates bids.
//
// NOTE(review): several statements of this function are not visible in this
// copy of the file (closing braces, the bodies of some branches and, for
// example, any call to flag.Parse). Confirm against the full source before
// relying on the control flow described here.
func main() {
config := &Config{}
var emit bool
var seal bool
flag.StringVar(&config.ID, "id", "a", "")
flag.BoolVar(&emit, "emit", false, "")
flag.BoolVar(&seal, "seal", false, "")
// Copy the key and port of the selected validator into the node config.
// An id that is not a key of valNodes yields the zero valNode here.
node := valNodes[config.ID]
config.PrivID = node.priv
config.P2PPort = node.port
logger := hclog.New(&hclog.LoggerOptions{Output: os.Stdout, Level: hclog.Info})
da := NewDaStore(logger, config)
if seal {
// Dial the validators listed in valNodes.
for id, n := range valNodes {
// NOTE(review): the body of this branch is not visible; presumably it skips
// the node's own entry so that the node does not dial itself.
if id == config.ID {
if err := da.join(n.addr); err != nil {
logger.Error("failed to connect", "addr", n.addr)
if emit {
if seal {
// Wait for the consensus to have started, that is, for all the nodes to be
// connected to each other. Otherwise, since there is no sync mechanism for
// the bids, the leader will have bids that the other nodes do not have. An
// alternative is to start the leader (node 'a') as the last element.
logger.Info("waiting for consensus to start to emit bids")
logger.Info("emitting bids over p2p")
go func() {
for {
// One bid per second.
time.Sleep(1 * time.Second)
// NOTE(review): as visible here the 100-byte payload is never filled, so every
// bid would carry the same data and therefore the same Id; confirm whether a
// randomisation step is missing from this copy.
data := make([]byte, 100)
bid := Bid{
Data: data,
// The bid id is the hash of its data.
bid.Id = bid.getHash()
logger.Info("send bid", "id", bid.Id)
// Register for interrupt/termination signals.
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
type Config struct {
PrivID crypto.PrivKey
ID string
P2PPort uint64
type DaStore struct {
logger hclog.Logger
config *Config
p2p host.Host
pubsub *pubsub.PubSub
consensusTopic *pubsub.Topic
newPeerCh chan struct{}
state *memdb.MemDB
mempool *mempool
mempoolTopic *pubsub.Topic
consensusStartedCh chan struct{}
// NewDaStore creates the libp2p host, the gossipsub router and the in-memory
// database for a node and returns the DaStore that ties them together.
//
// NOTE(review): the bodies of the error branches, part of the libp2p option
// list and the registration of the connection notifier are not visible in this
// copy of the file; confirm them against the full source.
func NewDaStore(logger hclog.Logger, config *Config) *DaStore {
ctx := context.Background()
// Build the TCP listen address from the configured port.
// NOTE(review): the IP argument is an empty string in this copy.
listenAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "", config.P2PPort))
if err != nil {
host, err := libp2p.New(
libp2p.Security(noise.ID, noise.New),
if err != nil {
// Log every full p2p address the host is reachable on.
peerInfo := peer.AddrInfo{
ID: host.ID(),
Addrs: host.Addrs(),
addrs, err := peer.AddrInfoToP2pAddrs(&peerInfo)
if err != nil {
for _, addr := range addrs {
logger.Info("p2p bind", "addr", addr)
ps, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
// The database layout is described by the package-level schema.
state, err := memdb.NewMemDB(schema)
if err != nil {
da := &DaStore{
logger: logger,
config: config,
p2p: host,
pubsub: ps,
state: state,
newPeerCh: make(chan struct{}),
mempool: newMempool(),
consensusStartedCh: make(chan struct{}),
// watch for new connected peers
// NOTE(review): the value logged under "id" is the connection's local
// multiaddress, not an identifier of the remote peer.
ConnectedF: func(net network.Network, conn network.Conn) {
logger.Info("new peer connected", "id", conn.LocalMultiaddr())
return da
var schema = &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
"bid": {
Name: "bid",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "Id"},
// getBidById looks up a bid in the "bid" table by its id inside a read-only
// transaction. It returns (nil, false) when no row matches.
//
// NOTE(review): the result is type-asserted to *Bid, while
// insertFinalizedProposal inserts Bid values; confirm the two agree.
func (d *DaStore) getBidById(id string) (*Bid, bool) {
// Read-only transaction, released when the function returns.
txn := d.state.Txn(false)
defer txn.Abort()
raw, err := txn.First("bid", id)
// NOTE(review): the body of this error branch is not visible in this copy.
if err != nil {
if raw == nil {
return nil, false
return raw.(*Bid), true
// setupMempool joins and subscribes to the "mempool" pubsub topic, stores the
// topic in d.mempoolTopic and starts a goroutine that decodes incoming
// BidPoolMsg messages.
//
// NOTE(review): the bodies of the error branches, of the own-message check and
// whatever is done with a received bid after logging are not visible in this
// copy of the file.
func (d *DaStore) setupMempool() {
topic, err := d.pubsub.Join("mempool")
if err != nil {
d.mempoolTopic = topic
sub, err := topic.Subscribe()
if err != nil {
go func() {
for {
// Wait for the next message; the context is never cancelled.
rawMsg, err := sub.Next(context.Background())
if err != nil {
var msg BidPoolMsg
if err := json.Unmarshal(rawMsg.Data, &msg); err != nil {
// Messages whose sender is this node get special treatment (presumably they
// are skipped; the branch body is not visible).
if msg.Sender == d.config.ID {
d.logger.Info("received new bid from pool", "id", msg.Bid.Id, "from", msg.Sender)
type BidPoolMsg struct {
Bid Bid
Sender string
// sendBid wraps the bid in a BidPoolMsg tagged with this node's id, encodes it
// as JSON and publishes it on the mempool topic.
//
// NOTE(review): the bodies of both error branches are not visible in this copy
// of the file. d.mempoolTopic is assigned in setupMempool, so that must have
// run first.
func (d *DaStore) sendBid(bid Bid) {
msg := &BidPoolMsg{
Bid: bid,
Sender: d.config.ID,
// send over p2p network
rawMsg, err := json.Marshal(msg)
if err != nil {
if err := d.mempoolTopic.Publish(context.Background(), rawMsg); err != nil {
type mempool struct {
bids []Bid
lock sync.Mutex
func newMempool() *mempool {
return &mempool{bids: []Bid{}}
func (m *mempool) add(bid Bid) {
m.bids = append(m.bids, bid)
func (m *mempool) validateBundle(header *DaHeader) error {
defer m.lock.Unlock()
idSet := make(map[string]bool)
for _, bid := range m.bids {
idSet[bid.Id] = true
for _, bidId := range header.Bids {
if _, ok := idSet[bidId]; !ok {
return fmt.Errorf("bid not found %s", bidId)
return nil
// cleanBundle removes from the pool every bid whose id is listed in the header
// and returns the removed bids in pool order. The bids that are not listed stay
// in the pool.
//
// NOTE(review): the body of the validation-error branch is not visible in this
// copy of the file, and no m.lock.Lock() call is visible before the deferred
// Unlock; confirm both against the full source.
func (m *mempool) cleanBundle(header *DaHeader) []Bid {
// The header is validated first; validateBundle is called before the deferred
// unlock below is registered.
if err := m.validateBundle(header); err != nil {
defer m.lock.Unlock()
// Create a map to efficiently check if an ID should be removed
idSet := make(map[string]bool)
for _, id := range header.Bids {
idSet[id] = true
// Create a new slice to store the filtered bids
var filteredBids []Bid
// Iterate through the original list of bids
// resBids collects the bids that are taken out of the pool.
resBids := []Bid{}
for _, bid := range m.bids {
// Check if the bid's ID should be removed
if !idSet[bid.Id] {
// If not, add it to the filtered list
filteredBids = append(filteredBids, bid)
} else {
resBids = append(resBids, bid)
// Keep only the bids that were not part of the header.
m.bids = filteredBids
return resBids
var numHeaderBids = uint64(10)
func (m *mempool) generateBundle() *DaHeader {
num := numHeaderBids
if size := uint64(len(m.bids)); size < num {
num = size
bids := m.bids[:num]
// build the header with the ids of the bids
bidIDs := []string{}
for _, bid := range bids {
bidIDs = append(bidIDs, bid.Id)
return &DaHeader{
Bids: bidIDs,
func (d *DaStore) join(addrRaw string) error {
addr, err := peer.AddrInfoFromString(addrRaw)
if err != nil {
return err
if err := d.p2p.Connect(context.Background(), *addr); err != nil {
return err
return nil
// notifyNewPeer signals on d.newPeerCh that a peer has connected; runConsensus
// reads from that channel while waiting for peers.
//
// NOTE(review): only the send case of the select is visible in this copy of the
// file; confirm whether a default case makes the send non-blocking.
func (d *DaStore) notifyNewPeer() {
select {
case d.newPeerCh <- struct{}{}:
// Gossip implements the pbft-consensus transport interface
func (d *DaStore) Gossip(msg *consensus.MessageReq) error {
rawData, err := json.Marshal(msg)
if err != nil {
return err
if err := d.consensusTopic.Publish(context.Background(), rawData); err != nil {
return err
return nil
func (d *DaStore) run(ctx context.Context) {
go d.runConsensus(ctx)
// runConsensus runs the PBFT consensus protocol for this node. It joins and
// subscribes to the "consensus" topic, starts a goroutine that decodes incoming
// consensus messages, waits until enough peers are connected and then builds a
// new daSequenceInstance roughly once per second.
//
// NOTE(review): large parts of this function are not visible in this copy of
// the file (error branch bodies, what is done with decoded messages, how the
// loops exit on ctx.Done(), how each instance is run and how blockNum
// advances). Confirm against the full source.
func (d *DaStore) runConsensus(ctx context.Context) {
var err error
d.consensusTopic, err = d.pubsub.Join("consensus")
if err != nil {
d.logger.Info("consensus running", "id", d.config.ID)
// initialize the pbft-consensus library
key := &signKey{
id: consensus.NodeID(d.config.ID),
// The DaStore itself is passed as the transport (see Gossip).
pbftConsensus := consensus.New(key, d)
sub, err := d.consensusTopic.Subscribe()
if err != nil {
// Reader goroutine: receive and decode consensus messages from the topic.
go func() {
for {
rawMsg, err := sub.Next(ctx)
if err != nil {
var msg consensus.MessageReq
if err := json.Unmarshal(rawMsg.Data, &msg); err != nil {
// wait for enough peers to be connected
for {
select {
case <-d.newPeerCh:
case <-ctx.Done():
// wait until you have connected with the other nodes in the network
// (every node in valNodes except this one)
if len(d.p2p.Network().Peers()) >= len(valNodes)-1 {
d.logger.Info("consensus starting")
// start the consensus protocol
blockNum := uint64(0)
for {
// Pace the rounds: wait one second or stop waiting when ctx is done.
select {
case <-time.After(1 * time.Second):
case <-ctx.Done():
d.logger.Info("Sealing block", "num", blockNum)
// Each round gets a header generated from the mempool and uses the mempool
// to validate headers proposed by others.
instance := &daSequenceInstance{
logger: d.logger,
blockNum: blockNum,
timestamp: time.Now(),
header: d.mempool.generateBundle(),
validateBids: d.mempool.validateBundle,
// insertFinalizedProposal decodes the header carried by a sealed proposal,
// removes the referenced bids from the mempool and inserts them into the "bid"
// table of the state database.
//
// NOTE(review): the bodies of the error branches and any commit of the write
// transaction are not visible in this copy of the file. The values inserted
// here are of type Bid, while getBidById type-asserts *Bid; confirm the two
// agree.
func (d *DaStore) insertFinalizedProposal(proposal *consensus.SealedProposal) {
// clean the memory pool from the finalized bids
// it should not fail because the `Validate` stage made sure that
// all the transactions are in the pool
var header DaHeader
if err := json.Unmarshal(proposal.Proposal.Data, &header); err != nil {
d.logger.Info("finalized header", "bids", len(header.Bids))
bids := d.mempool.cleanBundle(&header)
// insert the data in the store and the indexer
txn := d.state.Txn(true)
for _, bid := range bids {
if err := txn.Insert("bid", bid); err != nil {
// Close is the shutdown hook of the DaStore.
// NOTE(review): no body is visible in this copy of the file; confirm whether it
// releases any resources.
func (d *DaStore) Close() {
// DaHeader is the payload that goes through consensus: it references bids by
// id instead of embedding them.
type DaHeader struct {
	// Number is the block number, set by BuildProposal from the round's height.
	Number uint64
	// Bids lists the ids of the bids included in the block.
	Bids []string
}
type DaBlock struct {
Header *DaHeader
Bids []Bid
// Bid is a piece of data identified by the hash of its content.
type Bid struct {
	// Id is the identifier of the bid; callers set it from getHash.
	Id string
	// Data is the raw payload of the bid.
	Data []byte
}

// getHash returns the hex-encoded SHA-512/224 digest of the bid data. It does
// not modify the bid.
func (b *Bid) getHash() string {
	hasher := sha512.New512_224()
	// Writing to a hash.Hash never returns an error.
	hasher.Write(b.Data)
	return hex.EncodeToString(hasher.Sum(nil))
}
type daSequenceInstance struct {
logger hclog.Logger
header *DaHeader
blockNum uint64
final *consensus.SealedProposal
timestamp time.Time
validateBids func(bundle *DaHeader) error
// BuildProposal builds a proposal for the current round (used if proposer)
func (d *daSequenceInstance) BuildProposal() (*consensus.Proposal, error) {
d.header.Number = d.blockNum
data, err := json.Marshal(d.header)
if err != nil {
return nil, err
hash := sha512.Sum512(data)
return &consensus.Proposal{Data: data, Hash: hash[:], Time: d.timestamp}, nil
// Height returns the height for the current round
func (d *daSequenceInstance) Height() uint64 {
return d.blockNum
// Init is used to signal the backend that a new round is going to start.
func (d *daSequenceInstance) Init(*consensus.RoundInfo) {
// Insert inserts the sealed proposal
func (d *daSequenceInstance) Insert(p *consensus.SealedProposal) error { = p
return nil
// IsStuck returns whether the pbft is stucked
func (d *daSequenceInstance) IsStuck(num uint64) (uint64, bool) {
return 0, false
// Validate validates a raw proposal (used if non-proposer). The proposal data
// is decoded as a DaHeader and handed to validateBids; a validation failure is
// logged and returned.
//
// NOTE(review): the body of the decoding-error branch is not visible in this
// copy of the file.
func (d *daSequenceInstance) Validate(proposal *consensus.Proposal) error {
var header DaHeader
if err := json.Unmarshal(proposal.Data, &header); err != nil {
// Check that every bid referenced by the header is known locally.
if err := d.validateBids(&header); err != nil {
d.logger.Error("failed to validate bid header", "err", err)
return err
return nil
// ValidatorSet returns the validator set for the current round
func (d *daSequenceInstance) ValidatorSet() consensus.ValidatorSet {
return valSet
// ValidateCommit is used to validate that a given commit is valid
func (d *daSequenceInstance) ValidateCommit(from consensus.NodeID, seal []byte) error {
return nil
type validatorSet struct {
nodes []consensus.NodeID
func (v *validatorSet) CalcProposer(round uint64) consensus.NodeID {
// the proposer is always the first node
return v.nodes[0]
func (v *validatorSet) Includes(id consensus.NodeID) bool {
for _, n := range v.nodes {
if n == id {
return true
return false
func (v *validatorSet) Len() int {
return len(v.nodes)
func (v *validatorSet) VotingPower() map[consensus.NodeID]uint64 {
votes := map[consensus.NodeID]uint64{}
for _, n := range v.nodes {
votes[n] = 1
return votes
// signKey is a mock signing key for consensus messages
type signKey struct {
id consensus.NodeID
func (s *signKey) NodeID() consensus.NodeID {
func (s *signKey) Sign(b []byte) ([]byte, error) {
return []byte(, nil
Copy link

Start three instances of the DA store (nodes a, b and c):

$ go run main.go --id a --emit --seal
$ go run main.go --id b --emit --seal
$ go run main.go --id c --emit --seal

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment