-
-
Save kartben/614fea74e9c67df0aae0 to your computer and use it in GitHub Desktop.
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This example will consume temperature data (or any other numerical values, really)
from an MQTT broker, and consolidate/graph this data on a 15-second sliding window.
This work is based on the original mqtt_wordcount.py sample from the Apache Spark codebase.

Running the example:
`$ bin/spark-submit --jars \
external/mqtt-assembly/target/spark-streaming-mqtt-assembly_*.jar \
mqtt_spark_streaming.py`
"""
def is_number(s):
    """Return True if *s* can be parsed as a finite floating-point number.

    Used to drop MQTT payloads that are not numeric readings before they are
    bucketed into the histogram.

    Rejected inputs:
    * values ``float`` cannot parse (it raises ValueError, e.g. ``"abc"``);
    * values of an unsupported type (it raises TypeError, e.g. ``None``);
    * the non-finite values NaN and +/-infinity. NaN never compares equal to
      itself, so it cannot act as a stable reduce-by-key bucket. Infinity is
      not a meaningful reading.
    """
    # Local import so this helper is self-contained; isnan/isinf are used
    # instead of isfinite to stay compatible with Python 2.
    import math
    try:
        value = float(s)
    except (TypeError, ValueError):
        return False
    return not (math.isnan(value) or math.isinf(value))
import sys | |
import operator | |
from pyspark import SparkContext | |
from pyspark.streaming import StreamingContext | |
from pyspark.streaming.mqtt import MQTTUtils | |
sc = SparkContext(appName="TemperatureHistory")
# One-second micro-batches.
ssc = StreamingContext(sc, 1)
# Checkpointing is required because reduceByKeyAndWindow below is given an
# inverse function, so the windowed counts are maintained incrementally.
ssc.checkpoint("checkpoint")

# Broker URI and topic can optionally be overridden on the command line:
#   mqtt_spark_streaming.py [brokerUrl] [topic]
# Without arguments the hard-coded defaults below are used.
# (Alternative public broker previously used: "tcp://iot.eclipse.org:1883")
brokerUrl = sys.argv[1] if len(sys.argv) > 1 else "tcp://192.168.2.26:1883"
# Topic or topic pattern where temperature data is being sent.
topic = sys.argv[2] if len(sys.argv) > 2 else "+/+/sensors/temperature"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)

# Pipeline:
#  1. keep only numeric payloads;
#  2. map each reading to (bucket, 1), where the bucket is the reading rounded
#     to the nearest 0.5 (x*2 rounded to an integer, then halved).
#     NOTE: values exactly between two buckets (e.g. 21.25) round half-to-even
#     on Python 3 but half away from zero on Python 2;
#  3. count readings per bucket over a 15-second window sliding every second.
#     filterFunc drops buckets whose count has fallen back to 0. Without it
#     they would stay in the state forever and print as empty rows;
#  4. sort each window's buckets in ascending order.
counts = mqttStream \
    .filter(is_number) \
    .map(lambda message: (round(float(message) * 2, 0) / 2, 1)) \
    .reduceByKeyAndWindow(operator.add, operator.sub, 15, 1,
                          filterFunc=lambda kv: kv[1] > 0) \
    .transform(lambda rdd: rdd.sortByKey())
def printHistogram(time, rdd):
    """Print one window of the histogram to stdout.

    Intended as a ``foreachRDD`` callback: *time* is the batch time and *rdd*
    holds ``(bucket, count)`` pairs. Each pair is printed as the bucket value
    followed by ``count`` '#' characters, under a header naming the batch time.
    """
    rule = "-------------------------------------------"
    buckets = rdd.collect()
    print(rule)
    print("Time: %s" % time)
    print(rule)
    # "draw" our lil' ASCII-based histogram
    for bucket, count in buckets:
        bar = '#' * count
        print(str(bucket) + ': ' + bar)
    print("")
# Print the windowed histogram for every batch, then start the streaming
# computation and block until it is stopped or fails.
counts.foreachRDD(printHistogram)
ssc.start()
ssc.awaitTermination()
Hi,
I had the same problem, but running the code with the Apache Bahir package worked for me. However, I made some changes to the code.
My Apache Spark version is 2.4.5.
First I started pyspark with this command:
pyspark --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.4.0
When the console is ready, I can run the code example after changing
from pyspark.streaming.mqtt import MQTTUtils
to
from mqtt import MQTTUtils
I also had to change the MQTT connection to:
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic, username=None, password=None)
@massimocallisto When I run the command `pyspark --packages org.apache.bahir:spark-streaming-mqtt_2.11:3.3.1`, I get the following error:
module not found: org.apache.bahir#spark-streaming-mqtt_2.11;3.3.1
Please let me know if you have a solution.
This code is really old so I doubt it still works, unfortunately.
@kartben, it is not working. Is there any other way to do Spark Streaming with the MQTT protocol?
@BennisonDevadoss have you tried the latest version of the official samples which this code was originally inspired from?
https://github.com/apache/bahir/tree/master/streaming-mqtt/examples
Is there any way to retrieve the actual topic name from the stream when using wildcard characters like '#' or '+' to subscribe to multiple topics? MQTTUtils also has a createPairedStream method, but it does not allow username/password authentication. I need help.
Hello! I am facing an issue while running this. It says "ModuleNotFoundError: No module named 'pyspark.streaming.mqtt'". What could be the solution?