Skip to content

Instantly share code, notes, and snippets.

@muendelezaji
Last active October 16, 2017 15:40
Show Gist options
  • Save muendelezaji/76666f54c59e229081e98ee2b68a10df to your computer and use it in GitHub Desktop.
Save muendelezaji/76666f54c59e229081e98ee2b68a10df to your computer and use it in GitHub Desktop.
Clean up traffic volume data from a range of sensors and parse into a more usable format
import os
import sys
from datetime import datetime, timedelta
from io import DEFAULT_BUFFER_SIZE
from re import compile as re_compile
from shutil import rmtree
# from time import strftime, strptime
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool, cpu_count # Pool() method not pool.Pool class
# Module-level aliases for the datetime class methods (saves attribute lookups).
strptime = datetime.strptime
strftime = datetime.strftime

# Folder, relative to the working directory, that receives the generated CSVs.
# (A timestamped name such as 'output_<YYYYmmdd-HHMMSS>.d' was considered.)
OUTPUT_DIR = 'output-py.d'

# Two-letter weekday codes as they appear in the sensor rows, indexed by
# datetime.weekday() (Monday == 0). Used to detect rows whose weekday does
# not match the date in the file name.
WEEKDAY_MAP = 'Mo Tu We Th Fr Sa Su'.split()

# Example row:
#   We 02:55:45 M14 N60311H IVL 0016 OCC 0 LQ 0 BQ 0 EB0 LIT 0000
# Captured fields, in order:
#   wd, time, sensor, ivl, occ, lq, bq, eb, lit
# Intended primary key: (yearMonthDay, weekDay, time, sensorId, interval)
p_wd = '({})'.format('|'.join(WEEKDAY_MAP))
p_time = '([012][0-9]:[0-5][0-9]:[0-5][0-9])'
p_sensor = 'M14 (N[0-9]{5}[A-Z])'
p_ivl = 'IVL ([0-9]{4})'
p_occ = 'OCC *([0-9]+)'
p_lq = 'LQ *(-?[0-9]+)'
p_bq = 'BQ *([0-9]+)'
p_eb = 'EB([01])'
p_lit = 'LIT ([01]{4})'

# The field patterns are separated by exactly one space in the full row regex.
LINE_PATTERN = re_compile(' '.join((
    p_wd, p_time, p_sensor, p_ivl, p_occ, p_lq, p_bq, p_eb, p_lit,
)))
def process_file(in_file):
    """Convert one raw sensor ``.dat`` file into a CSV file under OUTPUT_DIR.

    The base name of ``in_file`` (up to the first dot) must be a date in
    ``%y%b%d`` form. Every input line that matches LINE_PATTERN is written to
    ``<OUTPUT_DIR>/<filename>.csv`` as ``date,<captured fields...>``. Rows
    whose weekday code differs from the weekday of the file's date are
    attributed to the following day (the sensor overran into the next day).
    A one-line match summary is printed when the file is done.

    Files whose name does not end in ``.dat`` are skipped with a warning.

    Args:
        in_file: Path of the input file.

    Raises:
        ValueError: If the file name does not start with a ``%y%b%d`` date.
        OSError: If the input cannot be read or the output cannot be written.
    """
    filename = os.path.basename(in_file)
    if not filename.endswith('.dat'):
        print('Warning: Skipping non .dat file "{}"'.format(filename))
        return

    # Resolve the date first so that a badly named file fails before an
    # (empty) output file has been created for it.
    day = strptime(filename.split('.')[0], '%y%b%d')
    weekday = WEEKDAY_MAP[day.weekday()]
    ymd = strftime(day, '%Y-%m-%d')
    # Used for rows logged after the sensor overran into the next day.
    ymd_next = strftime(day + timedelta(days=1), '%Y-%m-%d')

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    out_file = os.path.join(OUTPUT_DIR, filename + '.csv')

    match_count, total_lines = 0, 0
    # Plain open(..., 'w') truncates an existing file and works on every
    # platform (os.O_NONBLOCK is not available on Windows).
    with open(in_file) as input_fd, open(out_file, 'w') as output_fd:
        # Bind the hot-loop callables to locals to avoid repeated lookups.
        pattern_matches = LINE_PATTERN.match
        write_out = output_fd.write
        for line in input_fd:
            total_lines += 1
            matched = pattern_matches(line)
            if not matched:
                continue
            match_count += 1
            # Group 1 is the row's two-letter weekday code.
            ymd_str = ymd if matched.group(1) == weekday else ymd_next
            write_out('{},{}\n'.format(ymd_str, ','.join(matched.groups())))

    # An empty input file has no lines; report 0% instead of dividing by zero.
    percent = 100 * match_count / total_lines if total_lines else 0.0
    print('{}: {}/{} ({:.1f}%) lines matched'.format(
        filename, match_count, total_lines, percent))
def main():
    """Process every data file named on the command line.

    Each command-line argument is either a file, which is processed as given,
    or a folder, which is walked recursively for ``*.dat`` files. Anything
    else is reported and skipped. The output folder OUTPUT_DIR (under the
    current working directory) is emptied and recreated before processing.

    Exits with an error message when no arguments are given.
    """
    args = sys.argv[1:]
    if not args:
        sys.exit('Error: No files or folders given. At least one required.')

    # 1. enumerate files to be processed
    data_files = []
    for arg in args:
        if os.path.isfile(arg):
            data_files.append(arg)
        elif os.path.isdir(arg):
            # recursive walk; only *.dat files are picked up from folders
            for root, _dirs, files in os.walk(arg):
                data_files.extend(
                    os.path.join(root, f) for f in files if f.endswith('.dat'))
        else:
            print('Warning: "{}" is not a file or folder. Skipping...'.format(arg))

    # (re)create the output directory; removal is best-effort, so tolerate a
    # directory that could not be deleted instead of failing in mkdir
    outdir = os.path.join(os.getcwd(), OUTPUT_DIR)
    rmtree(outdir, ignore_errors=True)
    os.makedirs(outdir, exist_ok=True)

    # 2. process the files sequentially
    # Alternatives that were tried: mapping process_file over a
    # ProcessPoolExecutor or a multiprocessing Pool with
    # (os.cpu_count() or 1) * 2 workers.
    for f in data_files:
        process_file(f)

    print('\nDone. Results written to {}'.format(OUTPUT_DIR))
if __name__ == '__main__':
    import timeit
    # Run main() exactly once under a timer. The elapsed seconds are kept in
    # `runtime` but not reported; to print them, use e.g.
    #   print('Time taken: {}s'.format(round(runtime, 3)))
    # timeit.repeat(..., number=1, repeat=3) would give several measurements.
    runtime = timeit.Timer(main).timeit(number=1)
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.lang.StringBuilder;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.Files;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.FileVisitResult.CONTINUE;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.temporal.ChronoField.YEAR;
/**
 * Cleans up raw traffic-volume sensor logs ({@code .dat} files) and rewrites
 * the rows that match {@link #LINE_PATTERN} as CSV files in {@link #OUTPUT_DIR}.
 */
public class TrafficVolume {
    /** Output folder, resolved against the current working directory. */
    static final String OUTPUT_DIR = "output-java.d";

    /**
     * Two-letter weekday codes as used in the sensor rows, indexed by
     * {@code DayOfWeek.ordinal()} (Monday == 0). Used to detect rows whose
     * weekday does not match the date in the file name.
     */
    static final String[] WEEKDAY_MAP = { "Mo", "Tu", "We", "Th", "Fr", "Sa", "Su" };

    // Example row:
    //   We 02:55:45 M14 N60311H IVL 0016 OCC 0 LQ 0 BQ 0 EB0 LIT 0000
    // Captured fields, in order: wd, time, sensor, ivl, occ, lq, bq, eb, lit
    // Intended primary key: (yearMonthDay, weekDay, time, sensorId, interval)
    static final String p_wd = "(Mo|Tu|We|Th|Fr|Sa|Su)";
    static final String p_time = "([012][0-9]:[0-5][0-9]:[0-5][0-9])";
    static final String p_sensor = "M14 (N[0-9]{5}[A-Z])";
    static final String p_ivl = "IVL ([0-9]{4})";
    static final String p_occ = "OCC *([0-9]+)";
    static final String p_lq = "LQ *(-?[0-9]+)";
    static final String p_bq = "BQ *([0-9]+)";
    static final String p_eb = "EB([01])";
    static final String p_lit = "LIT ([01]{4})";

    /** Full row pattern: the field patterns separated by single spaces. */
    static final Pattern LINE_PATTERN = Pattern.compile(
            String.format("%s %s %s %s %s %s %s %s %s",
                    p_wd, p_time, p_sensor, p_ivl, p_occ, p_lq, p_bq, p_eb, p_lit));

    /**
     * Parses file-name dates such as {@code 17Oct16}: two-digit year (base
     * 1950), abbreviated month name, two-digit day. The locale is pinned to
     * English so that month names parse the same regardless of the JVM's
     * default locale.
     */
    static final DateTimeFormatter DATE_FORMATTER = new DateTimeFormatterBuilder()
            .parseCaseInsensitive().appendValueReduced(YEAR, 2, 2, 1950)
            .appendPattern("MMMdd").toFormatter(Locale.ENGLISH);

    /**
     * Processes every file or folder given on the command line. Folders are
     * walked recursively for {@code .dat} files; the output folder is emptied
     * and recreated first.
     *
     * @param args paths of files and/or folders to process
     * @throws Exception if a file cannot be read, written or parsed
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.format("Error: No files or folders given. At least one required.\n");
            return;
        }

        // 1. enumerate files to be processed
        List<Path> dataFiles = new ArrayList<Path>();
        for (String arg : args) {
            Path path = FileSystems.getDefault().getPath(arg);
            if (Files.isRegularFile(path)) {
                dataFiles.add(path);
            } else if (Files.isDirectory(path)) {
                // Recursive walk. The stream holds open directory handles and
                // must be closed, hence try-with-resources.
                try (java.util.stream.Stream<Path> tree = Files.walk(path.toRealPath())) {
                    tree.filter(p -> Files.isRegularFile(p) && p.toString().endsWith(".dat"))
                            .forEach(dataFiles::add);
                }
            } else {
                System.out.format("Warning: \"%s\" is not a file or folder. Skipping...\n", path);
            }
        }

        // (re)create the output directory; removal is best-effort, so accept
        // a directory that is still present afterwards
        Path outDir = Paths.get(System.getProperty("user.dir"), OUTPUT_DIR);
        if (Files.exists(outDir)) {
            removeTree(outDir);
        }
        Files.createDirectories(outDir);

        // 2. process the files
        for (Path f : dataFiles) {
            processFile(f);
        }
        System.out.format("\nDone. Results written to %s\n", OUTPUT_DIR);
    }

    /**
     * Converts one {@code .dat} file into {@code OUTPUT_DIR/<filename>.csv}.
     * Each line matching {@link #LINE_PATTERN} becomes
     * {@code date,<captured fields...>}. Rows whose weekday code differs from
     * the weekday of the file's date are attributed to the following day (the
     * sensor overran into the next day). A match summary is printed at the end.
     *
     * @param inFile input file; skipped with a warning unless it ends in {@code .dat}
     * @throws IOException if reading or writing fails
     * @throws java.time.format.DateTimeParseException if the file name does not
     *         start with a date accepted by {@link #DATE_FORMATTER}
     */
    static void processFile(Path inFile) throws IOException {
        String filename = inFile.getFileName().toString();
        if (!filename.endsWith(".dat")) {
            System.out.format("Warning: Skipping non .dat file \"%s\"\n", filename);
            return;
        }

        // Resolve the date before opening anything so that a badly named file
        // does not leave an empty output file behind.
        LocalDate date = LocalDate.parse(filename.split("\\.")[0], DATE_FORMATTER);
        String weekday = WEEKDAY_MAP[date.getDayOfWeek().ordinal()];
        String ymd = date.format(ISO_LOCAL_DATE);
        String ymdNext = date.plusDays(1).format(ISO_LOCAL_DATE);

        int matchCount = 0, totalLines = 0;
        Path outFile = Paths.get(OUTPUT_DIR, filename + ".csv");
        try (
            BufferedReader br = Files.newBufferedReader(inFile);
            BufferedWriter bw = Files.newBufferedWriter(outFile, UTF_8);
        ) {
            String line;
            while ((line = br.readLine()) != null) {
                totalLines++;
                Matcher matched = LINE_PATTERN.matcher(line);
                if (matched.lookingAt()) {
                    matchCount++;
                    // Group 1 is the row's two-letter weekday code.
                    String day = weekday.equals(matched.group(1)) ? ymd : ymdNext;
                    bw.write(day);
                    bw.write(',');
                    bw.write(joinGroups(matched, ","));
                    bw.newLine();
                }
            }
        }

        // An empty input has no lines; report 0% rather than NaN.
        float percent = totalLines == 0 ? 0f : 100f * matchCount / totalLines;
        System.out.format("%s: %d/%d (%.1f%%) lines matched\n", filename, matchCount, totalLines, percent);
    }

    /**
     * Joins all capturing groups of a successful match with {@code sep}.
     *
     * @param matched a matcher on which a match operation has succeeded
     * @param sep separator placed between consecutive groups
     * @return the joined groups; the empty string if the pattern has no groups
     */
    static String joinGroups(Matcher matched, String sep) {
        StringBuilder joined = new StringBuilder();
        int groups = matched.groupCount();
        for (int i = 1; i <= groups; i++) {
            if (i > 1) {
                joined.append(sep);
            }
            joined.append(matched.group(i));
        }
        return joined.toString();
    }

    /**
     * Deletes {@code directory} and everything below it. Removal is
     * best-effort: a failure is reported on standard error and not rethrown.
     *
     * @param directory root of the tree to delete
     */
    static void removeTree(Path directory) {
        try {
            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file);
                    return CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    if (exc != null) {
                        // Iterating the directory failed; do not hide that.
                        throw exc;
                    }
                    Files.delete(dir);
                    return CONTINUE;
                }
            });
        } catch (IOException e) {
            System.err.format("Warning: could not fully remove \"%s\": %s\n", directory, e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment