- name: Kelechi Oliver A.
- email: [email protected]
I added all my solution code to "custom_method", and then invoked it in "run_test".
please note that code comments from the example_test.py file and some methods were removed for clarity.
attached is a screenshot of the test result.
from collections import defaultdict
from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.messages import CInv, MSG_BLOCK
from test_framework.p2p import (
P2PInterface,
msg_block,
msg_getdata,
p2p_lock,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
)
class ExampleTest(BitcoinTestFramework):
def set_test_params(self):
"""Override test parameters for your individual test.
This method must be overridden and num_nodes must be explicitly set."""
self.setup_clean_chain = True
self.num_nodes = 3
# Use self.extra_args to change command-line arguments for the nodes
self.extra_args = [[], ["-logips"], []]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def setup_network(self):
self.setup_nodes()
self.connect_nodes(0, 1)
self.sync_all(self.nodes[0:2])
# my exercise function
def custom_method(self):
"""create new block on node 1 and send to node 2 test.
this custom function mine new block on node 1 and sends it to node2,
then checking that node 2 received it"""
self.log.info("Running custom_method (node 1 & 2 relationship)")
peer_messaging = self.nodes[0].add_p2p_connection(BaseNode())
# get node 1 height
height = self.nodes[1].getblockcount()
self.log.info("Create a block on node 1")
self.tip = int(self.nodes[1].getbestblockhash(), 16)
self.block_time = self.nodes[1].getblock(self.nodes[1].getbestblockhash())['time'] + 1
block = create_block(self.tip, create_coinbase(height+1), self.block_time)
block.solve()
block_message = msg_block(block)
# Send message is used to send a P2P message to the node over our P2PInterface
peer_messaging.send_message(block_message)
self.tip = block.sha256
self.block_time += 1
height += 1
self.log.info("Connect node2 and node1")
self.connect_nodes(1, 2)
self.log.info("sync block accross nodes")
self.sync_all()
self.log.info("Ensure node 2 gets the new block")
self.nodes[2].waitforblockheight(height)
self.log.info("Assert that node 2 has the new block by checking the height")
assert_equal(self.nodes[2].getblockcount(), height)
def run_test(self):
"""Main test logic"""
# Create P2P connections will wait for a verack to make sure the connection is fully up
peer_messaging = self.nodes[0].add_p2p_connection(BaseNode())
# Generating a block on one of the nodes will get us out of IBD
blocks = [int(self.nodes[0].generate(nblocks=1)[0], 16)]
self.sync_all(self.nodes[0:2])
self.log.info("Starting test!")
# using my function
self.log.info("Calling a custom method")
self.custom_method()
self.log.info("Create some blocks")
self.tip = int(self.nodes[0].getbestblockhash(), 16)
self.block_time = self.nodes[0].getblock(self.nodes[0].getbestblockhash())['time'] + 1
height = self.nodes[0].getblockcount()
for _ in range(10):
# Use the blocktools functionality to manually build a block.
# Calling the generate() rpc is easier, but this allows us to exactly
# control the blocks and transactions.
block = create_block(self.tip, create_coinbase(height+1), self.block_time)
block.solve()
block_message = msg_block(block)
# Send message is used to send a P2P message to the node over our P2PInterface
peer_messaging.send_message(block_message)
self.tip = block.sha256
blocks.append(self.tip)
self.block_time += 1
height += 1
self.log.info("Wait for node1 to reach current tip (height 11) using RPC")
self.nodes[1].waitforblockheight(11)
self.log.info("Connect node2 and node1")
self.connect_nodes(1, 2)
self.log.info("Wait for node2 to receive all the blocks from node1")
self.sync_all()
self.log.info("Calling custom node1 and node2 interaction function")
custom_function()
self.log.info("Add P2P connection to node2")
self.nodes[0].disconnect_p2ps()
peer_receiving = self.nodes[2].add_p2p_connection(BaseNode())
self.log.info("Test that node2 propagates all the blocks to us")
getdata_request = msg_getdata()
for block in blocks:
getdata_request.inv.append(CInv(MSG_BLOCK, block))
peer_receiving.send_message(getdata_request)
peer_receiving.wait_until(lambda: sorted(blocks) == sorted(list(peer_receiving.block_receive_map.keys())), timeout=5)
self.log.info("Check that each block was received only once")
with p2p_lock:
for block in peer_receiving.block_receive_map.values():
assert_equal(block, 1)
if __name__ == '__main__':
ExampleTest().main()