Skip to content

Instantly share code, notes, and snippets.

@amitkot
Created May 3, 2018 14:40
Show Gist options
  • Save amitkot/43dabe5284f853f892ae0cfca7e18871 to your computer and use it in GitHub Desktop.
Save amitkot/43dabe5284f853f892ae0cfca7e18871 to your computer and use it in GitHub Desktop.
Alternative version of all_token_prices that uses gather
import asyncio
import itertools
from typing import List, Any, Iterable
from pricemonitor.config import Coin
from pricemonitor.producing.data_producer import DataProducer, PairPrice
from pricemonitor.producing.exchange_prices import ExchangePrices
from pricemonitor.producing.feed_prices import FeedPrices
class AllTokenPrices(DataProducer):
def __init__(self, coins: List[Coin], market: Coin, exchange_data_action=None) -> None:
super().__init__(coins=coins, market=market)
self._sources = [
ExchangePrices(coins=coins, market=market, exchange_data_action=exchange_data_action),
FeedPrices(coins=coins, market=market)
]
async def get_data(self, loop) -> List[PairPrice]:
sources = (source.get_data(loop) for source in self._sources)
price_pair_lists_and_exceptions = await asyncio.gather(*sources, return_exceptions=True)
price_pair_lists = self._remove_exceptions(price_pair_lists_and_exceptions)
# TODO: return an itertools instead of list
return list(itertools.chain(*price_pair_lists))
@staticmethod
def _remove_exceptions(prices_with_failures: List[Any]) -> Iterable[PairPrice]:
return (price for price in prices_with_failures if not issubclass(price.__class__, Exception))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment