@duyet
Last active August 29, 2015 14:24
Apache Spark Machine Learning
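Three standalone PySpark snippets follow: a spark.ml Pipeline for text classification, an MLlib linear SVM trained on the bundled sample data, and a small script that builds LabeledPoints from tab-separated text files and trains an MLlib model.
# --- First snippet: spark.ml Pipeline (Tokenizer -> HashingTF -> LogisticRegression) ---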
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext
sc = SparkContext(appName="SimpleTextClassificationPipeline")
sqlContext = SQLContext(sc)
# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
training = sc.parallelize([(0L, "a b c d e spark", 1.0),
                           (1L, "b d", 0.0),
                           (2L, "spark f g h", 1.0),
                           (3L, "hadoop mapreduce", 1.0)]) \
    .map(lambda x: LabeledDocument(*x)).toDF()
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
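# HashingTF hashes each token list into a fixed-length term-frequency feature vector.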
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
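# The fitted PipelineModel chains the tokenizer, hashingTF, and the trained LogisticRegression.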
# Prepare test documents, which are unlabeled.
Document = Row("id", "text")
test = sc.parallelize([(4L, "spark i j k"),
                       (5L, "l m n"),
                       (6L, "mapreduce spark"),
                       (7L, "hadoop")]) \
    .map(lambda x: Document(*x)).toDF()
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print row
sc.stop()
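# --- Second snippet: linear SVM (SVMWithSGD) trained on MLlib's sample_svm_data.txt ---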
import os
from pyspark import SparkContext
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
sc = SparkContext("local", "Simple App")
current_folder = os.path.dirname(os.path.realpath(__file__))
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])
data = sc.textFile(current_folder + "/data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
# Save and load model
model.save(sc, current_folder + "/saved_model")
# sameModel = SVMModel.load(sc, current_folder + "/saved_model")
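# --- Third snippet: read tab-separated labeled text files and train an MLlib classifier ---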
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from pyspark import SparkContext
from pyspark.mllib.classification import SVMWithSGD, SVMModel, NaiveBayes, NaiveBayesModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
# Leftover from the Pipeline example above: Tokenizer and HashingTF are not imported
# in this script and are never used, so the two lines are kept commented out.
# tokenizer = Tokenizer(inputCol="text", outputCol="words")
# hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
def file_to_data(path):
    training_array = []
    # Reading training file data
    with open(path, "r") as f:
        i = 0L
        for line in f:
            if line:
                current_line = line.split('\t')
                if current_line is not None:
                    try:
                        i += 1
                        label = float(current_line[0])
                        features = Vectors.dense(current_line[1].split(' '))
                        training_array.append(LabeledPoint(label, features))
                    except TypeError:
                        print ">>>>>>>ERROR: ", current_line
    return training_array
current_folder = os.path.dirname(os.path.realpath(__file__))
data_folder = current_folder + "/labeled_data/"
training_file_path = data_folder + "training_mini.txt"
testing_file_path = data_folder + "testing_mini.txt"
sc = SparkContext(appName="LvDuit_Classify_Bayes", master="spark://lvduit:7077")
# sqlContext = SQLContext(sc)
# Load training and testing
training_array = file_to_data(training_file_path)
testing_array = file_to_data(testing_file_path)
# Leftover DataFrame conversion from the Pipeline example: Row/SQLContext are not
# imported here and file_to_data() already returns LabeledPoint objects, so this
# block is kept commented out.
# LabeledDocument = Row("id", "label", "text")
# training = sc.parallelize(training_array).map(lambda x: LabeledDocument(*x)).toDF()
# print [(i[0], i[1]) for i in training_array]
# Convert tranning data and testing data to Spark Context
training_data = sc.parallelize(training_array)
testing_data = sc.parallelize(testing_array)
print training_data
# Train model
model = SVMWithSGD.train(training_data, iterations=100)
# labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
# Save model
model.save(sc, current_folder + "/bayes_model")
# predict() expects feature vectors, so strip the labels off the LabeledPoints first
test = model.predict(testing_data.map(lambda p: p.features)).collect()
print test
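To gauge the model on the held-out set, one could compute the test error the same way the second snippet computes its training error (a sketch, not part of the original gist; it reuses the testing_data RDD and model defined above):
# Sketch: test error, mirroring the training-error computation in the SVM example.
labelsAndPreds = testing_data.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(testing_data.count())
print("Test Error = " + str(testErr))
sc.stop()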