Skip to content

Instantly share code, notes, and snippets.

import asyncio
import os
import re
import socket
import socks
import sys
import urllib.request
from pyppeteer import launch
from pyppeteer.errors import TimeoutError
def traverse_json(json, predicater, value_getter, name=None):
    """Lazily walk a decoded JSON value and yield selected values.

    The current node is tested with ``predicater(name, json)``; when that
    returns a truthy value, ``value_getter(name, json)`` is yielded.  If the
    node is a list, every element is then visited recursively.  Elements of
    a list have no name, so they are visited with ``name=None``.

    :param json: the node to inspect (any decoded JSON value).
    :param predicater: callable ``(name, node)`` deciding whether to emit.
    :param value_getter: callable ``(name, node)`` producing the emitted value.
    :param name: name associated with ``json``; ``None`` when it has none.
    :returns: a generator over the selected values, parents before children.
    """
    if predicater(name, json):
        yield value_getter(name, json)

    if not isinstance(json, list):
        return

    for element in json:
        # Use `yield from` instead of for loop in Python 3.x
        for found in traverse_json(element, predicater, value_getter):
            yield found
@Xorcerer
Xorcerer / mp.py
Last active August 29, 2015 14:05
import os
from subprocess import Popen, PIPE
from bottle import route, run, template
# Relative path of the directory that is listed below.
# NOTE(review): this module-level name shadows the builtin `dir`.
dir = 'music'
# Directory entries, read once at the moment this statement runs.
files = os.listdir(dir)
# Starts out as None.
# NOTE(review): presumably later holds the player subprocess (Popen is imported) — confirm.
music_player = None
# -*- coding: utf-8 -*-
import yaml
# Only export the class
__all__ = ['Message']
# Single-character marker constants.
# NOTE(review): presumably they delimit placeholders inside message text — usage is not visible here, confirm.
START_MARK = '{'
END_MARK = '}'
@Xorcerer
Xorcerer / enum.py
Last active August 29, 2015 14:02
Three ways to create an enum in Python 2.x
# -*- coding: utf-8 -*-
# The ordinary programmer's way: plain module-level string constants.
RED = 'red'
BLUE = 'blue'
# The "artsy" programmer's way: build the enum class dynamically.
def create_enum(name, members):
    """Create a new class whose attributes enumerate *members*.

    Every member becomes a class attribute named after its upper-cased
    form, with the original member string as its value, e.g.
    ``create_enum('Color', ['red']).RED == 'red'``.

    :param name: name given to the generated class.
    :param members: iterable of strings to expose as attributes.
    :returns: a new class deriving from ``object``.
    """
    attributes = {member.upper(): member for member in members}
    return type(name, (object,), attributes)
# Descriptor whose getter reads a named entry from the owning object's `data`.
# NOTE(review): indentation was lost in this excerpt and __set__ appears to be
# cut off — only the clamping step is visible, no store follows.
class NonNegativeDescriptor(object):
# name: the key that __get__ looks up in obj.data.
def __init__(self, name):
self.name = name
# Returns obj.data.get(self.name, 0), i.e. 0 when the key is absent.
def __get__(self, obj, type_):
return obj.data.get(self.name, 0)
# Negative values are replaced with 0 before anything else happens.
def __set__(self, obj, value):
if value < 0:
value = 0
var wait = require('wait.for');
var request = require("request");
// Requests two pages one after the other and logs a line after each request
// returns.
// NOTE(review): wait.for presumably has to run inside a fiber started by the
// wait.for library — confirm at the call site.
function test(){
    var steps = [
        ["http://google.com", 'Got google.com.'],
        ["http://yahoo.com", 'Got yahoo.']
    ];
    for (var i = 0; i < steps.length; i++) {
        // The response is not used; only completion of the request matters.
        var resp = wait.for(request.get, steps[i][0]);
        console.log(steps[i][1]);
    }
}
@Xorcerer
Xorcerer / event.py
Created April 11, 2014 07:58
An event implementation like boost::signal or C# events.
# Exception that carries a list of other exceptions collected through add().
# NOTE(review): this excerpt lost its indentation and ends at the `message`
# property header; the property body is not visible here.
class AggregateException(Exception):
def __init__(self, *args, **kwargs):
# NOTE(review): super(self.__class__, self) recurses without end if this class
# is ever subclassed, because self.__class__ is then the subclass;
# super(AggregateException, self) avoids that.
super(self.__class__, self).__init__(*args, **kwargs)
# Starts empty; add() appends to it.
self.sub_exceptions = []
# Appends one exception to sub_exceptions.
def add(self, exception):
self.sub_exceptions.append(exception)
@property
def message(self):
@Xorcerer
Xorcerer / stream_tree.py
Created January 14, 2014 08:38
Stream tree.
class Converging(object):
    """Holds a reference to another object in the ``ds`` attribute.

    The referenced object is expected to expose a writable ``ds`` attribute
    of its own, which :meth:`diverge` clears.
    """

    def __init__(self, ds):
        """Remember *ds* as the object this instance points at."""
        self.ds = ds

    def diverge(self):
        """Set the ``ds`` attribute of the referenced object to ``None``.

        This instance's own ``ds`` reference is left untouched.
        """
        self.ds.ds = None
# NOTE(review): presumably a node of the stream tree; the excerpt lost its
# indentation and ends after the first assignment in __init__, so `name` is
# not used in the visible lines.
class Node(object):
def __init__(self, name):
# ds starts out as None.
self.ds = None
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Collections.Generic;
namespace TestSynchronizationContext
{
class SingleThreadSynchronizationContext : SynchronizationContext