Skip to content

Instantly share code, notes, and snippets.

@jk0
Created October 30, 2012 21:07
Show Gist options
  • Select an option

  • Save jk0/3983068 to your computer and use it in GitHub Desktop.

Select an option

Save jk0/3983068 to your computer and use it in GitHub Desktop.
Pyhole Jenkins Plugin
# Copyright 2012 Josh Kearney
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pyhole Jenkins Plugin"""
import json
import subprocess
from lxml import etree
from pyhole import plugin
from pyhole import utils
class Jenkins(plugin.Plugin):
"""Provide access to common Jenkins functionality."""
@plugin.hook_add_poll("poll_bad_builds", poll_timer=60)
def poll_bad_builds(self, params=None, **kwargs):
rss = read_url("https://host/rssFailed")
tree = etree.fromstring(rss)
namespaces = {"atom": "http://www.w3.org/2005/Atom"}
new_failures = {}
for entry in tree.xpath("//atom:entry", namespaces=namespaces):
title = entry.xpath(".//atom:title/text()", namespaces=namespaces)[0]
url = entry.xpath(".//atom:link", namespaces=namespaces)[0].attrib["href"]
repo_json = json.loads(read_url(url + "/api/json"))
owner, branch = parse_params(repo_json["actions"][0]["parameters"])
title = title + " - %s/%s" % (owner, branch)
new_failures[title] = url
try:
old_failures = self._read_cache()
except TypeError:
self._write_cache(new_failures)
return
for new_failure in list(set(new_failures) - set(old_failures)):
title = str(new_failure)
url = new_failures[new_failure]
reply = "[BUILD FAILED] %s: %s" % (title, url)
self.irc.reply(reply)
self._write_cache(new_failures)
def _write_cache(self, data):
"""Write RSS data to cache file."""
json_data = json.dumps(data)
utils.write_file(self.name, "rss_cache", json_data)
def _read_cache(self):
"""Read RSS cache data."""
rss_cache = utils.read_file(self.name, "rss_cache")
return json.loads(rss_cache)
def read_url(url):
user = "user"
api_key = "key"
# NOTE(jk0): I know, but I didn't feel like hacking things to get
# around the SSL errors that urllib2 was throwing.
curl_command = "curl -s -3 -u %s:%s %s" % (user, api_key, url)
return subprocess.check_output(curl_command.split(" "))
def parse_params(params):
owner = None
branch = None
for param in params:
if param["name"] == "GITHUB_OWNER":
owner = param["value"]
elif param["name"] == "GITHUB_BRANCH":
branch = param["value"]
return (owner, branch)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment