Skip to content

Instantly share code, notes, and snippets.

@Julien00859
Last active February 6, 2019 08:30
Show Gist options
  • Save Julien00859/422e00acc917b72f43428e65710c191e to your computer and use it in GitHub Desktop.
Save Julien00859/422e00acc917b72f43428e65710c191e to your computer and use it in GitHub Desktop.
Sorting algos
error.json
__pycache__/
#!/usr/bin/env python3
import json
import logging
from argparse import ArgumentParser
from collections import defaultdict, ChainMap
from copy import copy
from math import floor, ceil, log
from os.path import isfile
from pprint import pformat
from random import randrange, shuffle, sample
from string import ascii_lowercase
from sorting_algorithms import algorithms, RestrictionError
from checks import is_sorted, is_permutation_of
def _build_parser():
    """Create the command-line parser of the benchmark script."""
    parser = ArgumentParser()
    parser.add_argument("sizes", type=int, nargs="*", default=[128],
                        help="Array sizes (default: [128])")
    parser.add_argument("-a", "--algo", dest="algos", action="append",
                        help="Sorting algorithms to use (default: all)")
    parser.add_argument("-s", "--notalgo", dest="notalgos", action="append",
                        help="Sorting algorithms to not use (default: none)")
    parser.add_argument("-i", "--integer", dest="integer", action="store_true", default=False,
                        help="Use intergers instead of strings")
    parser.add_argument("--linear", dest="linear", action="store_true", default=False,
                        help="Shuffle a range of elements")
    parser.add_argument("--bounded", dest="bounded", action="store_true", default=False,
                        help="Randomly select values from -size/2 to size/2")
    parser.add_argument("--save", action="store_true", default=False,
                        help="Save times on disk")
    parser.add_argument("--array", action="store",
                        help="Use this specific json array instead of a random one")
    parser.add_argument("-v", "--verbose", dest="verbosity", action="count", default=0,
                        help="Increase verbosity")
    return parser


def _random_array(size, options):
    """Build the random input array of ``size`` items selected by ``options``.

    ``--linear`` shuffles a contiguous integer range, ``--bounded`` draws
    integers from [-size/2, size/2), ``--integer`` draws integers from
    [-size, size); the default is random lowercase strings.
    """
    if options.linear:
        values = list(range(floor(-size / 2), floor(size / 2)))
        shuffle(values)
        return values
    if options.bounded:
        return [randrange(floor(-size / 2), floor(size / 2)) for _ in range(size)]
    if options.integer:
        return [randrange(-size, size) for _ in range(size)]
    # Strings long enough to offer about ``size`` distinct values.  log() is
    # undefined for size < 1 and gives 0 for size == 1, so use at least one
    # character instead of crashing or producing empty strings.
    length = max(1, ceil(log(size, 26))) if size > 1 else 1
    return ["".join(sample(ascii_lowercase, length)) for _ in range(size)]


def _benchmark(func, rnd_array, size):
    """Run one registered algorithm on a copy of ``rnd_array`` and validate it.

    ``func`` is called as ``func(array, size)`` and must return the pair
    ``(sorted_array, seconds)``.  Returns the elapsed seconds when the output
    is a sorted permutation of the input, otherwise ``None`` (algorithm
    restricted for this input, or wrong output).  A wrong output is appended
    to "error.json" together with the input.
    """
    logging.warning("\nAlgo: %s\n", func.__name__)
    logging.info("Copying... ")
    tmp = copy(rnd_array)
    logging.info("ok\n")
    logging.warning("Sorting... ")
    try:
        sorted_array, time_elapsed = func(tmp, size)
    except RestrictionError as e:
        logging.warning("%s\n", e.args[0])
        return None

    logging.warning("done, tooks %.4f seconds\n", time_elapsed)
    logging.info("Validating... ")
    if is_sorted(sorted_array) and is_permutation_of(rnd_array, sorted_array):
        logging.info("ok\n")
        return time_elapsed

    logging.info("error\n")
    with open("error.json", "a") as report:
        # list() because some algorithms return tuples or list-like wrappers
        # that the json module cannot serialize directly.
        json.dump({"random": rnd_array, "sorted": list(sorted_array)}, report)
        report.write("\n")
    logging.warning("An error occured with algo %s. "
                    "A full report has been written to \"error.json\"\n",
                    func.__name__)
    return None


def _save_results(results):
    """Merge ``results`` with the content of "results.json" and rewrite it."""
    if isfile("results.json"):
        logging.info("Loading previous results... ")
        with open("results.json") as jsonfile:
            old_results = json.load(jsonfile)
        logging.info("ok\n")
        logging.info("Merging new results with previous ones... ")
        for algo in results.keys() | old_results.keys():
            # ChainMap looks up the first mapping first: new timings win.
            results[algo] = dict(ChainMap(
                results.get(algo, {}),
                old_results.get(algo, {})))
        logging.info("ok\n")
    else:
        logging.warning("File \"results.json\" not found\n")

    logging.info("Saving to disk... ")
    with open("results.json", "w") as jsonfile:
        json.dump(results, jsonfile, indent=2, sort_keys=True)
    logging.info("ok\n")


def main():
    """Benchmark the registered sorting algorithms from the command line.

    For every requested size an array is generated (or the ``--array`` JSON
    value is used), each selected algorithm sorts a copy of it, the output is
    validated and the timing recorded.  With ``--save`` the timings are merged
    into "results.json".
    """
    options = _build_parser().parse_args()

    # Messages carry their own line breaks, hence the empty terminator.
    handler = logging.StreamHandler()
    handler.terminator = ""
    logging.basicConfig(
        format="%(message)s",
        level=["WARNING", "INFO", "DEBUG"][min(2, options.verbosity)],
        handlers=[handler])
    logging.debug("Options: %s\n\n", pformat(options.__dict__))

    # Select the algorithms on a local list; the imported registry is left
    # untouched.
    logging.debug("Filtering algorithms... ")
    selected = list(algorithms)
    if options.algos:
        wanted = set(options.algos)
        selected = [func for func in selected if func.__name__ in wanted]
    if options.notalgos:
        unwanted = set(options.notalgos)
        selected = [func for func in selected if func.__name__ not in unwanted]
    logging.debug("ok\n")

    rnd_array = None
    if options.array:
        rnd_array = json.loads(options.array)
        options.sizes = [len(rnd_array)]

    results = defaultdict(dict)
    for size in options.sizes:
        logging.info("\nArray size: %d\n", size)
        if not options.array:
            logging.info("Creating and shuffling array... ")
            rnd_array = _random_array(size, options)
            logging.info("ok, ")

        # Show the whole array when tiny, otherwise only both ends.
        if size >= 5:
            preview = rnd_array[:2] + ["..."] + rnd_array[-2:]
        else:
            preview = rnd_array
        logging.info("[%s]\n", ", ".join(map(str, preview)))
        logging.info("\n=====\n")

        for func in selected:
            time_elapsed = _benchmark(func, rnd_array, size)
            if time_elapsed is not None:
                results[func.__name__]["{: 7d}".format(size)] = time_elapsed

    # If saving the output is not required, exit now
    if not options.save:
        return
    _save_results(results)
# Run the benchmark only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

Sorting algorithms

This project implements some sorting algorithms in Python.

Wikipedia

usage: sorting_algo [-h] [-a ALGOS] [-s NOTALGOS] [-i] [--linear] [--bounded]
                    [--save] [--array ARRAY] [-v]
                    [sizes [sizes ...]]

positional arguments:
  sizes                 Array sizes (default: [128])

optional arguments:
  -h, --help            show this help message and exit
  -a ALGOS, --algo ALGOS
                        Sorting algorithms to use (default: all)
  -s NOTALGOS, --notalgo NOTALGOS
                        Sorting algorithms to not use (default: none)
  -i, --integer         Use intergers instead of strings
  --linear              Shuffle a range of elements
  --bounded             Randomly select values from -size/2 to size/2
  --save                Save times on disk
  --array ARRAY         Use this specific json array instead of a random one
  -v, --verbose         Increase verbosity

Implemented so far:

  • Bogo
  • Time
  • Bubble
  • Selection
  • Insertion
    • Shifting
    • Swapping
  • Shell
  • Merge
    • Recursive
    • Iterative
  • Quick
    • Recursive
  • Heap
  • Counting

Planned

  • Radix
from typing import List, Any
from itertools import tee
from collections import Counter
def is_sorted(a: List[int]):
    """Tell whether ``a`` is in non-decreasing order.

    Every item is compared with its successor; the answer is ``False`` as
    soon as one item is strictly greater than the next one.  Inputs with
    fewer than two items are sorted by definition.
    """
    current, following = tee(a)
    # Shift the second iterator by one so that zip() yields adjacent pairs.
    next(following, None)
    return not any(left > right for left, right in zip(current, following))
def is_permutation_of(a: List[Any], b: List[int]):
    """Tell whether ``a`` and ``b`` hold the same items the same number of times.

    The order of the items is ignored.  Items are tallied with ``Counter``,
    so they have to be hashable.
    """
    tally_of_a = Counter(a)
    tally_of_b = Counter(b)
    return tally_of_a == tally_of_b
def is_linear(a: List[int], n: int):
    """Tell whether ``a`` is a permutation of ``n`` consecutive integers.

    That is the precondition for sorting by direct placement, where the
    item ``e`` goes to index ``e - min(a)`` of an ``n``-slot list.

    Returns ``False`` when ``a`` does not hold exactly ``n`` items, holds a
    non-integer (floats cannot be used as list indices), has duplicates or
    has gaps.  An empty list with ``n == 0`` and a single-item list with
    ``n == 1`` are linear.
    """
    if len(a) != n:
        return False
    if not a:
        return True
    if not all(isinstance(item, int) for item in a):
        return False
    # n distinct integers spanning exactly n - 1 are necessarily consecutive.
    return max(a) - min(a) == n - 1 and len(set(a)) == n
def is_bounded(a: List[int], n: int):
    """Tell whether the values of ``a`` span at most ``n`` distinct slots.

    True when ``max(a) - min(a) <= n - 1``, i.e. when every ``x - min(a)``
    is a valid index of an ``n``-slot list.  An empty list has no value out
    of bounds and is reported as bounded instead of raising on ``max()``.
    """
    if not a:
        return True
    return max(a) - min(a) <= n - 1
def is_numeric(a: List[Any], n: int):
    """Tell whether every item of ``a`` is an ``int`` or a ``float``.

    ``n`` is not used; it is accepted so that the function has the same
    signature as the other checks.  An empty list is numeric.
    """
    for item in a:
        if not isinstance(item, (int, float)):
            return False
    return True
#!/usr/bin/python3
from collections import UserList
class Heap(UserList):
    """Binary max-heap kept in a flat list, with an in-place heap sort.

    The children of the node at index ``i`` live at ``2 * i + 1`` and
    ``2 * i + 2``; every parent is greater than or equal to its children,
    so the largest item is at index 0.  The items are stored in ``self.data``.
    """

    def __init__(self, array):
        """Copy ``array`` and rearrange the copy into a max-heap."""
        super().__init__(array)
        size = len(self.data)
        # Leaves are already valid heaps: sift only the nodes that have at
        # least one child, from the last parent up to the root.
        for parent in range(size // 2 - 1, -1, -1):
            self.sift_down(parent, size)

    def swap(self, i, j):
        """Exchange the items stored at indices ``i`` and ``j``."""
        data = self.data
        data[i], data[j] = data[j], data[i]

    def sift_up(self, node, size):
        """Move the item at ``node`` up while it is bigger than its parent.

        ``size`` is not used; it is kept so that the signature mirrors
        ``sift_down``.
        """
        data = self.data
        while node:
            parent = (node - 1) // 2
            if not data[node] > data[parent]:
                return
            data[node], data[parent] = data[parent], data[node]
            node = parent

    def sift_down(self, parent, size):
        """Move the item at ``parent`` down while a child is bigger than it.

        Only the first ``size`` items are considered part of the heap.  At
        each step the item is exchanged with its biggest child (the left one
        on ties) and the walk continues from that child.
        """
        data = self.data
        while True:
            lchild = 2 * parent + 1
            if lchild >= size:
                # no children
                return
            rchild = lchild + 1
            bigchild = lchild
            if rchild < size and data[rchild] > data[lchild]:
                bigchild = rchild
            if not data[bigchild] > data[parent]:
                return
            data[parent], data[bigchild] = data[bigchild], data[parent]
            parent = bigchild

    def sort(self):
        """Sort the items in ascending order, in place (heap sort).

        The largest item is repeatedly moved behind the shrinking heap and
        the heap property is restored on what remains.  Afterwards the data
        is sorted and is no longer a max-heap.
        """
        for end in range(len(self.data) - 1, 0, -1):
            self.swap(0, end)
            self.sift_down(0, end)

    def __repr__(self):
        # str() each item so that non-string items can be joined as well.
        return "<Heap [{}]>".format(", ".join(map(str, self.data)))
{
"bubble": {
"100": 0.0027418136596679688,
"1000": 0.002859354019165039,
"10000": 0.2062089443206787,
"100000": 24.63749623298645
},
"heap": {
"100": 0.0020911693572998047,
"1000": 0.02652144432067871,
"10000": 0.01609635353088379,
"100000": 0.07877516746520996,
"1000000": 2.1845967769622803,
"10000000": 16.750819444656372
},
"insertion_shifting": {
"100": 0.001024007797241211,
"1000": 0.0012154579162597656,
"10000": 0.03629660606384277,
"100000": 3.328697681427002
},
"insertion_swapping": {
"100": 0.0011451244354248047,
"1000": 0.0012369155883789062,
"10000": 0.046155691146850586,
"100000": 4.611931085586548
},
"merge_iterative": {
"100": 0.0005395412445068359,
"1000": 0.009482622146606445,
"10000": 0.011868000030517578,
"100000": 0.03788590431213379,
"1000000": 0.45288825035095215,
"10000000": 5.395555257797241
},
"merge_recurcive": {
"100": 0.00032711029052734375,
"1000": 0.007157325744628906,
"10000": 0.037772178649902344,
"100000": 0.10283708572387695,
"1000000": 1.0644819736480713,
"10000000": 10.629281520843506
},
"quick_recurcive": {
"100": 0.0003337860107421875,
"1000": 0.011045455932617188,
"10000": 0.04933285713195801,
"100000": 0.14489364624023438,
"1000000": 0.7203609943389893,
"10000000": 6.571626424789429
},
"radix_lsd": {
"100": 0.000457763671875,
"1000": 0.003802061080932617,
"10000": 0.007916927337646484,
"100000": 0.06836724281311035,
"1000000": 0.8161497116088867,
"10000000": 9.620404481887817
},
"selection": {
"100": 0.001398324966430664,
"1000": 0.003341197967529297,
"10000": 0.07923698425292969,
"100000": 7.562659978866577
},
"shell_using_shell_sequence": {
"100": 0.0005040168762207031,
"1000": 0.01676344871520996,
"10000": 0.010799407958984375,
"100000": 0.1643354892730713,
"1000000": 2.9832465648651123,
"10000000": 42.93825912475586
},
"shell_using_tokuda_sequence": {
"100": 0.0004036426544189453,
"1000": 0.0014111995697021484,
"10000": 0.009624481201171875,
"100000": 0.10033249855041504,
"1000000": 1.5035054683685303,
"10000000": 17.692376613616943
}
}
#!/usr/bin/env python3
from functools import partial, wraps
from itertools import takewhile, count, permutations, chain
from math import ceil, floor
from operator import gt, le, methodcaller
from threading import Thread, Event
from time import time, sleep as sleep_
from typing import List, Any
from checks import is_numeric, is_bounded, is_linear, is_sorted
from heap import Heap
class RestrictionError(Exception):
    """Raised when an input does not satisfy a check required by an algorithm.

    The message, available as ``args[0]``, is ``"restricted by <check name>"``.
    The failing check itself is kept in the ``function`` attribute.
    """

    def __init__(self, function):
        # Callables such as functools.partial objects have no __name__;
        # fall back on their repr instead of failing while raising.
        name = getattr(function, "__name__", None) or repr(function)
        super().__init__("restricted by %s" % name)
        self.function = function
# Registry of the timed algorithm wrappers, in registration order.
algorithms = []


def register(*checks):
    """Decorator factory adding a sorting function to ``algorithms``.

    The registry receives a wrapper that first calls every check with the
    arguments of the call and raises ``RestrictionError`` for the first one
    returning a falsy value, then runs the function and returns the pair
    ``(result, elapsed_seconds)``.

    The decorated function itself is returned unchanged, so direct calls
    are neither checked nor timed.
    """
    # perf_counter() is monotonic and has the best available resolution,
    # unlike the wall clock time() which can jump when the clock is adjusted.
    from time import perf_counter

    def wrapper(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            for check in checks:
                if not check(*args, **kwargs):
                    raise RestrictionError(check)
            before = perf_counter()
            result = func(*args, **kwargs)
            return result, perf_counter() - before

        algorithms.append(wrapped)
        return func  # Don't change the function itself

    return wrapper
@register()
def bogo(a: List[Any], n: int):
    """Return the first sorted permutation of length ``n`` of ``a``.

    The permutations are enumerated lazily in the order produced by
    ``itertools.permutations``; the result is a tuple, or ``None`` when no
    permutation is sorted.
    """
    candidates = permutations(a, n)
    return next(filter(is_sorted, candidates), None)
@register(is_numeric)
def sleep(a: List[int], n: int):
    """Sleep sort: one thread per value, each sleeping in proportion to it.

    Every thread waits for a shared start signal, sleeps
    ``(value - min(a)) / 100`` seconds and then appends its value to the
    result list, so smaller values are appended earlier.  The order relies
    on thread timing and is therefore not guaranteed for close values.

    ``n`` is not used.  Returns a new list; an empty input gives an empty
    list.
    """
    if not a:
        # min() below is undefined for an empty input
        return []

    m = min(a)
    b = []
    start = Event()

    def wait(x):
        start.wait()
        sleep_((x - m) / 100)
        b.append(x)

    threads = [Thread(target=wait, args=(x,)) for x in a]
    for th in threads:
        th.start()
    # Release all the threads at once so that they share the same origin.
    start.set()
    for th in threads:
        th.join()
    return b
@register()
def bubble(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` in place with bubble sort.

    Each pass walks the still unsorted prefix and swaps the neighbours that
    are out of order, which pushes the biggest remaining item to the end of
    that prefix.  Returns ``a`` itself.
    """
    # ``last`` is the index where the current pass drops its biggest item.
    for last in range(n - 1, 0, -1):
        for j in range(last):
            left, right = a[j], a[j + 1]
            if left > right:
                a[j], a[j + 1] = right, left
    return a
@register()
def insertion_swapping(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` in place with insertion sort.

    Each item is swapped with its left neighbour for as long as that
    neighbour is bigger.  Returns ``a`` itself.
    """
    for start in range(1, n):
        pos = start
        while pos > 0 and a[pos - 1] > a[pos]:
            a[pos - 1], a[pos] = a[pos], a[pos - 1]
            pos -= 1
    return a
@register()
def insertion_shifting(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` in place with insertion sort.

    The item to insert is taken out, the bigger items on its left are
    shifted one slot to the right and the item is written once into the
    slot freed that way.  Returns ``a`` itself.
    """
    for idx in range(1, n):
        current = a[idx]
        for slot in range(idx, 0, -1):
            if not a[slot - 1] > current:
                break
            a[slot] = a[slot - 1]
        else:
            # Every item on the left was bigger: insert at the very start.
            slot = 0
        a[slot] = current
    return a
@register()
def selection(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` in place with selection sort.

    For each position, the smallest item of the remaining part (the
    leftmost one on ties) is swapped into that position.  Returns ``a``
    itself.
    """
    for target in range(n):
        smallest = min(range(target, n), key=a.__getitem__)
        a[target], a[smallest] = a[smallest], a[target]
    return a
@register()
def heap(a: List[Any], n: int):
    """Heap sort: heapify the items, then repeatedly extract the largest.

    The work is done by ``Heap``; ``n`` is not used.  The sorted items are
    returned as a plain list, like the other algorithms do, rather than as
    the ``Heap`` wrapper, which for instance the ``json`` module cannot
    serialize.
    """
    h = Heap(a)
    h.sort()
    # UserList keeps its items in the ``data`` list
    return h.data
def shell(a: List[Any], n: int, gaps: List[int]):
    """Sort the first ``n`` items of ``a`` in place with Shell sort.

    For each gap, taken in the order given by ``gaps``, a gapped insertion
    sort is performed: items that are ``gap`` slots apart are sorted
    relative to each other.  Returns ``a`` itself.
    """
    for step in gaps:
        for start in range(step, n):
            moving = a[start]
            pos = start
            while pos >= step:
                candidate = a[pos - step]
                if not candidate > moving:
                    break
                # Shift the bigger item one gap to the right
                a[pos] = candidate
                pos -= step
            a[pos] = moving
    return a
@register()
def shell_using_shell_sequence(a: List[Any], n: int):
    """Shell sort using Shell's original gaps: n/2, n/4, ... down to 1."""

    def halving_gaps():
        # Lazily yield floor(n / 2**k) for k = 1, 2, ... while it is >= 1
        exponent = 1
        gap = floor(n / 2 ** exponent)
        while 1 <= gap:
            yield gap
            exponent += 1
            gap = floor(n / 2 ** exponent)

    return shell(a, n, halving_gaps())
@register()
def shell_using_tokuda_sequence(a: List[Any], n: int):
    """Shell sort using Tokuda's gaps (1, 4, 9, 20, 46, ...) smaller than n.

    The gaps are computed in increasing order as
    ``ceil((9 * (9/4)**k - 4) / 5)`` and applied from the largest to the
    smallest.
    """
    gaps = []
    k = 0
    while True:
        gap = ceil((9 * (9/4) ** k - 4) / 5)
        if not n > gap:
            break
        gaps.append(gap)
        k += 1
    gaps.reverse()
    return shell(a, n, gaps)
@register()
def merge_iterative(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` in place with bottom-up merge sort.

    Sorted runs of width 1, 2, 4, ... are merged pairwise until a single
    run covers the ``n`` items.  The sort is stable: items comparing equal
    keep their relative order.  Returns ``a`` itself.
    """
    if n <= 1:
        return a

    def merge_runs(lower, half, upper):
        """Merge the sorted runs a[lower:half] and a[half:upper]."""
        merged = []
        i, j = lower, half
        while i < half and j < upper:
            # Take from the right run only when strictly smaller, so that
            # equal items keep their original order.
            if a[j] < a[i]:
                merged.append(a[j])
                j += 1
            else:
                merged.append(a[i])
                i += 1
        merged.extend(a[i:half])
        merged.extend(a[j:upper])
        a[lower:upper] = merged

    width = 1
    while width < n:
        for lower in range(0, n, 2 * width):
            half = min(lower + width, n)
            upper = min(lower + 2 * width, n)
            # A trailing run without a right-hand partner is already sorted
            if half < upper:
                merge_runs(lower, half, upper)
        width *= 2
    return a
@register()
def quick_recurcive(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` with a recursive quick sort.

    The items are split into those lower than, equal to and higher than the
    middle item (the pivot); the lower and higher parts are sorted
    recursively and the three parts are concatenated.

    Returns a new list, except when ``n <= 2`` where ``a`` itself is
    returned (after swapping its two items in place if needed).
    """
    if n <= 1:
        return a
    if n == 2:
        if a[0] > a[1]:
            a[0], a[1] = a[1], a[0]
        # Two ordered items are sorted: no need to partition them
        return a

    pivot = a[n // 2]
    lowers = []
    equals = []
    highers = []
    for value in a[:n]:
        if value < pivot:
            lowers.append(value)
        elif value > pivot:
            highers.append(value)
        else:
            equals.append(value)

    b = quick_recurcive(lowers, len(lowers))
    b.extend(equals)
    b.extend(quick_recurcive(highers, len(highers)))
    return b
@register()
def merge_recurcive(a: List[Any], n: int):
    """Sort the first ``n`` items of ``a`` in place with top-down merge sort.

    Copies of both halves are sorted recursively and merged back into
    ``a``.  The sort is stable: items comparing equal keep their relative
    order.  Returns ``a`` itself.
    """
    if n <= 1:
        return a
    if n == 2:
        if a[0] > a[1]:
            a[0], a[1] = a[1], a[0]
        return a

    half = n // 2
    left = merge_recurcive(a[:half], half)
    right = merge_recurcive(a[half:n], n - half)

    i = 0
    j = 0
    while i < half and j < n - half:
        # Take from the right half only when strictly smaller, so that
        # equal items keep their original order.
        if right[j] < left[i]:
            a[i + j] = right[j]
            j += 1
        else:
            a[i + j] = left[i]
            i += 1
    # Exactly one half still has items: copy them in one go
    a[i + j:n] = left[i:] + right[j:]
    return a
@register(is_numeric)
def radix_lsd(a: List[int], n: int):
    """Sort integers with a least-significant-digit radix sort, base 4.

    The values are distributed into four buckets according to two bits at
    a time, starting with the lowest bits; each pass is stable, so after
    the last pass the values are sorted on all the processed bits.  A
    final pass on a sign bit moves the negative values to the front.

    ``n`` is not used.  Returns a new list; an empty input gives an empty
    list.  The values must provide ``bit_length()``, i.e. be integers.
    """
    if not a:
        # max() below is undefined for an empty input
        return list(a)

    length = max(value.bit_length() for value in a)

    # ``shift`` selects the current pair of bits: the pair has the value
    # 00, 01, 10 or 11, which is used as the index of the bucket where the
    # number is stored.
    for shift in range(0, length + 1, 2):
        buckets = ([], [], [], [])  # 2 bits => 4 possible values
        for value in a:
            buckets[(value >> shift) & 0b11].append(value)
        # Concatenate the buckets: the values are now sorted on all the
        # bits processed so far.
        a = [value for bucket in buckets for value in bucket]

    # In two's complement the negative values have their high bits set, so
    # the passes above leave them, in ascending order, after the positive
    # ones.  Bit ``length + 1`` lies above every magnitude and is therefore
    # 1 exactly for the negative values: use it to put them back first.
    sign_shift = length + 1
    negatives = []
    non_negatives = []
    for value in a:
        if (value >> sign_shift) & 1:
            negatives.append(value)
        else:
            non_negatives.append(value)
    return negatives + non_negatives
@register(is_numeric, is_bounded)
def counting(a: List[int], n: int):
    """Sort the first ``n`` items of ``a`` in place with counting sort.

    The occurrences of each value are counted in an ``n``-slot table
    indexed by ``value - min(a)``, then ``a`` is rewritten from the counts.
    The values must be integers whose range fits the table, i.e.
    ``max(a) - min(a) <= n - 1``, otherwise ``IndexError`` is raised.

    Returns ``a`` itself; an empty input is returned as is.
    """
    if not a or n <= 0:
        # Nothing to sort, and min() is undefined for an empty input
        return a

    m = min(a)
    counts = [0] * n
    for value in a[:n]:
        counts[value - m] += 1

    pos = 0
    for offset, cnt in enumerate(counts):
        if cnt:
            # Write the whole run of equal values at once
            a[pos:pos + cnt] = [offset + m] * cnt
            pos += cnt
    return a
@register(is_numeric, is_linear)
def assignment(a: List[int], n: int):
    """Sort a permutation of ``n`` consecutive integers by direct placement.

    Each value ``e`` is written at index ``e - min(a)`` of a new ``n``-slot
    list.  The result is only meaningful when the values are distinct
    consecutive integers; slots that receive no value keep the filler 0.

    Returns a new list; an empty input gives an empty list.
    """
    if not a:
        # min() below is undefined for an empty input
        return []

    m = min(a)
    b = [0] * n
    for e in a:
        b[e - m] = e
    return b
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment