Last active
April 25, 2018 07:37
-
-
Save quake/a395da438d8110acb5acee517b84ec1f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[dev-dependencies] | |
tokio-core = "0.1" | |
libp2p-tcp-transport = { path = "../tcp-transport"} | |
libp2p-peerstore = { path = "../peerstore"} | |
multiplex = { path = "../multiplex-rs" } | |
rand = "0.4" | |
env_logger = "0.5.4" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate libp2p_swarm; | |
extern crate libp2p_floodsub; | |
extern crate libp2p_tcp_transport; | |
extern crate libp2p_peerstore; | |
extern crate multiplex; | |
extern crate tokio_core; | |
extern crate tokio_io; | |
extern crate rand; | |
extern crate env_logger; | |
use libp2p_swarm::{swarm, Multiaddr, MuxedTransport, StreamMuxer, Transport, SwarmController}; | |
use libp2p_tcp_transport::TcpConfig; | |
use libp2p_floodsub::{FloodSubUpgrade, TopicBuilder, FloodSubController, FloodSubReceiver}; | |
use tokio_core::reactor::Core; | |
use tokio_io::codec::length_delimited::Framed; | |
use std::sync::{atomic, mpsc}; | |
use std::thread; | |
use std::time::Duration; | |
use futures::sync::oneshot; | |
use libp2p_peerstore::PeerId; | |
use futures::{Future, Stream}; | |
use multiplex::MultiplexConfig; | |
#[test] | |
fn basic_floodsub() { | |
let _ = env_logger::try_init(); | |
let (addr_tx, addr_rx) = mpsc::channel(); | |
let (msg_tx, msg_rx) = mpsc::channel::<&[u8]>(); | |
let bg_thread = thread::spawn(move || { | |
let my_id = { | |
let key = (0..2048).map(|_| rand::random::<u8>()).collect::<Vec<_>>(); | |
PeerId::from_public_key(&key) | |
}; | |
let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(my_id); | |
let mut core = Core::new().unwrap(); | |
let transport = TcpConfig::new(core.handle()).with_upgrade(floodsub_upgrade.clone()); | |
let (listener, addr) = transport | |
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) | |
.unwrap(); | |
addr_tx.send(addr).unwrap(); | |
let future = listener | |
.into_future() | |
.map_err(|(err, _)| err) | |
.and_then(|(client, _)| client.unwrap().map(|v| v.0)) | |
.and_then(|floodsub_future| floodsub_future); | |
let topic = TopicBuilder::new("test").build(); | |
let floodsub_ctl = FloodSubController::new(&floodsub_upgrade); | |
floodsub_ctl.publish(&topic, msg_rx.recv().unwrap().to_vec()); | |
core.run(future); | |
}); | |
let my_id = { | |
let key = (0..2048).map(|_| rand::random::<u8>()).collect::<Vec<_>>(); | |
PeerId::from_public_key(&key) | |
}; | |
let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(my_id); | |
let mut core = Core::new().unwrap(); | |
let transport = TcpConfig::new(core.handle()).with_upgrade(floodsub_upgrade.clone()); | |
let future = transport | |
.dial(addr_rx.recv().unwrap()) | |
.unwrap_or_else(|_| panic!()) | |
.and_then(|(floodsub_future, _)| floodsub_future ); | |
let topic = TopicBuilder::new("test").build(); | |
let floodsub_ctl = FloodSubController::new(&floodsub_upgrade); | |
floodsub_ctl.subscribe(&topic); | |
msg_tx.send(b"hello"); | |
let rx = floodsub_rx.for_each(|msg| { | |
assert_eq!(b"hello".to_vec(), msg.data); | |
Ok(()) | |
}); | |
core.run(future.select(rx)); | |
bg_thread.join().unwrap(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Queuing sub/unsub message ; sub = ["5zKtTsxw"] ; unsub = [] | |
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Message queued for 0 remotes | |
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Queueing publish message ; topics = ["5zKtTsxw"] ; data_len = 5 | |
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Message queued for 0 remotes | |
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Upgrading connection to /ip4/127.0.0.1/tcp/64984 as floodsub | |
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Effectively sending message to remote | |
DEBUG 2018-04-25T07:33:55Z: libp2p_floodsub: Upgrading connection to /ip4/127.0.0.1/tcp/64983 as floodsub | |
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Effectively sending message to remote | |
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Received packet from /ip4/127.0.0.1/tcp/64983 | |
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Received packet from /ip4/127.0.0.1/tcp/64984 | |
TRACE 2018-04-25T07:33:55Z: libp2p_floodsub: Remote /ip4/127.0.0.1/tcp/64984 subscribed to TopicHash { hash: "5zKtTsxw" } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment