Skip to content

Instantly share code, notes, and snippets.

@shofetim
Created March 10, 2016 19:05
Show Gist options
  • Save shofetim/71f2d705b3bf804dc391 to your computer and use it in GitHub Desktop.
Save shofetim/71f2d705b3bf804dc391 to your computer and use it in GitHub Desktop.

Code Samples

The samples are lifted from various production systems, and as such are not "polished".

Something in Clojure, this was part of a web service that detected if the data it was sent was encrypted or not.

(ns store.chisq
  (:require [clojure.java.io :as io]
            [vault.util :refer [file->byte-array]]))

;; See Knuth, Donald E. The Art of Computer Programming, Volume 2 /
;; Seminumerical Algorithms. Reading MA: Addison-Wesley, 1969.  ISBN
;; 0-201-89684-2. pp 35-40 / Section 3.3.1

(defn chi-square [ba]
  (let [n (count ba)
        k 256
        ps (/ 1 k)
        ys (frequencies ba)]
    (float (- (/ (reduce + (map #(/ (* % %) ps) (vals ys))) n) n))))

;; Knuth gives a table for distributions with degrees of freedom up
;; 50, we have 256. The correct values can be approximated as follows:

;; (def approximate-normal-chi-square-distribution
;;   (let [v 256]
;;     (map
;;        #(+ v (* (Math/sqrt (* 2 v)) %) (* (/ 2 3) (* % %)) (- (/ 2 3)) (/ 1 (Math/sqrt v)))
;;        [-2.33 -1.64 -0.674 0.0 0.674 1.64 2.33])))

(defn likely-encrypted?
  "Likeliness that the file is :plaintext, :suspect, or :encrypted.

  Is lenient in that almost suspect files are reported as encrypted."
  [v]
  (cond
    (< v 206.293218394731) :plaintext
    (and (>= v 206.293218394731) (< v 220.07993612333)) :suspect
    (and (>= v 220.07993612333) (<= v 294.29786387667)) :encrypted
    (and (< v 311.73698160526897) (> v 294.29786387667)) :suspect
    (> v 311.73698160526897) :plaintext))

Some Python sockets code, implementing a custom wire protocol for one of these

import socket
import struct
import base64
import logging

class Jade(object):
    """Interface to JadeX7 portal scanner
    """

    # Message types that the scanner can send. See Portal7-Ethernet
    # X2.9.pdf for definition of the message values.
    message_types = {
        "I": "Item Event",
        "B": "Label Data",
        "P": "Image Data",
        "L": "Item Location",
        "V": "Item Volume",
        "T": "Item Tracking",
        "F": "Fault Event",
        "A": "Acknowledgement",
        "U": "Update Item Even",
        "O": "Label Choice",
        "C": "Connected Message",
        "K": "Identification Message",
        "H": "Health Summary Message",
        "G": "Health Extended Message",
        "N": "Statistics Message",
        "X": "Check Connection Message",
    }
    MAX_BUFFER_SIZE = 4096
    connected = False

    #TODO These are wasteful, use closures?
    def _int(self, x):
        return struct.Struct("!I").unpack(x)[0]
    def _short(self, x):
        return struct.Struct("!H").unpack(x)[0]
    def _single_byte_int(self, x):
        return struct.Struct("!B").unpack(x)[0]

    def __init__(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        logging.basicConfig()
        self.log = logging.getLogger()

    def __del__(self):
        self.disconnect()


    #-------------------------------------------------------------------------#
    # Commands to Scanner
    #-------------------------------------------------------------------------#
    def connect(self, host="172.18.3.195", port=23959):
        self.sock.connect((host, port))
        self.connected = True

    def disconnect(self):
        self.sock.shutdown(1)
        self.sock.close()
        self.connected = False

    def enable(self):
        packer = struct.Struct("!cI?") #faking a single byte with 0 in it with the ?
        values = ("e", 1, 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def disable(self):
        packer = struct.Struct("!cI?") #faking a single byte with 0 in it with the ?
        values = ("d", 1, 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def sleep(self):
        packer = struct.Struct("!cI6c")
        values = ("z", 6, "D", "E", "E", "P", "L", "Y")
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def shutdown(self):
        """Will halt the scanner and require manual restart"""
        packer = struct.Struct("!cI")
        values = ("o", 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def reset(self):
        # Will generate fault codes, and say host command not
        # implemented, but seems to work all the same
        packer = struct.Struct("!cI")
        values = ("r", 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def restart(self):
        packer = struct.Struct("!cI")
        values = ("u", 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def health_summary(self):
        packer = struct.Struct("!cI")
        values = ("h", 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)

    def statistics(self): #test this one, has response
        packer = struct.Struct("!cI")
        values = ("n", 0)
        packed_data = packer.pack(*values)
        self.sock.send(packed_data)


    #-------------------------------------------------------------------------#
    # Messages from scanner
    #-------------------------------------------------------------------------#
    def next(self):
        """Read next message from scanner
        """
        message_id = self.sock.recv(1)
        #self.log.debug(self._message_type(message_id))
        if message_id == "I":
            return self._read_item_event()
        elif message_id == "B":
            return self._read_label_data()
        elif message_id == "P":
            return self._read_image_data()
        elif message_id == "L":
            return self._read_item_location()
        elif message_id == "V":
            return self._read_item_volume()
        elif message_id == "T":
            return self._read_item_tracking()
        elif message_id == "F":
            return self._read_fault_event()
        elif message_id == "A":
            return self._read_acknowledgement()
        elif message_id == "U":
            return self._read_unimplemented(message_id)
        elif message_id == "O":
            return self._read_unimplemented(message_id)
        elif message_id == "C":
            return self._read_unimplemented(message_id)
        elif message_id == "K":
            return self._read_unimplemented(message_id)
        elif message_id == "H":
            return self._read_health()
        elif message_id == "G":
            return self._read_unimplemented(message_id)
        elif message_id == "N":
            return self._read_statistics()
        elif message_id == "X":
            return self._read_check_connection()
        else:
            #ignore everything else; there should be nothing else
            return self._read_unimplemented(message_id)

    def _read_item_event(self):
        message_length = self._int(self.sock.recv(4))
        item_id = self._int(self.sock.recv(4))
        item_flags = self._int(self.sock.recv(4))
        item_source = self._single_byte_int(self.sock.recv(1))
        remaining = message_length - 9
        pairs = remaining / 2
        additional_messages = {}
        for pair in range(pairs):
            message_id = self._message_type(
                chr(self._single_byte_int(self.sock.recv(1)))
            )
            count = self._single_byte_int(self.sock.recv(1))
            additional_messages[message_id] = count
        return {"message_type": self._message_type("I"), "item_id": item_id,
                "additional_messages": additional_messages }

    def _read_label_data(self):
        message_length = self._int(self.sock.recv(4))
        item_id = self._int(self.sock.recv(4))
        image_id = self._int(self.sock.recv(4)) #discard
        format_code = self._single_byte_int(self.sock.recv(1))
        length = self._short(self.sock.recv(2)) #discard
        barcode = unicode(self._read_bytes(message_length - 11))
        if format_code != 1:
            self.log.debug("Invalid barcode format!")
        return {"message_type": self._message_type("B"),
                "item_id": item_id, "format_code": format_code,
                "barcode": barcode}

    def _read_image_data(self):
        message_length = self._int(self.sock.recv(4))
        image_id = self._int(self.sock.recv(4))
        orientation = self._single_byte_int(self.sock.recv(1))
        image_format = self._single_byte_int(self.sock.recv(1))
        data = base64.b64encode(self._read_bytes(message_length - 6))
        return {"message_type":self._message_type("P"),
                "image_id": image_id, "orientation": orientation,
                "format": image_format, "data": data}

    def _read_item_location(self):
        message_length = self._int(self.sock.recv(4))
        item_id = self._int(self.sock.recv(4))
        image_id = self._int(self.sock.recv(4))
        loc_model_type = self._single_byte_int(self.sock.recv(1))
        loc_model_length = self._short(self.sock.recv(2))
        if loc_model_type == 0:
            #rectangle
            top_left_x = self._short(self.sock.recv(2))
            top_left_y = self._short(self.sock.recv(2))
            bot_right_x = self._short(self.sock.recv(2))
            bot_right_y = self._short(self.sock.recv(2))
        elif loc_model_type == 1:
            #polygon
            #we can't use it, and have never seen it, so skip it
            top_left_x = 0
            top_left_y = 0
            bot_right_x = 0
            bot_right_y = 0
            log.debug("Item location with polygon model")
            discard = self.sock.recv(loc_model_length)
        else:
            #not available, see spec section 3.3.5
            top_left_x = 0
            top_left_y = 0
            bot_right_x = 0
            bot_right_y = 0
            log.debug("Item location with none type model")
        return {"message_type":self._message_type("L"),
                "top_left_x": top_left_x, "top_left_y": top_left_y,
                "bot_right_x": bot_right_x, "bot_right_y": bot_right_y,}

    def _read_item_volume(self):
        message_length = self._int(self.sock.recv(4))
        item_id = self._int(self.sock.recv(4))
        low_vol = self._int(self.sock.recv(4))
        hi_vol = self._int(self.sock.recv(4))
        vol_model_type = self._single_byte_int(self.sock.recv(1))
        vol_model_length = self._short(self.sock.recv(2))
        if vol_model_type == 0:
            #rectangle
            item_length = self._short(self.sock.recv(2))
            item_width = self._short(self.sock.recv(2))
            item_height = self._short(self.sock.recv(2))
        elif vol_model_type == 1:
            #Cylinder
            item_diameter = self._short(self.sock.recv(2))
            item_length = self._short(self.sock.recv(2))
            #"cubify" it
            item_width = item_diameter
            item_height = item_diameter
        else:
            log.debug("invalid item volume model type")
        return {"message_type":self._message_type("V"),
                "item_id": item_id, "length": item_length,
                "width": item_width, "height": item_height}

    def _read_item_tracking(self):
        message_length = self.sock.recv(4)
        item_id = self._int(self.sock.recv(4))
        position = self._short(self.sock.recv(2))
        length = self._short(self.sock.recv(2))
        speed = self._short(self.sock.recv(2))
        return {"message_type":self._message_type("T"),
                "item_id": item_id, "position": position,
                "length": length, "speed": speed}

    def _read_fault_event(self):
        fault_types = ["Fatal", "Severe", "Warning", "Info", "System"]
        message_length = self.sock.recv(4)
        fault_type = self._single_byte_int(self.sock.recv(1))
        fault_length = self._short(self.sock.recv(2))
        fault_data = self._read_bytes(fault_length)
        return {"message_type":self._message_type("F"), "data": fault_data,
                "severity": fault_types[fault_type]}

    def _read_acknowledgement(self):
        message_length = self.sock.recv(4)
        ok_no = self.sock.recv(2)
        extra_length = self._short(self.sock.recv(2))
        if extra_length > 0:
            extra = self._read_bytes(extra_length)
        else:
            extra = ""
        return {"message_type":self._message_type("A"),
                "ok": ok_no, "data": extra}

    def _read_health(self):
        message_length = self._int(self.sock.recv(4))
        health_summary = self._read_bytes(message_length)
        return {"message_type": self._message_type("H"),
                "health_summary": unicode(health_summary)}

    def _read_statistics(self):
        message_length = self._int(self.sock.recv(4))
        stats = self._read_bytes(message_length)
        return {"message_type": self._message_type("N"),
                "stats": unicode(stats) }

    def _read_check_connection(self):
        message_length = self._int(self.sock.recv(4))
        timeout_limit = self._int(self.sock.recv(4))
        return {"message_type": self._message_type("X"),
                "timeout": timeout_limit}


    #-------------------------------------------------------------------------#
    # Helper functions
    #-------------------------------------------------------------------------#
    def _message_type(self, message_id):
        return self.message_types.get(message_id, "Unknown Message Type")

    def _read_unimplemented(self, message_id):
        message_length = self._int(self.sock.recv(4))
        discard = self._read_bytes(message_length)
        return {"message_type": self._message_type(message_id),
                "message": "Parsing for this type is not implemented"}

    def _read_bytes(self, message_length):
        message = ""
        while len(message) < message_length:
            chunk = self.sock.recv(min((message_length - len(message)), self.MAX_BUFFER_SIZE))
            if chunk == '':
                raise RuntimeError("socket connection broken")
            message = message + chunk
        return message


    #-------------------------------------------------------------------------#
    # Utils
    #-------------------------------------------------------------------------#
    def receive_forever(self):
        """Intended for Debugging"""
        while True:
            message = self.next()
            if message and message.get("message_type", False) == "Label Data":
                print message["barcode"][3:]

A small Mithril.js based app that was part of a WP theme generator.

var section = {};

section.Section = function(data) {
    this.name = m.prop(data.name);
    this.id = data.id;
};

section.Template = function(data) {
    this.name = m.prop(data.name);
};

section.SectionList = Array;
section.Theme = Array;

section.vm = (function() {
    var vm, counter;
    vm = {};
    counter = 0;
    vm.init = function() {
        vm.makeId = function () {
            counter += 1;
            return counter;
        };
        vm.theme = new section.Theme();
        vm.list = new section.SectionList();
        vm.template = new section.Template({name: ""});

        vm.add = function() {
            vm.list.push(new section.Section({name: "", id: vm.makeId()}));
        };

        vm.add();

        vm.next = function () {
            vm.theme.push({
                template: vm.template,
                sections: vm.list
            });
            vm.list = new section.SectionList();
            vm.template = new section.Template({name: ""});
            vm.add();
        };

        vm.getQueryVariable = function(variable) {
            var query = window.location.search.substring(1);
            var vars = query.split("&");
            for (var i=0;i<vars.length;i++) {
                var pair = vars[i].split("=");
                if(pair[0] == variable){return pair[1];}
            }
            return(false);
        };

        vm.save = function() {
            vm.next();
            var spinner = $('#spinner');
            spinner.show();
            $.post("/api/v1/create",
                   JSON.stringify({name: vm.getQueryVariable('project'),
                                   header: vm.getQueryVariable('header'),
                                   footer: vm.getQueryVariable('footer'),
                                   primary: decodeURIComponent(vm.getQueryVariable('primary')),
                                   secondary: decodeURIComponent(vm.getQueryVariable('secondary')),
                                   button: vm.getQueryVariable('button'),
                                   font: decodeURIComponent(vm.getQueryVariable('font')).replace('+', ' '),
                                   theme: vm.theme}),
                   function () {
                       spinner.fadeOut("slow");
                       window.location = "/";
                   });
        };
    };
    return vm;
}());

section.controller = function() {
    section.vm.init();
};

section.view = function() {
    return m("div", {class: "row"}, [
        m("div", {class: "col-sm-offset-4 col-sm-4"}, [
            m("form", [
                m("div", {class: "form-group"}, [
                    m("label", [m("h2", "Template Name")]),
                    m("input",
                      {class: "form-control", placeholder: "Template Name",
                       autofocus: "focus",
                       onchange: m.withAttr("value", section.vm.template.name),
                       value: section.vm.template.name()}),
                    m("p", {class: "help-block"}, "Example block-level help text here.")
                ]),
                m("div", {id: "sections"}, [
                    m("h2", "Sections"),
                    section.vm.list.map(function(aSection, index) {
                        return m("div", {class: "form-group"}, [
                            m("select",
                              {class: "form-control", key: aSection.id,
                               onchange: m.withAttr("value", aSection.name)}, [
                                   m("option", {value: ""}, ""),
                                   m("option", {value: "Contact"}, "Contact"),
                                   m("option", {value: "Content Block"}, "Content Block"),
                                   m("option", {value: "Content Slider"}, "Content Slider"),
                                   m("option", {value: "Content Thumbs"}, "Content Thumbs"),
                                   m("option", {value: "Four Squares"}, "Four Squares"),
                                   m("option", {value: "Gallery"}, "Gallery"),
                                   m("option", {value: "Hero Image"}, "Hero Image"),
                                   m("option", {value: "Hero Slider"}, "Hero Slider"),
                                   m("option", {value: "Icons"}, "Icons"),
                                   m("option", {value: "Masonry"}, "Masonry"),
                                   m("option", {value: "Quote"}, "Quote"),
                                   m("option", {value: "Team"}, "Team"),
                                   m("option", {value: "Text"}, "Text"),
                                   m("option", {value: "Z Pattern"}, "Z Pattern")
                              ])
                        ]);
                    }),
                    m("button",
                      {type: "button",
                       onclick: section.vm.add,
                       class: "btn btn-default btn-lg pull-right"}, [
                           m("span", {class: "glyphicon glyphicon-plus"}),
                           " Add Another"])
                ]),
                m("div", {class: "form-group create"}, [
                    m("button",
                      {type: "button", onclick: section.vm.next,
                       class: "btn btn-default btn-success btn-block btn-lg"},
                      "Next Template")]),
                m("div", {class: "form-group create"}, [
                    m("button",
                      {type: "button", onclick: section.vm.save,
                       class: "btn btn-default btn-warning btn-block btn-lg"},
                      "Finish & Generate Theme")])
            ])
        ])
    ]);
};

m.mount(document.getElementById("app"),
        {controller: section.controller, view: section.view});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment