Skip to content

Instantly share code, notes, and snippets.

@manashmandal
Last active November 3, 2017 17:33
Show Gist options
  • Save manashmandal/da2cea458d3cd131b973ccb75f547fca to your computer and use it in GitHub Desktop.
Save manashmandal/da2cea458d3cd131b973ccb75f547fca to your computer and use it in GitHub Desktop.
radio player
import RPi.GPIO as GPIO
import os
import time
import requests
import subprocess
# GPIO pin numbers of the four push buttons; init() selects BCM numbering
# and configures each of them as an input with a pull-down.
NEXT_BTN = 13
PLAY_PAUSE_BTN = 19
VOL_UP_BTN = 5
VOL_DOWN_BTN = 6
# GPIO pin numbers of the three LED outputs (configured as outputs in init()).
LED_RED = 16
LED_GREEN = 20
LED_BLUE = 21
# CONST COMMANDS
# Shell command strings for the mpc client and the mpd service; the helper
# functions below hand them to os.system / subprocess.
CMD_MPC_CLEAR = "mpc clear"
CMD_MPC_LOAD_LIST = "mpc load list"
CMD_MPD_SERVICE_START = "service mpd start"
# Prefix only (note the trailing space): mpc_vol_set() appends the value.
CMD_MPC_VOL = "mpc volume "
CMD_MPC_PLAY = "mpc play"
CMD_MPC_PAUSE = "mpc pause"
CMD_MPC_NEXT = "mpc next"
CMD_MPD_RESTART = "service mpd restart"
CMD_MPC_STATUS = "mpc status"
CMD_MPC_STOP = "mpc stop"
# Stream URLs that add_playlist_mpc() passes to "mpc add", in this order.
# NOTE(review): the second entry contains '?' and '&', which are shell
# metacharacters -- relevant wherever a link is spliced into a shell command.
STREAM_LINKS = [
"http://mediacorp.rastream.com/958fm",
"http://ample-zeno-03.radiojar.com/8wv4d8g4344tv?rj-ttl=5&rj-token=AAABX3GuYj7apLtdjUx-i5eEJIfFT8buRyZxL8kDMtaf5UBDPtT90w",
"http://live.radiogoongoon.com:8888",
"http://118.179.219.244:8000",
"http://216.126.195.37:8012",
"http://mediacorp.rastream.com/933fm",
"http://stream.zenolive.com/8wv4d8g4344tv",
"http://mediacorp.rastream.com/972fm",
"http://mediacorp.rastream.com/942fm",
"http://mediacorp.rastream.com/905fm",
"http://live.leanstream.co/CISNFM-MP3"
]
def parse_status(output=None):
    """Parse the text printed by ``mpc status`` into a small dict.

    Args:
        output: Optional status text (``str`` or ``bytes``). When omitted,
            the CMD_MPC_STATUS command is run and its output is parsed.

    Returns:
        dict with two keys:
          'PLAYING' -- True only when a line starts with ``[playing]``
                       (a paused or stopped player yields False).
          'VOLUME'  -- the volume digits as a string (e.g. ``'70'``), or
                       ``''`` when no numeric volume is reported.

    Raises:
        subprocess.CalledProcessError: if ``output`` is None and the status
            command exits with a non-zero code.
    """
    if output is None:
        output = subprocess.check_output(CMD_MPC_STATUS, shell=True)
    # check_output returns bytes on Python 3; str() on it would produce a
    # "b'...'" repr with escaped newlines, so decode explicitly instead.
    if isinstance(output, bytes):
        output = output.decode('utf-8', 'replace')

    playing = False
    volume = ''
    for raw_line in output.splitlines():
        line = raw_line.strip()
        if line.startswith('[playing]'):
            # Decide from the state marker rather than from the output length.
            playing = True
        elif line.startswith('volume:'):
            candidate = line[len('volume:'):].partition('%')[0].strip()
            # Anything that is not plain digits (e.g. "n/a") is reported as ''.
            if candidate.isdigit():
                volume = candidate

    return {'PLAYING': playing, 'VOLUME': volume}
def mpc_vol_set(value=70):
    """Run the CMD_MPC_VOL shell command with ``value`` appended.

    Args:
        value: Converted with ``str()`` and appended to the command prefix;
            negative values are accepted as well.
    """
    command = CMD_MPC_VOL + str(value)
    os.system(command)
def increase_volume(step=5):
    """Raise the volume by ``step``, never going above 100.

    The current volume is read through parse_status(). Unlike a plain
    "skip when above 95" rule, the result is clamped, so a volume of
    96-99 can still reach 100.

    Args:
        step: Amount to add to the current volume (default 5).
    """
    try:
        volume = int(parse_status()['VOLUME'])
    except (TypeError, ValueError):
        # No numeric volume could be parsed; leave the volume untouched
        # instead of crashing the button loop.
        return
    target = min(volume + step, 100)
    if target != volume:
        mpc_vol_set(target)
def decrease_volume(step=5):
    """Lower the volume by ``step``, never going below 0.

    The current volume is read through parse_status(). The result is
    clamped, so a volume of 1-4 can still reach 0.

    Args:
        step: Amount to subtract from the current volume (default 5).
    """
    try:
        volume = int(parse_status()['VOLUME'])
    except (TypeError, ValueError):
        # No numeric volume could be parsed; leave the volume untouched
        # instead of crashing the button loop.
        return
    target = max(volume - step, 0)
    if target != volume:
        mpc_vol_set(target)
def is_radio_playing():
    """Return the 'PLAYING' entry of the dict produced by parse_status()."""
    status = parse_status()
    return status['PLAYING']
# Button operations
def read_vol_incr(sleep_time=0.1):
    """Sleep for ``sleep_time`` seconds, then return GPIO.input(VOL_UP_BTN)."""
    time.sleep(sleep_time)
    pin_level = GPIO.input(VOL_UP_BTN)
    return pin_level
def read_vol_decr(sleep_time=0.1):
    """Sleep for ``sleep_time`` seconds, then return GPIO.input(VOL_DOWN_BTN)."""
    time.sleep(sleep_time)
    pin_level = GPIO.input(VOL_DOWN_BTN)
    return pin_level
def read_next(sleep_time=0.1):
    """Sleep for ``sleep_time`` seconds, then return GPIO.input(NEXT_BTN)."""
    time.sleep(sleep_time)
    pin_level = GPIO.input(NEXT_BTN)
    return pin_level
def read_play_pause(sleep_time=0.1):
    """Sleep for ``sleep_time`` seconds, then return GPIO.input(PLAY_PAUSE_BTN)."""
    time.sleep(sleep_time)
    pin_level = GPIO.input(PLAY_PAUSE_BTN)
    return pin_level
def test_internet(timeout=1):
    """Return True when a GET of https://google.com answers with HTTP 200.

    Args:
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        True on status code 200; False on any other status code or when
        the request fails with a ``requests.RequestException``.
    """
    try:
        response = requests.get('https://google.com', timeout=timeout)
    except requests.RequestException:
        # Only request failures mean "offline"; a bare except would also
        # swallow KeyboardInterrupt/SystemExit and hide programming errors.
        return False
    return response.status_code == 200
def read_vol_up(sleep_time=0.1):
    """Sleep for ``sleep_time`` seconds, then return GPIO.input(VOL_UP_BTN).

    Same body as read_vol_incr(); this is the variant the main loop calls.
    """
    time.sleep(sleep_time)
    pin_level = GPIO.input(VOL_UP_BTN)
    return pin_level
def read_vol_down(sleep_time=0.1):
    """Sleep for ``sleep_time`` seconds, then return GPIO.input(VOL_DOWN_BTN).

    Same body as read_vol_decr(); this is the variant the main loop calls.
    """
    time.sleep(sleep_time)
    pin_level = GPIO.input(VOL_DOWN_BTN)
    return pin_level
def clear_mpc():
    """Run the CMD_MPC_CLEAR shell command; its exit status is discarded."""
    command = CMD_MPC_CLEAR
    os.system(command)
def start_mpd():
    """Run the CMD_MPD_SERVICE_START shell command; its exit status is discarded."""
    command = CMD_MPD_SERVICE_START
    os.system(command)
def add_playlist_mpc():
    """Add every URL in STREAM_LINKS to the mpc queue, in list order.

    Each URL is handed to ``mpc add`` as one argument without going through
    a shell. Some links contain ``?`` and ``&``; inside a shell command
    string the ``&`` ends the command, so the URL would be cut short.

    Raises:
        OSError: if the ``mpc`` executable cannot be started.
    """
    for link in STREAM_LINKS:
        # Non-zero exit codes are ignored, as with the other mpc helpers.
        subprocess.call(["mpc", "add", link])
def play_mpc():
    """Run the CMD_MPC_PLAY shell command; its exit status is discarded."""
    command = CMD_MPC_PLAY
    os.system(command)
def next_mpc():
    """Run the CMD_MPC_NEXT shell command; its exit status is discarded."""
    command = CMD_MPC_NEXT
    os.system(command)
def stop_mpc():
    """Run the CMD_MPC_STOP shell command; its exit status is discarded."""
    command = CMD_MPC_STOP
    os.system(command)
# LED functions
def led_red(state=True):
    """Write two successive levels to the LED_RED pin.

    When ``state == True`` the pin is driven to 1 and then to 0; otherwise
    to 0 and then to 1. The level left on the pin is therefore the inverse
    of ``state``.
    """
    # NOTE(review): the first write is overwritten immediately; presumably
    # the LED is wired active-low -- confirm against the hardware.
    # The explicit == True comparison (not truthiness) is kept on purpose.
    levels = (1, 0) if state == True else (0, 1)
    for level in levels:
        GPIO.output(LED_RED, level)
