Skip to content

Instantly share code, notes, and snippets.

@popsUlfr
Created June 28, 2021 10:37
Show Gist options
  • Save popsUlfr/3cab45cc6203e10d11942f16f82c65c1 to your computer and use it in GitHub Desktop.
Save popsUlfr/3cab45cc6203e10d11942f16f82c65c1 to your computer and use it in GitHub Desktop.
libp2p minimal pubsub
package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
)
const RendezvousString = "2d2yzgRJGsOVggRw"
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// Generate key pair for identity
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
if err != nil {
log.Println(err)
return
}
bootstrapPeerAddrs := dht.GetDefaultBootstrapPeerAddrInfos()
var dhtIpfs *dht.IpfsDHT
h, err := libp2p.New(ctx,
libp2p.DefaultListenAddrs,
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.DefaultSecurity,
libp2p.DefaultPeerstore,
libp2p.DefaultEnableRelay,
libp2p.Identity(priv),
libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/udp/0/quic",
"/ip6/::/udp/0/quic",
),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultStaticRelays(),
libp2p.EnableAutoRelay(),
libp2p.EnableNATService(),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connmgr.NewConnManager(20, 40, time.Minute)),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtIpfs, err = dht.New(ctx, h,
dht.Mode(dht.ModeServer),
dht.BootstrapPeers(bootstrapPeerAddrs...))
return dhtIpfs, err
}))
if err != nil {
log.Println(err)
return
}
defer h.Close()
log.Println("Host ID:", h.ID())
log.Println("ListenAddresses:", h.Network().ListenAddresses())
err = dhtIpfs.Bootstrap(ctx)
if err != nil {
log.Println(err)
return
}
var wg sync.WaitGroup
for _, peerAddr := range bootstrapPeerAddrs {
wg.Add(1)
go func(peerAddr peer.AddrInfo) {
defer wg.Done()
err := h.Connect(ctx, peerAddr)
if err != nil {
log.Println("Bootstrap connect error:", err, peerAddr)
} else {
log.Println("Bootstrap connect:", peerAddr)
}
}(peerAddr)
}
wg.Wait()
routingDiscovery := discovery.NewRoutingDiscovery(dhtIpfs)
ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(routingDiscovery))
if err != nil {
log.Println(err)
return
}
log.Println("Advertising and finding peers...")
wg.Add(1)
go func() {
defer wg.Done()
for ctx.Err() == nil {
_, err := routingDiscovery.Advertise(ctx, RendezvousString)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
log.Println(err)
return
} else {
log.Println("Advertise failed retrying...")
}
} else {
break
}
}
// see limit option
peerAddrsChan, err := routingDiscovery.FindPeers(ctx, RendezvousString)
if err != nil {
log.Println(err)
return
}
for {
select {
case peerAddr := <-peerAddrsChan:
// most the peers are empty!
if (len(peerAddr.Addrs) == 0 && peerAddr.ID == "") || peerAddr.ID == h.ID() {
continue
}
err := h.Connect(ctx, peerAddr)
if err != nil {
log.Println("Routing connect error:", err, peerAddr)
} else {
log.Println("Routing connect:", peerAddr)
}
case <-ctx.Done():
return
}
}
}()
defer wg.Wait()
topic, err := ps.Join(RendezvousString)
if err != nil {
log.Println(err)
return
}
defer topic.Close()
for ctx.Err() == nil {
log.Println("Sending message...")
// err = topic.Publish(ctx, []byte("hello world"),
// pubsub.WithReadiness(func(rt pubsub.PubSubRouter, topic string) (bool, error) {
// return rt.EnoughPeers(topic, 1), nil
// }))
err = topic.Publish(ctx, []byte("hello world"))
if err != nil {
log.Println(err)
return
}
select {
case <-time.After(2 * time.Second):
case <-ctx.Done():
return
}
}
}
package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
)
const RendezvousString = "2d2yzgRJGsOVggRw"
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// Generate key pair for identity
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
if err != nil {
log.Println(err)
return
}
bootstrapPeerAddrs := dht.GetDefaultBootstrapPeerAddrInfos()
var dhtIpfs *dht.IpfsDHT
h, err := libp2p.New(ctx,
libp2p.DefaultListenAddrs,
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.DefaultSecurity,
libp2p.DefaultPeerstore,
libp2p.DefaultEnableRelay,
libp2p.Identity(priv),
libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/udp/0/quic",
"/ip6/::/udp/0/quic",
),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultStaticRelays(),
libp2p.EnableAutoRelay(),
libp2p.EnableNATService(),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connmgr.NewConnManager(20, 40, time.Minute)),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
var err error
dhtIpfs, err = dht.New(ctx, h,
dht.Mode(dht.ModeServer),
dht.BootstrapPeers(bootstrapPeerAddrs...))
return dhtIpfs, err
}))
if err != nil {
log.Println(err)
return
}
defer h.Close()
log.Println("Host ID:", h.ID())
log.Println("ListenAddresses:", h.Network().ListenAddresses())
err = dhtIpfs.Bootstrap(ctx)
if err != nil {
log.Println(err)
return
}
var wg sync.WaitGroup
for _, peerAddr := range bootstrapPeerAddrs {
wg.Add(1)
go func(peerAddr peer.AddrInfo) {
defer wg.Done()
err := h.Connect(ctx, peerAddr)
if err != nil {
log.Println("Bootstrap connect error:", err, peerAddr)
} else {
log.Println("Bootstrap connect:", peerAddr)
}
}(peerAddr)
}
wg.Wait()
routingDiscovery := discovery.NewRoutingDiscovery(dhtIpfs)
ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(routingDiscovery))
if err != nil {
log.Println(err)
return
}
log.Println("Advertising and finding peers...")
wg.Add(1)
go func() {
defer wg.Done()
for ctx.Err() == nil {
_, err := routingDiscovery.Advertise(ctx, RendezvousString)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
log.Println(err)
return
} else {
log.Println("Advertise failed retrying...")
}
} else {
break
}
}
// see limit option
peerAddrsChan, err := routingDiscovery.FindPeers(ctx, RendezvousString)
if err != nil {
log.Println(err)
return
}
for {
select {
case peerAddr := <-peerAddrsChan:
// most of the peers are empty!
if (len(peerAddr.Addrs) == 0 && peerAddr.ID == "") || peerAddr.ID == h.ID() {
continue
}
err := h.Connect(ctx, peerAddr)
if err != nil {
log.Println("Routing connect error:", err, peerAddr)
} else {
log.Println("Routing connect:", peerAddr)
}
case <-ctx.Done():
return
}
}
}()
defer wg.Wait()
topic, err := ps.Join(RendezvousString)
if err != nil {
log.Println(err)
return
}
defer topic.Close()
sub, err := topic.Subscribe()
if err != nil {
log.Println(err)
return
}
defer sub.Cancel()
log.Println("Listening on messages...")
for ctx.Err() == nil {
mess, err := sub.Next(ctx)
if err != nil {
log.Println(err)
return
}
if mess.GetFrom() == h.ID() {
continue
}
log.Println("From:", mess.GetFrom())
log.Println("Data:", string(mess.GetData()))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment