Last active
October 16, 2017 15:40
-
-
Save muendelezaji/76666f54c59e229081e98ee2b68a10df to your computer and use it in GitHub Desktop.
Clean up traffic volume data from a range of sensors and parse into a more usable format
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
from datetime import datetime, timedelta | |
from io import DEFAULT_BUFFER_SIZE | |
from re import compile as re_compile | |
from shutil import rmtree | |
# from time import strftime, strptime | |
from concurrent.futures import ProcessPoolExecutor | |
from multiprocessing import Pool, cpu_count # Pool() method not pool.Pool class | |
# Module-level aliases for the datetime class methods, so callers can use
# strftime(dt, fmt) / strptime(text, fmt) without an attribute lookup.
strptime = datetime.strptime
strftime = datetime.strftime

# Directory (relative to the working directory) that receives the CSV files.
# A timestamped variant would be: 'output_{}.d'.format(<formatted current time>)
OUTPUT_DIR = 'output-py.d'

# Two-letter weekday tokens indexed by datetime.weekday() (0 = Monday).
# Used to detect rows whose weekday does not match the file's date.
WEEKDAY_MAP = ['Mo', 'Tu', 'We', 'Th', 'Fr', 'Sa', 'Su']

# Example row:
#   We 02:55:45 M14 N60311H IVL 0016 OCC 0 LQ 0 BQ 0 EB0 LIT 0000
# Intended key:  (yearMonthDay, weekDay, time, sensorId, interval)
# Intended types of the captured groups, in order:
#   wd, time, sensor, int(ivl), int(occ), int(lq), int(bq), bool(eb), int(lit, 2)
p_wd = r'(Mo|Tu|We|Th|Fr|Sa|Su)'                     # weekday token
p_time = r'([012][0-9]:[0-5][0-9]:[0-5][0-9])'       # HH:MM:SS
p_sensor = r'M14 (N[0-9]{5}[A-Z])'                   # sensor id after the "M14" prefix
p_ivl = r'IVL ([0-9]{4})'                            # four-digit interval
p_occ = r'OCC *([0-9]+)'                             # OCC value, optionally space padded
p_lq = r'LQ *(-?[0-9]+)'                             # LQ value, may be negative
p_bq = r'BQ *([0-9]+)'                               # BQ value, optionally space padded
p_eb = r'EB([01])'                                   # single-bit EB flag
p_lit = r'LIT ([01]{4})'                             # four binary digits

# One row = the nine fragments above, each separated by a single space.
_ROW_FRAGMENTS = (p_wd, p_time, p_sensor, p_ivl, p_occ, p_lq, p_bq, p_eb, p_lit)
LINE_PATTERN = re_compile('{} {} {} {} {} {} {} {} {}'.format(*_ROW_FRAGMENTS))
def process_file(in_file):
    """Convert one raw sensor ``.dat`` file into a CSV file in ``OUTPUT_DIR``.

    The input file name must start with a ``%y%b%d`` date (for example
    ``17Oct16.dat``); that date is prepended to every output row. Each input
    line matching ``LINE_PATTERN`` becomes one CSV row made of the date
    followed by the pattern's captured groups. Rows whose leading weekday
    token differs from the weekday of the file's date are stamped with the
    following day instead (the sensor log overran into the next day).

    Args:
        in_file: Path of the input file. Files not ending in ``.dat`` are
            skipped with a warning.

    Returns:
        None. A one-line match summary is printed to stdout.

    Raises:
        ValueError: If the file name does not start with a ``%y%b%d`` date.
        OSError: If the input or output file cannot be opened.
    """
    filename = os.path.basename(in_file)
    if not filename.endswith('.dat'):
        print('Warning: Skipping non .dat file "{}"'.format(filename))
        return

    # Derive the dates before touching the file system, so a badly named
    # input fails without leaving an empty CSV behind.
    file_date = strptime(filename.split('.')[0], '%y%b%d')
    weekday = WEEKDAY_MAP[file_date.weekday()]
    ymd = strftime(file_date, '%Y-%m-%d')  # NOTE: swap arguments for time.strftime
    ymd_next = strftime(file_date + timedelta(days=1), '%Y-%m-%d')  # sensor overrun into next day

    match_count, total_lines = 0, 0
    out_file = os.path.join(OUTPUT_DIR, filename + '.csv')
    # Plain open(..., 'w') truncates an existing file and is portable; the
    # previous os.open() flags lacked O_TRUNC and used the Unix-only O_NONBLOCK.
    with open(in_file) as input_fd, open(out_file, 'w') as output_fd:
        # bind once to avoid attribute lookups inside the loop
        pattern_matches = LINE_PATTERN.match
        write_out = output_fd.write
        for line in input_fd:
            total_lines += 1
            matched = pattern_matches(line)
            if matched:
                match_count += 1
                ymd_str = ymd if line[:2] == weekday else ymd_next
                write_out('{},{}\n'.format(ymd_str, ','.join(matched.groups())))

    # An empty input file has no lines; report 0% instead of dividing by zero.
    percent = 100 * match_count / total_lines if total_lines else 0.0
    print('{}: {}/{} ({:.1f}%) lines matched'.format(filename, match_count, total_lines, percent))
def main():
    """Command-line entry point: convert every given ``.dat`` file to CSV.

    Each command-line argument is either a file (taken as is) or a directory
    (walked recursively for ``*.dat`` files). Anything else is skipped with a
    warning. The output directory ``OUTPUT_DIR`` under the current working
    directory is wiped and recreated, then every collected file is passed to
    ``process_file`` sequentially.

    Raises:
        SystemExit: If no arguments were given.
    """
    args = sys.argv[1:]
    if not args:
        sys.exit('Error: No files or folders given. At least one required.')

    # 1. enumerate files to be processed
    data_files = []
    for arg in args:
        if os.path.isfile(arg):
            data_files.append(arg)
        elif os.path.isdir(arg):
            # recursive walk; a shallow alternative would be os.listdir(arg)
            for root, _dirs, files in os.walk(arg):
                data_files.extend(os.path.join(root, f) for f in files if f.endswith('.dat'))
        else:
            print('Warning: "{}" is not a file or folder. Skipping...'.format(arg))

    # create output directory
    outdir = os.path.join(os.getcwd(), OUTPUT_DIR)
    if os.path.exists(outdir):
        rmtree(outdir, ignore_errors=True)
    # rmtree above is best-effort; exist_ok keeps a partially removed
    # directory from aborting the run with FileExistsError.
    os.makedirs(outdir, exist_ok=True)

    # 2. process the files
    # alt 2.1 – sequential
    for f in data_files:
        process_file(f)
    # # alt 2.2 – concurrent
    # # os.cpu_count() may return None, hence the "or 1"
    # workers_count = (os.cpu_count() or 1) << 1
    # with ProcessPoolExecutor(max_workers=workers_count) as executor:
    #     executor.map(process_file, data_files)
    # # alt 2.3 – parallel
    # with Pool(processes=workers_count) as pool:
    #     pool.map(process_file, data_files)

    print('\nDone. Results written to {}'.format(OUTPUT_DIR))
if __name__ == '__main__':
    # Script entry: execute main() a single time under a timeit Timer and
    # keep the elapsed seconds in `runtime`.
    import timeit
    runtime = timeit.Timer(main).timeit(number=1)
    # To report the duration:
    # print('Time taken: {}s'.format(round(runtime, 3)))
    # To time several runs instead:
    # runtime = timeit.repeat('main()', 'from __main__ import main', number=1, repeat=3)
    # print('Time taken (seconds): {}'.format(', '.join([str(round(t, 3)) for t in runtime])))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedReader; | |
import java.io.BufferedWriter; | |
import java.io.IOException; | |
import java.lang.StringBuilder; | |
import java.nio.file.attribute.BasicFileAttributes; | |
import java.nio.file.Files; | |
import java.nio.file.FileSystems; | |
import java.nio.file.FileVisitResult; | |
import java.nio.file.Path; | |
import java.nio.file.Paths; | |
import java.nio.file.SimpleFileVisitor; | |
import java.time.LocalDate; | |
import java.time.format.DateTimeFormatter; | |
import java.time.format.DateTimeFormatterBuilder; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Locale; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
import static java.nio.charset.StandardCharsets.UTF_8; | |
import static java.nio.file.FileVisitResult.CONTINUE; | |
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; | |
import static java.time.temporal.ChronoField.YEAR; | |
/**
 * Cleans up raw traffic-volume sensor logs ({@code *.dat}) and rewrites the
 * rows that match {@link #LINE_PATTERN} as CSV files in {@link #OUTPUT_DIR}.
 *
 * <p>Usage: {@code java TrafficVolume <file-or-folder>...}
 */
public class TrafficVolume {

    /** Output directory, relative to the working directory. */
    static final String OUTPUT_DIR = "output-java.d";

    /** Two-letter weekday tokens, Monday first; used to detect non-matching weekdays. */
    static final String[] WEEKDAY_MAP = { "Mo", "Tu", "We", "Th", "Fr", "Sa", "Su" };

    // Example row:
    // We 02:55:45 M14 N60311H IVL 0016 OCC 0 LQ 0 BQ 0 EB0 LIT 0000
    // primaryKey = (yearMonthDay, weekDay, time, sensorId, interval)
    // wd, time, sensor, int(ivl), int(occ), int(lq), int(bq), bool(eb), int(lit, 2)
    static final String p_wd = "(Mo|Tu|We|Th|Fr|Sa|Su)";
    static final String p_time = "([012][0-9]:[0-5][0-9]:[0-5][0-9])";
    static final String p_sensor = "M14 (N[0-9]{5}[A-Z])";
    static final String p_ivl = "IVL ([0-9]{4})";
    static final String p_occ = "OCC *([0-9]+)";
    static final String p_lq = "LQ *(-?[0-9]+)";
    static final String p_bq = "BQ *([0-9]+)";
    static final String p_eb = "EB([01])";
    static final String p_lit = "LIT ([01]{4})";

    /** One sensor row: the nine fragments above separated by single spaces. */
    static final Pattern LINE_PATTERN = Pattern.compile(
            String.format("%s %s %s %s %s %s %s %s %s",
                    p_wd, p_time, p_sensor, p_ivl, p_occ, p_lq, p_bq, p_eb, p_lit));

    /**
     * Parses the date prefix of a file name such as {@code 17Oct16}: two-digit
     * year (base 1950), abbreviated month, two-digit day. The locale is pinned
     * to English so month abbreviations parse the same on every machine.
     */
    static final DateTimeFormatter DATE_FORMATTER = new DateTimeFormatterBuilder()
            .parseCaseInsensitive().appendValueReduced(YEAR, 2, 2, 1950)
            .appendPattern("MMMdd").toFormatter(Locale.ENGLISH);

    /**
     * Collects the input files named on the command line, recreates the
     * output directory and converts each file in turn.
     *
     * @param args files and/or folders; folders are walked recursively for {@code *.dat} files
     * @throws Exception if walking a folder, creating the output directory or converting a file fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.format("Error: No files or folders given. At least one required.\n");
            return;
        }

        // 1. enumerate files to be processed
        List<Path> dataFiles = new ArrayList<>();
        for (String arg : args) {
            Path path = FileSystems.getDefault().getPath(arg);
            if (Files.isRegularFile(path)) {
                dataFiles.add(path);
            } else if (Files.isDirectory(path)) {
                // Recursive walk. Files.walk holds open directory handles, so the
                // stream must be closed; a shallow alternative would be Files.list.
                try (java.util.stream.Stream<Path> tree = Files.walk(path.toRealPath())) {
                    tree.filter(p -> Files.isRegularFile(p) && p.toString().endsWith(".dat"))
                            .forEach(dataFiles::add);
                }
            } else {
                System.out.format("Warning: \"%s\" is not a file or folder. Skipping...\n", path);
            }
        }

        // create output directory
        Path outDir = Paths.get(System.getProperty("user.dir"), OUTPUT_DIR);
        if (Files.exists(outDir)) {
            removeTree(outDir);
        }
        // removeTree is best-effort; createDirectories tolerates a directory
        // that could not be fully removed instead of throwing.
        Files.createDirectories(outDir);

        // 2. process the files
        for (Path f : dataFiles) {
            processFile(f);
        }
        System.out.format("\nDone. Results written to %s\n", OUTPUT_DIR);
    }

    /**
     * Converts one {@code .dat} file into {@code OUTPUT_DIR/<name>.csv}.
     *
     * <p>Every line matching {@link #LINE_PATTERN} becomes a CSV row made of
     * the file's date followed by the captured groups. Rows whose weekday
     * token differs from the weekday of the file's date are stamped with the
     * following day, matching the Python implementation of this tool (the
     * sensor log overran into the next day).
     *
     * @param inFile input file; files not ending in {@code .dat} are skipped with a warning
     * @throws IOException if the input cannot be read or the output cannot be written
     * @throws java.time.format.DateTimeParseException if the file name does not start with a date
     */
    static void processFile(Path inFile) throws IOException {
        String filename = inFile.getFileName().toString();
        if (!filename.endsWith(".dat")) {
            System.out.format("Warning: Skipping non .dat file \"%s\"\n", filename);
            return;
        }

        // Parse the date before opening any file so a badly named input does
        // not leave an empty CSV behind.
        LocalDate date = LocalDate.parse(filename.split("\\.")[0], DATE_FORMATTER);
        // DayOfWeek.getValue() is 1 (Monday) .. 7 (Sunday)
        String weekday = WEEKDAY_MAP[date.getDayOfWeek().getValue() - 1];
        String ymd = date.format(ISO_LOCAL_DATE);
        String ymdNext = date.plusDays(1).format(ISO_LOCAL_DATE);

        int matchCount = 0, totalLines = 0;
        Path outFile = Paths.get(OUTPUT_DIR, filename + ".csv");
        try (
            BufferedReader br = Files.newBufferedReader(inFile);
            BufferedWriter bw = Files.newBufferedWriter(outFile, UTF_8);
        ) {
            String line;
            while ((line = br.readLine()) != null) {
                totalLines++;
                Matcher matched = LINE_PATTERN.matcher(line);
                if (matched.lookingAt()) {
                    matchCount++;
                    String rowDate = line.startsWith(weekday) ? ymd : ymdNext;
                    bw.write(String.format("%s,%s", rowDate, joinGroups(matched, ",")));
                    bw.newLine();
                }
            }
        }

        // An empty file would otherwise print NaN.
        float percent = totalLines == 0 ? 0f : 100f * matchCount / totalLines;
        System.out.format("%s: %d/%d (%.1f%%) lines matched\n", filename, matchCount, totalLines, percent);
    }

    /**
     * Joins all capturing groups of a successful match, in order.
     *
     * @param matched a matcher on which a match operation has already succeeded
     * @param sep     separator placed between consecutive groups
     * @return the groups joined by {@code sep}; empty if the pattern has no groups
     */
    static String joinGroups(Matcher matched, String sep) {
        StringBuilder joined = new StringBuilder();
        for (int i = 1; i <= matched.groupCount(); i++) {
            if (i > 1) {
                joined.append(sep);
            }
            joined.append(matched.group(i));
        }
        return joined.toString();
    }

    /**
     * Best-effort recursive delete of a directory tree. Failures do not abort
     * the program, but are reported on stderr rather than silently dropped.
     *
     * @param directory root of the tree to delete
     */
    static void removeTree(Path directory) {
        try {
            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file);
                    return CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    if (exc != null) {
                        // iterating the directory failed; do not try to delete it
                        throw exc;
                    }
                    Files.delete(dir);
                    return CONTINUE;
                }
            });
        } catch (IOException e) {
            System.err.format("Warning: could not fully remove \"%s\": %s\n", directory, e);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment