Last active
December 19, 2016 04:55
-
-
Save colspan/8c89b12e990bf4fa72ef to your computer and use it in GitHub Desktop.
データフロー制御フレームワークLuigiを使ってビッグデータ解析をする ref: http://qiita.com/colspan/items/453aeec7f4f420b91241
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sudo pip install luigi
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python top_artists.py Top10Artists --date-interval 2015-03 --local-scheduler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python top_artists.py Top10Artists --date-interval 2015-03
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Top10Artists(luigi.Task):
    """
    Write the ten artists with the highest stream counts to a local TSV file.

    This task runs over the target data returned by :py:meth:`~.AggregateArtists.output` or
    :py:meth:`~.AggregateArtistsHadoop.output` in case :py:attr:`~.Top10Artists.use_hadoop` is set and
    writes the result into its :py:meth:`~.Top10Artists.output` target (a file in local filesystem).
    """

    # Interval passed on to the aggregation task; also embedded in the output file name.
    date_interval = luigi.DateIntervalParameter()
    # When set, depend on AggregateArtistsHadoop instead of AggregateArtists.
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        """
        This task's dependencies:

        * :py:class:`~.AggregateArtists` or
        * :py:class:`~.AggregateArtistsHadoop` if :py:attr:`~.Top10Artists.use_hadoop` is set.

        :return: object (:py:class:`luigi.task.Task`)
        """
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

    def output(self):
        """
        Returns the target output for this task.
        In this case, a successful execution of this task will create a file on the local filesystem.

        :return: the target output for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)

    def run(self):
        """
        Select the ten largest ``(streams, artist)`` pairs and write them out.

        Each output line is tab-separated: ``date_a``, ``date_b`` (both taken from
        ``date_interval``), the artist, and the stream count. Pairs are compared as
        tuples, so equal stream counts are ordered by artist name.
        """
        top_10 = nlargest(10, self._input_iterator())
        with self.output().open('w') as out_file:
            for streams, artist in top_10:
                out_line = '\t'.join([
                    str(self.date_interval.date_a),
                    str(self.date_interval.date_b),
                    artist,
                    str(streams),
                ])
                out_file.write(out_line + '\n')

    def _input_iterator(self):
        """
        Yield ``(streams, artist)`` pairs parsed from this task's input target.

        Every non-blank line must end with an integer stream count separated from
        the artist by whitespace. The split happens at the *last* whitespace run,
        so artist names that themselves contain spaces stay intact.

        :raises ValueError: if a line has no separate count field or the count
            is not an integer.
        """
        with self.input().open('r') as in_file:
            for line in in_file:
                record = line.strip()
                if not record:
                    # Tolerate blank lines (e.g. a trailing newline) instead of
                    # failing on the two-name unpack below.
                    continue
                # rsplit with maxsplit=1 keeps whitespace inside the artist name.
                artist, streams = record.rsplit(None, 1)
                yield int(streams), artist
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment