Created
June 28, 2021 10:38
-
-
Save popsUlfr/2c59c16103d16ffa2b93786504c9d79b to your computer and use it in GitHub Desktop.
libp2p minimal streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"context" | |
"errors" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
"github.com/libp2p/go-libp2p" | |
connmgr "github.com/libp2p/go-libp2p-connmgr" | |
"github.com/libp2p/go-libp2p-core/crypto" | |
"github.com/libp2p/go-libp2p-core/host" | |
"github.com/libp2p/go-libp2p-core/network" | |
"github.com/libp2p/go-libp2p-core/peer" | |
"github.com/libp2p/go-libp2p-core/protocol" | |
"github.com/libp2p/go-libp2p-core/routing" | |
discovery "github.com/libp2p/go-libp2p-discovery" | |
dht "github.com/libp2p/go-libp2p-kad-dht" | |
libp2pquic "github.com/libp2p/go-libp2p-quic-transport" | |
) | |
const RendezvousString = "2d2yzgRJGsOVggRw" | |
var ProtocolID = protocol.ID(fmt.Sprintf("/%s/1.0.0", RendezvousString)) | |
func handleStream(ctx context.Context, wg *sync.WaitGroup, stream network.Stream) { | |
ctx, cancel := context.WithCancel(ctx) | |
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) | |
wg.Add(3) | |
go func() { | |
defer wg.Done() | |
defer stream.Close() | |
<-ctx.Done() | |
}() | |
go func() { | |
defer wg.Done() | |
defer cancel() | |
for { | |
str, err := rw.ReadString('\n') | |
if err != nil { | |
if !errors.Is(err, io.EOF) { | |
log.Println("Stream read error:", err) | |
} | |
return | |
} | |
if str == "" { | |
return | |
} else if str != "\n" { | |
log.Println("Stream read:", str) | |
} | |
} | |
}() | |
go func() { | |
defer wg.Done() | |
defer cancel() | |
for { | |
_, err := rw.WriteString("hello world\n") | |
if err != nil { | |
log.Println("Stream write error:", err) | |
return | |
} | |
err = rw.Flush() | |
if err != nil { | |
log.Println("Stream flush error:", err) | |
return | |
} | |
select { | |
case <-time.After(2 * time.Second): | |
case <-ctx.Done(): | |
return | |
} | |
} | |
}() | |
} | |
func main() { | |
log.SetFlags(log.LstdFlags | log.Lshortfile) | |
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) | |
defer stop() | |
// Generate key pair for identity | |
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
bootstrapPeerAddrs := dht.GetDefaultBootstrapPeerAddrInfos() | |
var dhtIpfs *dht.IpfsDHT | |
h, err := libp2p.New(ctx, | |
libp2p.DefaultListenAddrs, | |
libp2p.DefaultTransports, | |
libp2p.DefaultMuxers, | |
libp2p.DefaultSecurity, | |
libp2p.DefaultPeerstore, | |
libp2p.DefaultEnableRelay, | |
libp2p.Identity(priv), | |
libp2p.ListenAddrStrings( | |
"/ip4/0.0.0.0/udp/0/quic", | |
"/ip6/::/udp/0/quic", | |
), | |
libp2p.Transport(libp2pquic.NewTransport), | |
libp2p.DefaultStaticRelays(), | |
libp2p.EnableAutoRelay(), | |
libp2p.EnableNATService(), | |
libp2p.NATPortMap(), | |
libp2p.ConnectionManager(connmgr.NewConnManager(20, 40, time.Minute)), | |
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { | |
var err error | |
dhtIpfs, err = dht.New(ctx, h, | |
dht.Mode(dht.ModeServer), | |
dht.BootstrapPeers(bootstrapPeerAddrs...)) | |
return dhtIpfs, err | |
})) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
defer h.Close() | |
log.Println("Host ID:", h.ID()) | |
log.Println("ListenAddresses:", h.Network().ListenAddresses()) | |
var wg sync.WaitGroup | |
h.SetStreamHandler(ProtocolID, func(s network.Stream) { | |
handleStream(ctx, &wg, s) | |
}) | |
err = dhtIpfs.Bootstrap(ctx) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
for _, peerAddr := range bootstrapPeerAddrs { | |
wg.Add(1) | |
go func(peerAddr peer.AddrInfo) { | |
defer wg.Done() | |
err := h.Connect(ctx, peerAddr) | |
if err != nil { | |
log.Println("Bootstrap connect error:", err, peerAddr) | |
} else { | |
log.Println("Bootstrap connect:", peerAddr) | |
} | |
}(peerAddr) | |
} | |
wg.Wait() | |
routingDiscovery := discovery.NewRoutingDiscovery(dhtIpfs) | |
log.Println("Advertising and finding peers...") | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for ctx.Err() == nil { | |
_, err := routingDiscovery.Advertise(ctx, RendezvousString) | |
if err != nil { | |
if !errors.Is(err, context.DeadlineExceeded) { | |
log.Println(err) | |
return | |
} else { | |
log.Println("Advertise failed retrying...") | |
} | |
} else { | |
break | |
} | |
} | |
// see limit option | |
peerAddrsChan, err := routingDiscovery.FindPeers(ctx, RendezvousString) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
for { | |
select { | |
case peerAddr := <-peerAddrsChan: | |
// most of the peers are empty! | |
if (len(peerAddr.Addrs) == 0 && peerAddr.ID == "") || peerAddr.ID == h.ID() { | |
continue | |
} | |
stream, err := h.NewStream(ctx, peerAddr.ID, ProtocolID) | |
if err != nil { | |
log.Println("Stream error:", err, peerAddr) | |
} else { | |
log.Println("Stream opened:", peerAddr) | |
handleStream(ctx, &wg, stream) | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
}() | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment