from rx import Observable, Observer
from collections import defaultdict

users = [
    { "id" : 0, "name" : "Hero" },
    { "id" : 1, "name" : "Dunn" },
    { "id" : 2, "name" : "Sue" },
    { "id" : 3, "name" : "Chi" },
    { "id" : 4, "name" : "Thor" },
    { "id" : 5, "name" : "Clive" },
    { "id" : 6, "name" : "Hicks" },
    { "id" : 7, "name" : "Devin" },
    { "id" : 8, "name" : "Kate" },
    { "id" : 9, "name" : "Klein" },
]
friendships = [
    (0,1),
    (0,2),
    (1,2),
    (1,3),
    (2,3),
    (3,4),
    (4,5),
    (5,6),
    (5,7),
    (6,8),
    (7,8),
    (8,9)
]
interests = [
    (0, "Hadoop"), (0, "Big Data"), (0, "HBase"), (0, "Java"),
    (0, "Spark"), (0, "Storm"), (0, "Cassandra"),
    (1, "NoSQL"), (1, "MongoDB"), (1, "Cassandra"), (1, "HBase"),
    (1, "Postgres"), (2, "Python"), (2, "scikit-learn"), (2, "scipy"),
    (2, "numpy"), (2, "statsmodels"), (2, "pandas"), (3, "R"), (3, "Python"),
    (3, "statistics"), (3, "regression"), (3, "probability"),
    (4, "machine learning"), (4, "regression"), (4, "decision trees"),
    (4, "libsvm"), (5, "Python"), (5, "R"), (5, "Java"), (5, "C++"),
    (5, "Haskell"), (5, "programming languages"), (6, "statistics"),
    (6, "probability"), (6, "mathematics"), (6, "theory"),
    (7, "machine learning"), (7, "scikit-learn"), (7, "Mahout"),
    (7, "neural networks"), (8, "neural networks"), (8, "deep learning"),
    (8, "Big Data"), (8, "artificial intelligence"), (9, "Hadoop"),
    (9, "Java"), (9, "MapReduce"), (9, "Big Data")
]
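# Observer that prints each emitted item, a blank line on completion, and any error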
class SimplePrint(Observer):
    def on_next(self, t):
        print(t)
    def on_completed(self):
        print("")
    def on_error(self, e):
        print(e)
# returns an Observable emitting friends of a given user
def get_friends(user):
    return Observable.from_(friendships) \
        .filter(lambda friendship: friendship[0] == user["id"] or friendship[1] == user["id"]) \
        .flat_map(lambda friendship: Observable.from_(friendship)) \
        .filter(lambda user_id: user_id != user["id"]) \
        .flat_map(lambda friend_id: Observable.from_(users).filter(lambda user: user["id"] == friend_id))
# emit friends for "Chi"
print("Friends of \"Chi\"")
get_friends(users[3]).subscribe(SimplePrint())
# get a count of each user's friends, and order by reverse rank
print("\r\nUsers and friend counts, sorted descending")
Observable.from_(users) \
    .flat_map(lambda user: get_friends(user).count().map(lambda ct: (user["name"], ct))) \
    .to_list() \
    .map(lambda list: sorted(list, key=lambda t: t[1], reverse=True)) \
    .flat_map(lambda list: Observable.from_(list)) \
    .subscribe(SimplePrint())
# get mutual friends of Hero and Chi
print("\r\nMutual friends of Hero and Chi")
def get_mutual_friends(user, other_user):
    return get_friends(other_user) \
        .filter(lambda foaf: foaf["id"] != user["id"]) \
        .flat_map(lambda foaf: get_friends(user)
            .filter(lambda user_friend: user_friend["id"] == foaf["id"]).count()  # how many of user's friends match this foaf
            .filter(lambda ct: ct > 0).map(lambda b: foaf)  # re-emit foaf only if that count is nonzero
        )
hero = users[0]
chi = users[3]
get_mutual_friends(hero, chi).subscribe(SimplePrint())
# rank friends of Chi by mutual friend count
print("\r\nRanked friends of Chi by mutual friend count")
get_friends(chi) \
    .flat_map(lambda friend: get_mutual_friends(chi, friend).count().map(lambda ct: (friend["name"], ct))) \
    .to_list() \
    .map(lambda list: sorted(list, key=lambda t: t[1], reverse=True)) \
    .flat_map(lambda list: Observable.from_(list)) \
    .subscribe(SimplePrint())
# finding common interests
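# emits the users who have declared the given interest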
def data_scientists_who_like(target_interest):
    return Observable.from_(interests) \
        .filter(lambda applied_interest: applied_interest[1] == target_interest) \
        .map(lambda applied_interest: applied_interest[0]) \
        .flat_map(lambda user_id: Observable.from_(users).filter(lambda user: user["id"] == user_id))
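# emits the interests declared by the given user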
def interests_for_data_scientist(user):
    return Observable.from_(interests) \
        .filter(lambda applied_interest: applied_interest[0] == user["id"]) \
        .map(lambda applied_interest: applied_interest[1])
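# emits the interests that user and other_user have in common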
def common_interests_between(user, other_user):
    return interests_for_data_scientist(user) \
        .flat_map(lambda interest: interests_for_data_scientist(other_user)
            .filter(lambda other_interest: interest == other_interest)
        )
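# emits (name, shared interest count) for every other user, sorted descending by count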
def common_interest_count(user):
    return Observable.from_(users) \
        .filter(lambda other_user: other_user["id"] != user["id"]) \
        .flat_map(lambda other_user: common_interests_between(user, other_user)
            .count()
            .map(lambda ct: (other_user["name"], ct))
        ).to_list() \
        .map(lambda list: sorted(list, key=lambda t: t[1], reverse=True)) \
        .flat_map(lambda list: Observable.from_(list))
print("\r\nCommon interest counts for Chi")
common_interest_count(users[3]).subscribe(SimplePrint())
# Salary and Tenure
print("\r\nAverage salary by tenure range")
salaries_and_tenures = [(83000, 8.7), (88000, 8.1),
                        (48000, 0.7), (76000, 6),
                        (69000, 6.5), (76000, 7.5),
                        (60000, 2.5), (83000, 10),
                        (48000, 1.9), (63000, 4.2)]
tenure_buckets = [(0, 1.9), (2, 5), (5.1, 50)]
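# pairs a salary/tenure record with the tenure bucket it falls into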
class SalaryTenureBucket:
    def __init__(self, salary, tenure, bucket):
        self.salary = salary
        self.tenure = tenure
        self.bucket = bucket
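# emits the tenure bucket(s) whose range contains the given tenure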
def get_bucket(tenure):
    return Observable.from_(tenure_buckets).filter(lambda tb: tb[0] <= tenure and tenure <= tb[1])
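# groups salaries by tenure bucket and prints (bucket, average salary) for each group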
def average_salary_by_tenure():
    return Observable.from_(salaries_and_tenures) \
        .flat_map(lambda st: get_bucket(st[1]).map(lambda b: SalaryTenureBucket(st[0], st[1], b))) \
        .group_by(lambda stb: stb.bucket) \
        .flat_map(lambda grp: grp.average(lambda stb: stb.salary)
            .map(lambda salary: (grp.key, salary))
        ).subscribe(SimplePrint())
average_salary_by_tenure()
# Words and Counts
print("\r\nInterest occurrence count")
def words_and_counts():
    return Observable.from_(interests) \
        .flat_map(lambda interest: Observable.from_(interest[1].lower().split())) \
        .group_by(lambda s: s) \
        .flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct))) \
        .to_list() \
        .map(lambda list: sorted(list, key=lambda t: t[1], reverse=True)) \
        .flat_map(lambda list: Observable.from_(list)) \
        .subscribe(SimplePrint())
words_and_counts()
| # Friends of "Chi" | |
| # {'id': 1, 'name': 'Dunn'} | |
| # {'id': 2, 'name': 'Sue'} | |
| # {'id': 4, 'name': 'Thor'} | |
| # | |
| # | |
| # Users and friend counts, sorted descending | |
| # ('Dunn', 3) | |
| # ('Sue', 3) | |
| # ('Chi', 3) | |
| # ('Clive', 3) | |
| # ('Kate', 3) | |
| # ('Hero', 2) | |
| # ('Thor', 2) | |
| # ('Hicks', 2) | |
| # ('Devin', 2) | |
| # ('Klein', 1) | |
| # | |
| # | |
# Mutual friends of Hero and Chi
# {'id': 1, 'name': 'Dunn'}
# {'id': 2, 'name': 'Sue'}
#
#
# Ranked friends of Chi by mutual friend count
# ('Dunn', 1)
# ('Sue', 1)
# ('Thor', 0)
#
#
# Common interest counts for Chi
# ('Clive', 2)
# ('Hicks', 2)
# ('Sue', 1)
# ('Thor', 1)
# ('Hero', 0)
# ('Dunn', 0)
# ('Devin', 0)
# ('Kate', 0)
# ('Klein', 0)
#
#
# Average salary by tenure range
# ((5.1, 50), 79166.66666666667)
# ((0, 1.9), 48000.0)
# ((2, 5), 61500.0)
#
#
# Interest occurrence count
# ('big', 3)
# ('data', 3)
# ('java', 3)
# ('python', 3)
# ('learning', 3)
# ('hadoop', 2)
# ('hbase', 2)
# ('cassandra', 2)
# ('scikit-learn', 2)
# ('r', 2)
# ('statistics', 2)
# ('regression', 2)
# ('probability', 2)
# ('machine', 2)
# ('neural', 2)
# ('networks', 2)
# ('spark', 1)
# ('storm', 1)
# ('nosql', 1)
# ('mongodb', 1)
# ('postgres', 1)
# ('scipy', 1)
# ('numpy', 1)
# ('statsmodels', 1)
# ('pandas', 1)
# ('decision', 1)
# ('trees', 1)
# ('libsvm', 1)
# ('c++', 1)
# ('haskell', 1)
# ('programming', 1)
# ('languages', 1)
# ('mathematics', 1)
# ('theory', 1)
# ('mahout', 1)
# ('deep', 1)
# ('artificial', 1)
# ('intelligence', 1)
# ('mapreduce', 1)
Glad you like it. This is an example from Data Science from Scratch (Chapter 1) and I made it reactive. I may create a project or write blog posts showing more examples as I sift through data science topics. It's hard to break out of the Rx mindset no matter where I go.
Ah, cool. I'm currently rolling around in a push-based system that's all coroutines and threads. I got to have some fun with RxJS earlier in the year, so maybe it's time to give RxPY a try.
I imagine such a blog series would be well-received, especially if it covered translating from static to in-flight analysis. That's something that folks coming off of intro material like Data Science from Scratch might get a lot out of. Have fun!
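By in-flight I mean something as small as pointing one of these queries at a live feed instead of a finished list. A rough, untested sketch (RxPY 1.x, with a Subject standing in for a hypothetical stream of friendship events):

from rx.subjects import Subject

live_friendships = Subject()  # stand-in for friendships arriving over time

# running friend count for Chi (id 3), updated as each event arrives
live_friendships \
    .filter(lambda friendship: 3 in friendship) \
    .scan(lambda count, friendship: count + 1, seed=0) \
    .subscribe(lambda count: print("Chi now has {} friends".format(count)))

# replaying the gist's static list simulates the live feed
for friendship in friendships:
    live_friendships.on_next(friendship)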
Thanks, I appreciate that. I've got several ideas I'm sifting through for applying Kotlin and Rx to data science, and I'm going to do some blogging on that in the next few months. Regarding RxPY, I am actually preparing for a project with O'Reilly on that topic. Stay tuned!
Nice example! 👍