Last active
June 22, 2017 03:40
-
-
Save paulocoutinhox/b406ae98ea24120436954967e37103f6 to your computer and use it in GitHub Desktop.
Flink example Recommendation - Java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.myorg.quickstart; | |
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
| public class UserRecommendation { | |
| public static void main(String[] args) throws Exception { | |
| final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
| // le o arquivo cm o dataset | |
| DataSet<String> text = env.readTextFile("/Users/paulo/Downloads/dataset.csv"); | |
| // cria tuple com: customer | item | count | |
| DataSet<Tuple3<Long, Long, Integer>> csv = text.flatMap(new LineFieldSplitter()).groupBy(0, 1).reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Integer>>() { | |
| @Override | |
| public void reduce(Iterable<Tuple2<Long, Long>> iterable, Collector<Tuple3<Long, Long, Integer>> collector) throws Exception { | |
| Long customerId = 0L; | |
| Long itemId = 0L; | |
| Integer count = 0; | |
| for (Tuple2<Long, Long> item : iterable) { | |
| customerId = item.f0; | |
| itemId = item.f1; | |
| count = count + 1; | |
| } | |
| collector.collect(new Tuple3<>(customerId, itemId, count)); | |
| } | |
| }); | |
| // agrupa os items do customer dentro do customer | |
| final DataSet<CustomerItems> customerItems = csv.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple3<Long, Long, Integer>, CustomerItems>() { | |
| @Override | |
| public void reduce(Iterable<Tuple3<Long, Long, Integer>> iterable, Collector<CustomerItems> collector) throws Exception { | |
| ArrayList<Long> newItems = new ArrayList<>(); | |
| Long customerId = 0L; | |
| for (Tuple3<Long, Long, Integer> item : iterable) { | |
| customerId = item.f0; | |
| newItems.add(item.f1); | |
| } | |
| collector.collect(new CustomerItems(customerId, newItems)); | |
| } | |
| }); | |
| // obtém todos os itens do customer que pertence a um usuário parecido | |
| DataSet<CustomerItems> ci = customerItems.cross(customerItems).with(new CrossFunction<CustomerItems, CustomerItems, CustomerItems>() { | |
| @Override | |
| public CustomerItems cross(CustomerItems customerItems, CustomerItems customerItems2) throws Exception { | |
| if (!customerItems.customerId.equals(customerItems2.customerId)) { | |
| boolean has = false; | |
| for (Long item : customerItems2.items) { | |
| if (customerItems.items.contains(item)) { | |
| has = true; | |
| break; | |
| } | |
| } | |
| if (has) { | |
| for (Long item : customerItems2.items) { | |
| if (!customerItems.items.contains(item)) { | |
| customerItems.ritems.add(item); | |
| } | |
| } | |
| } | |
| } | |
| return customerItems; | |
| } | |
| }).groupBy(new KeySelector<CustomerItems, Long>() { | |
| @Override | |
| public Long getKey(CustomerItems customerItems) throws Exception { | |
| return customerItems.customerId; | |
| } | |
| }).reduceGroup(new GroupReduceFunction<CustomerItems, CustomerItems>() { | |
| @Override | |
| public void reduce(Iterable<CustomerItems> iterable, Collector<CustomerItems> collector) throws Exception { | |
| CustomerItems c = new CustomerItems(); | |
| for (CustomerItems current : iterable) { | |
| c.customerId = current.customerId; | |
| for (Long item : current.ritems) { | |
| if (!c.ritems.contains(item)) { | |
| c.ritems.add(item); | |
| } | |
| } | |
| } | |
| collector.collect(c); | |
| } | |
| }); | |
| ci.first(100).print(); | |
| System.out.println(ci.count()); | |
| } | |
| public static class CustomerItems implements Serializable { | |
| public Long customerId; | |
| public ArrayList<Long> items = new ArrayList<>(); | |
| public ArrayList<Long> ritems = new ArrayList<>(); | |
| public CustomerItems() { | |
| } | |
| public CustomerItems(Long customerId, ArrayList<Long> items) { | |
| this.customerId = customerId; | |
| this.items = items; | |
| } | |
| @Override | |
| public String toString() { | |
| StringBuilder itemsData = new StringBuilder(); | |
| if (items != null) { | |
| for (Long item : items) { | |
| if (itemsData.length() == 0) { | |
| itemsData.append(item); | |
| } else { | |
| itemsData.append(", ").append(item); | |
| } | |
| } | |
| } | |
| StringBuilder ritemsData = new StringBuilder(); | |
| if (ritems != null) { | |
| for (Long item : ritems) { | |
| if (ritemsData.length() == 0) { | |
| ritemsData.append(item); | |
| } else { | |
| ritemsData.append(", ").append(item); | |
| } | |
| } | |
| } | |
| return String.format("[ID: %d, Items: %s, RItems: %s]", customerId, itemsData, ritemsData); | |
| } | |
| } | |
| public static final class LineFieldSplitter implements FlatMapFunction<String, Tuple2<Long, Long>> { | |
| @Override | |
| public void flatMap(String value, Collector<Tuple2<Long, Long>> out) { | |
| // normalize and split the line | |
| String[] tokens = value.split("\t"); | |
| if (tokens.length > 1) { | |
| out.collect(new Tuple2<>(Long.valueOf(tokens[0]), Long.valueOf(tokens[1]))); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment