Skip to content

Instantly share code, notes, and snippets.

@reinhrst
Last active August 29, 2015 14:08
Show Gist options
  • Select an option

  • Save reinhrst/05d1666fa7f93ad1bce3 to your computer and use it in GitHub Desktop.

Select an option

Save reinhrst/05d1666fa7f93ad1bce3 to your computer and use it in GitHub Desktop.
Proof of concept for Buildbot 0.89 / Campfire integration. For sure makes some assumptions on our buildbot setup!
import base64
from twisted.internet import reactor
from buildbot.interfaces import IStatusReceiver
from twisted.application import service
from twisted.web.client import getPage
from zope.interface import implements
from twisted.internet import defer
import httpstream
import json
from buildbot.status.results import EXCEPTION
from buildbot.status.results import FAILURE
from buildbot.status.results import SUCCESS
from buildbot.status.results import WARNINGS
class CampfireSteamReceiver(httpstream.MessageReceiver):
def __init__(self, campfirestatus):
self.campfirestatus = campfirestatus
super(CampfireSteamReceiver, self).__init__()
def connectionMade(self):
self.campfirestatus.streamingConnectionMade()
def connectionFailed(self, why):
self.campfirestatus.streamingConnectionFailed(why)
def messageReceived(self, message):
self.campfirestatus.streamingMessageReceived(message)
class CampfireStatus(service.MultiService):
implements(IStatusReceiver)
def __init__(self, accesstoken, subdomain, roomname):
service.MultiService.__init__(self)
self.accesstoken = accesstoken
self.subdomain = subdomain
self.roomname = roomname
def log(self, text):
print("FIRE! FIRE! FIRE! " + text)
def getAuthenticatedPage(self, url, method, postdata):
headers = {
"Authorization": "Basic " + base64.b64encode(
self.accesstoken + ":"),
"User-Agent": "Buildbot integration ([email protected]",
}
if postdata:
headers["Content-Type"] = "application/json"
return getPage(url, method=method, postdata=postdata, headers=headers)
@defer.inlineCallbacks
def getCampfirePage(self, path, method="GET", postdata=None):
if postdata:
postdata = json.dumps(postdata)
jsonresponse = yield self.getAuthenticatedPage(
"https://%s.campfirenow.com/%s.json" % (self.subdomain, path),
method=method, postdata=postdata)
if jsonresponse.strip():
defer.returnValue(json.loads(jsonresponse))
else:
defer.returnValue(None)
def postCampfirePage(self, path, data=None):
return self.getCampfirePage(path, method="POST", postdata=data)
@defer.inlineCallbacks
def joinRoom(self):
page = yield self.getCampfirePage("rooms")
for room in page["rooms"]:
if room["name"] == self.roomname:
self.roomid = room["id"]
break
else:
raise Exception("Room not found: %s" % self.roomname)
yield self.postCampfirePage("room/%s/join" % self.roomid)
@defer.inlineCallbacks
def leaveRoom(self):
yield self.postCampfirePage("room/%s/leave" % self.roomid)
self.roomid = None
@defer.inlineCallbacks
def speak(self, message):
assert self.roomid
self.log("Speak " + message)
packedmessage = {"message": {"body": message}}
yield self.postCampfirePage("room/%s/speak" % self.roomid,
data=packedmessage)
@defer.inlineCallbacks
def getUsername(self, userid):
userobject = yield self.getCampfirePage("users/%d" % userid)
username = userobject["user"]["name"]
defer.returnValue(username)
def startSteaming(self):
assert self.roomid
httpstream.stream(
reactor,
"https://streaming.campfirenow.com/room/%d/live.json" % (
self.roomid),
CampfireSteamReceiver(self),
username=self.accesstoken,
)
""" steaming callbacks """
def streamingConnectionMade(self):
self.log("streaming connection made")
def streamingConnectionFailed(self, why):
self.log("streamingConnectionFailed: %s" % why)
def streamingMessageReceived(self, message):
body = message["body"].strip()
if body[:3] in ["bb:", "bb "]:
command = body[3:].split()
self.log("command: %s" % command)
handler = "command_" + command[0].upper()
if not hasattr(self, handler):
handler = "command_HELP"
getattr(self, handler)(command, message["user_id"])
""" commands """
def command_HELP(self, args, userid):
actions = [member[len("command_"):].lower() for member in dir(self)
if member.startswith("command_")]
self.speak("I understand the following commands: %s" %
", ".join(sorted(actions)))
def command_STATUS(self, args, userid):
statuslines = ["Status:"]
for builder in self.master.botmaster.getBuilders():
builds = builder.builder_status.getCurrentBuilds()
if len(builds) > 1:
status = "building"
elif len(builds) == 1:
build = builds[0]
status = "Building (%s)" % build.getCurrentStep().getName()
else:
status = "free"
statuslines.append("%s: %s" % (builder.name, status))
self.speak("\n".join(statuslines))
@defer.inlineCallbacks
def command_FORCE(self, args, userid):
schedulername = args[1] if len(args) > 1 else None
allowednames = []
for sch in self.master.allSchedulers():
if schedulername == sch.name:
username = yield self.getUsername(userid)
yield sch.force(username, sch.builderNames)
yield self.speak("Build forced")
break
allowednames.append(sch.name)
else:
yield self.speak("Forceable builds: " + ", ".join(allowednames))
""" builtin callbacks """
def checkConfig(self, otherStatusReceivers):
pass
def setServiceParent(self, parent):
"""
@type parent: L{buildbot.master.BuildMaster}
"""
service.MultiService.setServiceParent(self, parent)
self.master_status = self.parent
self.master_status.subscribe(self)
self.master = self.master_status.master
@defer.inlineCallbacks
def startService(self):
yield self.joinRoom()
self.startSteaming()
yield self.speak("Hello peeps (buildbot (re)started)")
@defer.inlineCallbacks
def stopService(self):
yield self.leaveRoom()
""" status update callbacks """
def buildFinished(self, builderName, build, results):
message = "Builder %s finished: %s" % (
builderName,
{
FAILURE: "FAIL",
SUCCESS: "SUCCESS",
EXCEPTION: "EXCEPTION",
WARNINGS: "WARNINGS"
}.get(results, "[UNKNOWN VALUE]"))
self.speak(message)
def builderAdded(self, builderName, builder):
return self # subscribe to builder
def builderRemoved(self, name):
pass
def builderChangedState(self, name, state):
pass
def buildStarted(self, name, build):
self.speak("Build started: " + name)
# downloaded from
# https://github.com/rande/twisted-http-stream
# coding: utf-8
#
# Copyright 2009 Alexandre Fiori
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = "Thomas Rabaix, Alexandre Fiori"
__version__ = "0.0.1"
"""Twisted client library to handle Http Streaming.
This code is an adaptation of https://github.com/fiorix/twisted-twitter-stream
"""
import base64, urllib
from twisted.protocols import basic
from twisted.internet import defer, protocol, ssl
from urlparse import urlparse
try:
import simplejson as _json
except ImportError:
try:
import json as _json
except ImportError:
raise RuntimeError("A JSON parser is required, e.g., simplejson at "
"http://pypi.python.org/pypi/simplejson/")
class MessageReceiver(object):
def connectionMade(self):
pass
def connectionFailed(self, why):
pass
def messageReceived(self, message):
raise NotImplementedError
def _registerProtocol(self, protocol):
self._streamProtocol = protocol
def disconnect(self):
if hasattr(self, "_streamProtocol"):
self._streamProtocol.factory.continueTrying = 0
self._streamProtocol.transport.loseConnection()
else:
raise RuntimeError("not connected")
class HttpStreamProtocol(basic.LineReceiver):
delimiter = "\r\n"
def __init__(self):
self.in_header = True
self.header_data = []
self.status_data = ""
self.status_size = None
def connectionMade(self):
self.transport.write(self.factory.header)
self.factory.consumer._registerProtocol(self)
def lineReceived(self, line):
while self.in_header:
if line:
self.header_data.append(line)
else:
http, status, message = self.header_data[0].split(" ", 2)
status = int(status)
if status == 200:
self.factory.consumer.connectionMade()
else:
self.factory.continueTrying = 0
self.transport.loseConnection()
self.factory.consumer.connectionFailed(RuntimeError(status, message))
self.in_header = False
break
else:
try:
self.status_size = int(line, 16)
self.setRawMode()
except:
pass
def rawDataReceived(self, data):
if self.status_size is not None:
data, extra = data[:self.status_size], data[self.status_size:]
self.status_size -= len(data)
else:
extra = ""
self.status_data += data
if self.status_size == 0:
try:
# ignore newline keep-alive
message = _json.loads(self.status_data)
except:
pass
else:
self.factory.consumer.messageReceived(message)
self.status_data = ""
self.status_size = None
self.setLineMode(extra)
class HttpStreamFactory(protocol.ReconnectingClientFactory):
maxDelay = 120
protocol = HttpStreamProtocol
def __init__(self, consumer):
if isinstance(consumer, MessageReceiver):
self.consumer = consumer
else:
raise TypeError("consumer should be an instance of twistedhttpstream.MessageReceiver")
def make_header(self, username, password, method, url, postdata=""):
auth = base64.encodestring("%s:%s" % (username, password)).strip()
header = [
"%s %s HTTP/1.1" % (method, url.path),
"Authorization: Basic %s" % auth,
"User-Agent: twisted http stream",
"Host: %s" % url.netloc,
"Accept: 'application/json",
]
if method == "GET":
self.header = "\r\n".join(header) + "\r\n\r\n"
elif method == "POST":
header += [
"Content-Type: application/x-www-form-urlencoded",
"Content-Length: %d" % len(postdata),
]
self.header = "\r\n".join(header) + "\r\n\r\n" + postdata
def stream(reactor, url, consumer, username=None, password=None):
url = urlparse(url)
tw = HttpStreamFactory(consumer)
tw.make_header(username, password, "GET", url)
if url.scheme == "https":
reactor.connectSSL(url.netloc, 443, tw, ssl.ClientContextFactory())
else:
reactor.connectTCP(url.netloc, 80, tw)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment