Created
May 3, 2018 14:40
-
-
Save amitkot/43dabe5284f853f892ae0cfca7e18871 to your computer and use it in GitHub Desktop.
Alternative version of all_token_prices that uses gather
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
from typing import List, Any, Iterable | |
from pricemonitor.config import Coin | |
from pricemonitor.producing.data_producer import DataProducer, PairPrice | |
from pricemonitor.producing.exchange_prices import ExchangePrices | |
from pricemonitor.producing.feed_prices import FeedPrices | |
class AllTokenPrices(DataProducer): | |
def __init__(self, coins: List[Coin], market: Coin, exchange_data_action=None) -> None: | |
super().__init__(coins=coins, market=market) | |
self._sources = [ | |
ExchangePrices(coins=coins, market=market, exchange_data_action=exchange_data_action), | |
FeedPrices(coins=coins, market=market) | |
] | |
async def get_data(self, loop) -> List[PairPrice]: | |
sources = (source.get_data(loop) for source in self._sources) | |
price_pair_lists_and_exceptions = await asyncio.gather(*sources, return_exceptions=True) | |
price_pair_lists = self._remove_exceptions(price_pair_lists_and_exceptions) | |
# TODO: return an itertools instead of list | |
return list(itertools.chain(*price_pair_lists)) | |
@staticmethod | |
def _remove_exceptions(prices_with_failures: List[Any]) -> Iterable[PairPrice]: | |
return (price for price in prices_with_failures if not issubclass(price.__class__, Exception)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment