Apache Spark Machine Learning
Example 1: text classification with a spark.ml Pipeline (Tokenizer, HashingTF, LogisticRegression)
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

sc = SparkContext(appName="SimpleTextClassificationPipeline")
sqlContext = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
training = sc.parallelize([(0, "a b c d e spark", 1.0),
                           (1, "b d", 0.0),
                           (2, "spark f g h", 1.0),
                           (3, "hadoop mapreduce", 1.0)]) \
    .map(lambda x: LabeledDocument(*x)).toDF()

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled.
Document = Row("id", "text")
test = sc.parallelize([(4, "spark i j k"),
                       (5, "l m n"),
                       (6, "mapreduce spark"),
                       (7, "hadoop")]) \
    .map(lambda x: Document(*x)).toDF()

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

sc.stop()
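To put a number on how well the fitted pipeline separates the two classes, the transformed (scored) training documents can be passed to a BinaryClassificationEvaluator. A minimal sketch, run before `sc.stop()`; the evaluator import and the default `rawPrediction` column are standard spark.ml behavior, not part of the original gist:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the labeled training documents with the fitted pipeline.
scored = model.transform(training)

# LogisticRegression appends a "rawPrediction" column during transform, which is
# the evaluator's default input column; the default metric is areaUnderROC.
evaluator = BinaryClassificationEvaluator(labelCol="label")
print(evaluator.evaluate(scored))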
Example 2: linear SVM with SGD on sample_svm_data.txt (spark.mllib)
import os

from pyspark import SparkContext
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext("local", "Simple App")
current_folder = os.path.dirname(os.path.realpath(__file__))

# Load and parse the data: each line is "<label> <feature1> <feature2> ...".
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile(current_folder + "/data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluate the model on the training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, current_folder + "/saved_model")
# sameModel = SVMModel.load(sc, "myModelPath")
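The commented-out `SVMModel.load` line hints at reusing the persisted model; a minimal round-trip sketch under the same assumptions as the script above, loading from the `/saved_model` path just written and re-checking the training error:

# Reload the persisted model; it should reproduce the same training error.
sameModel = SVMModel.load(sc, current_folder + "/saved_model")
reloadedErr = parsedData.map(lambda p: (p.label, sameModel.predict(p.features))) \
    .filter(lambda vp: vp[0] != vp[1]).count() / float(parsedData.count())
print("Reloaded Training Error = " + str(reloadedErr))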
Example 3: training SVMWithSGD on labeled text files against a standalone Spark cluster
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os

from pyspark import SparkContext
from pyspark.mllib.classification import SVMWithSGD, SVMModel, NaiveBayes, NaiveBayesModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Leftover from the Pipeline example: Tokenizer and HashingTF are never used
# below (and were never imported here), so they are left disabled.
# tokenizer = Tokenizer(inputCol="text", outputCol="words")
# hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

# Read a tab-separated file of "<label>\t<feature1> <feature2> ..." lines
# into a list of LabeledPoint objects.
def file_to_data(path):
    training_array = []
    with open(path, "r") as f:
        for line in f:
            if line.strip():
                current_line = line.split('\t')
                try:
                    label = float(current_line[0])
                    features = Vectors.dense([float(v) for v in current_line[1].split(' ')])
                    training_array.append(LabeledPoint(label, features))
                except (TypeError, ValueError, IndexError):
                    print(">>>>>>>ERROR: ", current_line)
    return training_array

current_folder = os.path.dirname(os.path.realpath(__file__))
data_folder = current_folder + "/labeled_data/"
training_file_path = data_folder + "training_mini.txt"
testing_file_path = data_folder + "testing_mini.txt"

sc = SparkContext(appName="LvDuit_Classify_Bayes", master="spark://lvduit:7077")
# sqlContext = SQLContext(sc)

# Load training and testing data.
training_array = file_to_data(training_file_path)
testing_array = file_to_data(testing_file_path)

# Leftover DataFrame construction from the Pipeline example: LabeledPoint
# objects cannot be unpacked into this Row, and Row/SQLContext are not
# imported here, so it is left disabled.
# LabeledDocument = Row("id", "label", "text")
# training = sc.parallelize(training_array).map(lambda x: LabeledDocument(*x)).toDF()
# print [(i[0], i[1]) for i in training_array]

# Convert training data and testing data to RDDs.
training_data = sc.parallelize(training_array)
testing_data = sc.parallelize(testing_array)
print(training_data)

# Train the model (SVMWithSGD, despite the "Bayes" in the app name and save path).
model = SVMWithSGD.train(training_data, iterations=100)
# labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))

# Save the model.
model.save(sc, current_folder + "/bayes_model")

# Predict on the test set; predict expects feature vectors, not LabeledPoints.
test = model.predict(testing_data.map(lambda p: p.features)).collect()
print(test)
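Because the test file carries labels too, the test error can be computed the same way as the training error in the previous example; a minimal sketch reusing `testing_data` and `model` from above:

# Pair each true label with the model's prediction on that point's features.
labelsAndPreds = testing_data.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(testing_data.count())
print("Test Error = " + str(testErr))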