def led_blue(state=True):
    """Write two successive levels to the LED_BLUE pin.

    When ``state == True`` the pin is driven to 1 and then to 0; otherwise
    to 0 and then to 1. The level left on the pin is therefore the inverse
    of ``state``.
    """
    # NOTE(review): the first write is overwritten immediately; presumably
    # the LED is wired active-low -- confirm against the hardware.
    # The explicit == True comparison (not truthiness) is kept on purpose.
    levels = (1, 0) if state == True else (0, 1)
    for level in levels:
        GPIO.output(LED_BLUE, level)
def led_green(state=True):
    """Write two successive levels to the LED_GREEN pin.

    When ``state == True`` the pin is driven to 1 and then to 0; otherwise
    to 0 and then to 1. The level left on the pin is therefore the inverse
    of ``state``.
    """
    # NOTE(review): the first write is overwritten immediately; presumably
    # the LED is wired active-low -- confirm against the hardware.
    # The explicit == True comparison (not truthiness) is kept on purpose.
    levels = (1, 0) if state == True else (0, 1)
    for level in levels:
        GPIO.output(LED_GREEN, level)
def init():
    """Set up the GPIO pins, start mpd and fill the mpc queue.

    Steps, in order: select BCM pin numbering, configure the four button
    pins as inputs with pull-downs, configure the three LED pins as
    outputs, then call start_mpd(), clear_mpc() and add_playlist_mpc().
    """
    GPIO.setmode(GPIO.BCM)

    # Buttons: inputs with a pull-down.
    for button_pin in (NEXT_BTN, PLAY_PAUSE_BTN, VOL_UP_BTN, VOL_DOWN_BTN):
        GPIO.setup(button_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

    # LEDs: plain outputs.
    for led_pin in (LED_RED, LED_BLUE, LED_GREEN):
        GPIO.setup(led_pin, GPIO.OUT)

    # Start the service, then replace the queue with the stream list.
    start_mpd()
    clear_mpc()
    add_playlist_mpc()
#init()
#while True:
# print(next_btn())
# time.sleep(1)
# Script entry: initialise the hardware and the queue, start playback, then
# poll the four buttons forever and act on whichever ones read high.
init()
radio_on = True
play_mpc()
try:
    while True:
        # If internet not available turn on the red led
        #led_red(not test_internet())
        # Each read_* helper sleeps 0.1 s by default before sampling its
        # pin, so one pass of this loop takes roughly 0.4 s.
        vol_incr = read_vol_up()
        vol_decr = read_vol_down()
        # Named next_pressed so the builtin next() is not shadowed.
        next_pressed = read_next()
        toggle_btn = read_play_pause()
        if vol_incr:
            increase_volume()
        if vol_decr:
            decrease_volume()
        if next_pressed:
            next_mpc()
        if toggle_btn:
            # Toggle between stopped and playing.
            if radio_on:
                stop_mpc()
            else:
                play_mpc()
            radio_on = not radio_on
finally:
    # Release the GPIO channels when the loop is left (e.g. Ctrl-C); the
    # exception itself still propagates.
    GPIO.cleanup()
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import ssd1306
import Adafruit_BMP.BMP085 as BMP085
import Adafruit_DHT
# BMP085 driver object; read below for temperature, pressure and altitude.
sensor = BMP085.BMP085()
# Pin handed to Adafruit_DHT.read_retry (presumably a BCM GPIO number -- confirm).
DHTpin = 22
# Sensor-type constant handed to Adafruit_DHT.read_retry.
HumiditySensor = Adafruit_DHT.DHT22
def do_nothing(obj):
    """Accept one argument, ignore it, and return None."""
    return None
# Read the DHT sensor once; the checks below treat None as "no reading".
humidity, temperature = Adafruit_DHT.read_retry(HumiditySensor, DHTpin)
serial = i2c(address=0x3C)
device = ssd1306(serial)
# Replace the device's cleanup hook with a no-op.
# NOTE(review): presumably so the display keeps its contents after the
# script exits -- confirm against the luma documentation.
device.cleanup = do_nothing
if humidity is not None and temperature is not None:
    print('Temp={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity))
# Format the humidity itself (not the temperature), and fall back to a
# placeholder when the DHT read failed so formatting None cannot raise.
if humidity is not None:
    humidity_text = 'Humidity= {0:0.2f}%'.format(humidity)
else:
    humidity_text = 'Humidity= n/a'
with canvas(device) as draw:
    draw.rectangle(device.bounding_box, outline="white", fill="black")
    draw.text((10, 20), 'Temp = {0:0.2f} *C'.format(sensor.read_temperature()), fill="white")
    draw.text((10, 30), 'Pressure = {0:0.2f} Pa'.format(sensor.read_pressure()), fill="white")
    draw.text((10, 40), 'Altitude = {0:0.2f} m'.format(sensor.read_altitude()), fill="white")
    draw.text((10, 50), humidity_text, fill="white")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment