|
"""Latlonf: high-level formatter helper for csv files. |
|
|
|
Main API functions are: |
|
|
|
format_any_coordinates(<string describing coordinates>, <formatting template>) -> str |
|
run(<csv file>, <file desc>, <latitude column index>, <longitude col idx>, <formatting template>) -> list[str] |
|
|
|
The coordinates formatting template uses the following placeholders: |
|
|
|
%D Degrees (integer, positive) |
|
%M Minutes (integer) |
|
%S Seconds (float) |
|
%B Degrees (integer, signed) |
|
%d Degrees (float, positive) |
|
%b Degrees (float, signed) |
|
%m Minutes (float) |
|
%s Sign marker (a dash or empty string) |
|
%w Direction (S, N, W or E) |
|
|
|
For instance: |
|
>>> format_coordinates(-77, '%B') == format_coordinates(-77, '%s%D') |
|
True |
|
|
|
The run function converts the longitude and latitude values found |
|
in columns of given indexes, writing the same CSV into the given output file. |
|
|
|
For instance: |
|
|
|
run('mycsvfile.csv', sys.stdout, 0, 1, "%D %M %S %w") |
|
|
|
""" |
|
import re |
|
import sys |
|
import csv |
|
import argparse |
|
from typing import Union |
|
from itertools import islice |
|
import pandas |
|
# Numba's JIT is deliberately bypassed: according to the original author's
# benchmarks, running without it is the best option for this module.  The
# switch below replaces the former trick of referencing an undefined name
# inside a ``try`` guarded by a bare ``except``.
_USE_NUMBA = False

jit = None
if _USE_NUMBA:
    try:
        from numba import jit
    except ImportError:
        jit = None

if jit is None:
    # Diagnostics go to stderr so that they never get mixed with the CSV
    # data that the program may write on stdout.
    print('Numba is not available. No JIT will be used.', file=sys.stderr)

    def jit(**kwargs):
        """No-op stand-in for ``numba.jit``: the decorated function is returned unchanged."""
        def func(inner):
            return inner
        return func

    jit.numba = None  # to test if numba is here
|
|
|
# Number of CSV rows loaded per chunk when pandas reads the input in chunks.
PANDAS_CHUNK_SIZE = 1_500_000
|
|
|
|
|
def isfloat(string: str) -> bool:
    """Return True if `string` is a plain decimal number.

    Accepted forms are an optional leading sign followed by digits, with at
    most one dot: ``45``, ``-45``, ``17.896``, ``-0.0``, ``0.`` or ``.5``.
    Exponents, thousands separators and surrounding whitespace are rejected.

    >>> isfloat('-45')
    True
    >>> isfloat('0.')
    True
    >>> isfloat('1.2.3')
    False
    >>> isfloat('N')
    False

    """
    unsigned = string[1:] if string.startswith(('-', '+')) else string
    whole, _, fraction = unsigned.partition('.')
    if not (whole or fraction):
        return False  # '', '-' or '.': there is no digit at all
    # A second dot ends up in `fraction` and makes isdigit() fail.
    return all(part.isdigit() for part in (whole, fraction) if part)
|
|
|
## Converters |
|
def DD_from_wildDD(dd: Union[str, float]) -> float:
    """Return canonical representation of given decimal coordinates.

    The degree sign is dropped and a decimal comma is read as a decimal
    point, so that every string recognised as decimal degrees can be parsed.

    Raises ValueError if what remains is not a number.

    >>> DD_from_wildDD("-180°")
    -180.0
    >>> DD_from_wildDD("180")
    180.0
    >>> DD_from_wildDD("45,5°")
    45.5

    """
    text = str(dd).replace('°', ' ').replace(',', '.').strip()
    return float(text)
|
|
|
|
|
# A whole whitespace-separated token that reads as a signed decimal number.
_DDM_NUMBER = re.compile(r'[-+]?(?:\d+\.?\d*|\.\d+)')


def DD_from_DDM(ddm: str) -> float:
    """Return decimal representation of DDM (degree decimal minutes)

    The degree and quote signs are ignored and a decimal comma is read as a
    decimal point.  The result is negated once if the text contains one of
    the letters S or W (any case), and once more if the degrees are written
    with a leading minus sign.

    Raises ValueError if the text does not hold two or three numbers, or if
    the degrees are not an integer.

    >>> DD_from_DDM("45° 17,896' N")
    45.29826666666666
    >>> DD_from_DDM("-45° 17,896' N")
    -45.29826666666666
    >>> DD_from_DDM("-45° 17,896' S")
    45.29826666666666
    >>> DD_from_DDM("-0° 30' N")
    -0.5

    """
    cleaned = re.sub(r"[°']", ' ', ddm).replace(',', '.')
    sign = -1 if re.search('[swSW]', cleaned) else 1
    numbers = [token for token in cleaned.split() if _DDM_NUMBER.fullmatch(token)]
    if len(numbers) not in (2, 3):
        raise ValueError(f"Expected degrees and decimal minutes, got '{ddm}'")

    degree_text, minute_text = numbers[0], numbers[1]
    # The sign is read from the text: int('-0') is 0 and would lose it.
    if degree_text.startswith('-'):
        sign = -sign

    return sign * (abs(int(degree_text)) + float(minute_text) / 60)
|
|
|
def DD_from_DMS(dms: str) -> float:
    """Return decimal representation of DMS (degree minutes seconds)

    The text must hold degrees, minutes and seconds, optionally followed by
    a direction letter.  Degree and quote signs are ignored and a decimal
    comma is read as a decimal point.  The result is negated once for a S or
    W direction (any case), and once more if the degrees are written with a
    leading minus sign.

    Raises ValueError if the text does not hold three or four fields, or if
    a field is not a number.

    >>> DD_from_DMS("77° 30' 0\\" S")
    -77.5
    >>> DD_from_DMS("-77 30 0")
    -77.5

    """
    fields = re.sub(r"[°'\"]", ' ', dms).replace(',', '.').split()
    if len(fields) not in (3, 4):
        raise ValueError(f"Expected degrees, minutes, seconds and an optional direction, got '{dms}'")

    degrees, minutes, seconds = fields[:3]
    sign = 1
    if len(fields) == 4 and fields[3] in ('S', 'W', 's', 'w'):  # swap direction
        sign = -1
    # Minutes and seconds extend the magnitude of the degrees, so the minus
    # sign is applied to the whole sum instead of to the degrees alone.
    if degrees.startswith('-'):
        sign = -sign

    return sign * (abs(int(degrees)) + float(minutes) / 60 + float(seconds) / 3600)
|
|
|
# One formatting placeholder: a percent sign followed by a known letter.
_PLACEHOLDER = re.compile(r'%([bdDMSBmsw%])')


def format_coordinates(dd:Union[str, float], fmt:str, rounding:int=4, longitude:bool=False) -> str:
    """Format given DD coordinates following the given format.

    dd -- decimal degrees, as a number or as a string readable by float().
    fmt -- template; see the placeholders below.
    rounding -- number of decimals kept for seconds (%S) and decimal
                minutes (%m); a false value disables rounding.
    longitude -- when true, %w yields W/E instead of S/N.

    Placeholders: %D %M %S (degrees, minutes, seconds), %B (signed integer
    degrees, same as %s%D), %d and %b (the given value without and with its
    sign), %m (decimal minutes), %s (dash or empty), %w (direction) and %%
    (a literal percent sign).  Unknown placeholders are left untouched.

    Raises ValueError if dd is not a number.

    >>> format_coordinates(-19.9128, '%d')
    '19.9128'
    >>> format_coordinates(-19, '%D')
    '19'
    >>> format_coordinates(-19.9128, '%b')
    '-19.9128'
    >>> format_coordinates(-77.508333, '%D %M %S %w')
    '77 30 29.9988 S'
    >>> format_coordinates(-77.508333, '%B %M %S')
    '-77 30 29.9988'
    >>> format_coordinates(164.754167, "%B° %m' %w", longitude=True)
    "164° 45.25' E"
    """
    value = float(dd)
    negative = value < 0
    magnitude = abs(value)

    def split_base60(total: float) -> (int, float):
        # Rounding is applied before splitting, so that the remainder can
        # never be displayed as 60 (e.g. 59.99999 seconds).
        if rounding:
            total = round(total, rounding)
        quotient, remainder = divmod(total, 60)
        if rounding:
            remainder = round(remainder, rounding)
        return int(quotient), remainder

    whole_minutes, seconds = split_base60(magnitude * 3600)
    dms_degrees, minutes = divmod(whole_minutes, 60)
    ddm_degrees, decimal_minutes = split_base60(magnitude * 60)

    # The degrees must agree with the unit shown next to them: rounding may
    # carry over differently for seconds and for decimal minutes.
    only_decimal_minutes = 'm' in fmt and 'M' not in fmt and 'S' not in fmt
    degrees = ddm_degrees if only_decimal_minutes else dms_degrees

    if str(seconds).endswith('.0'):  # show whole seconds without decimals
        seconds = int(seconds)

    sign_marker = '-' if negative else ''
    if longitude:
        direction = 'W' if negative else 'E'
    else:
        direction = 'S' if negative else 'N'

    values = {
        'b': dd,
        'd': str(dd).lstrip('-'),
        '%': '%',
        'D': degrees,
        'M': minutes,
        'S': seconds,
        'B': f'{sign_marker}{degrees}',  # built as text: keeps the sign of -0
        'm': decimal_minutes,
        's': sign_marker,
        'w': direction,
    }
    return _PLACEHOLDER.sub(lambda match: str(values[match.group(1)]), fmt)
|
|
|
|
|
# @jit(nopython=True, cache=True) |
|
def sniff_format(string:str) -> tuple[str, Union[bool, None]]:
    """Guess the notation used by the given coordinate.

    Return a pair (notation, is_longitude) where notation is 'DD', 'DDM' or
    'DMS', and is_longitude is True for a E/W direction letter, False for a
    N/S one, and None when the text carries no recognised direction letter.

    Raises ValueError if the number of fields matches no known notation.
    """
    string = string.replace('°', ' ').replace("\"", ' ').replace("'", ' ').strip()
    if isfloat(string.replace(',', '.')):
        return 'DD', None

    def hemisphere(letter: str) -> Union[bool, None]:
        # Whole-token, case-insensitive comparison with the four directions.
        letter = letter.upper()
        if letter in ('E', 'W'):
            return True
        if letter in ('N', 'S'):
            return False
        return None

    fields = string.split()
    if len(fields) == 2:
        return 'DDM', None
    if len(fields) == 3:
        _, middle, last = fields
        side = hemisphere(last)
        # Decimal minutes, or a direction letter in third position, cannot
        # belong to a degrees/minutes/seconds triple.
        if side is not None or '.' in middle.replace(',', '.'):
            return 'DDM', side
        # Three bare numbers: the hemisphere is unknown, not "latitude".
        return 'DMS', None
    if len(fields) == 4:
        return 'DMS', hemisphere(fields[3])
    raise ValueError(f"Can't find the format of string '{string}'")
|
|
|
|
|
def format_is_ok(fmt:str) -> bool:
    """True if given format is a valid one.

    The format is tried on a sample coordinate; any exception raised by the
    conversion marks the format as invalid.  The trial is skipped, and the
    format accepted as is, when the real numba decorator is in use.
    """
    # Only the no-op fallback decorator carries ``numba = None``.  A decorator
    # without that attribute is presumed to be the real numba one; getattr
    # avoids the AttributeError a direct access would raise in that case.
    if getattr(jit, 'numba', True) is not None:
        return True  # there is some bug with numba with the following try except
    try:
        format_any_coordinates("-45 17,896 E", fmt)
    except Exception:
        return False
    return True
|
|
|
|
|
# @jit(cache=True) |
|
def format_any_coordinates(string:Union[str, float], fmt:str, is_longitude:bool=None) -> str:
    """Format a coordinate written in any supported notation.

    The notation (DD, DDM or DMS) is sniffed from the text, the value is
    converted to decimal degrees, then rendered with the given template.

    is_longitude may be left to None, in which case the hemisphere sniffed
    from the text, if any, is used.  A ValueError is raised when the caller
    and the text disagree.

    >>> format_any_coordinates("-45° 17,896' N", "%b")
    '-45.29826666666666'
    >>> format_any_coordinates("-45°17,896'", "%d %w")
    '45.29826666666666 S'
    >>> format_any_coordinates("164° 45' 15.0012\\" W", "%b")
    '-164.754167'
    >>> format_any_coordinates('-180°', "%b")
    '-180.0'
    >>> format_any_coordinates('-0.0°', "%b")
    '-0.0'
    >>> format_any_coordinates('0.°', "%b")
    '0.0'
    >>> format_any_coordinates(0.1, "%D° %M' %S\\" %w")
    '0° 6\\' 0" N'
    >>> format_any_coordinates(-77, '%B')
    '-77'
    >>> format_any_coordinates(0.1, "%D° %M' %S\\" %w", is_longitude=True)
    '0° 6\\' 0" E'
    >>> format_any_coordinates('0° 6\\' 0" E', "%D° %M' %S\\" %w", is_longitude=True)
    '0° 6\\' 0" E'

    """
    text = str(string)
    notation, sniffed_longitude = sniff_format(text)

    # Reconcile the hemisphere given by the caller with the sniffed one.
    if sniffed_longitude is not None:
        if is_longitude is None:
            is_longitude = sniffed_longitude
        elif sniffed_longitude is (not is_longitude):  # they disagree !
            told = 'longitude' if is_longitude else 'latitude'
            raise ValueError("Coordinate `{}` was told as {}, but its content says otherwise.".format(text, told))
        else:  # they agree
            assert sniffed_longitude is is_longitude

    # Bring the value to decimal degrees.
    if notation == 'DDM':
        decimal = DD_from_DDM(text)
    elif notation == 'DMS':
        decimal = DD_from_DMS(text)
    else:
        decimal = str(DD_from_wildDD(text))
    return format_coordinates(decimal, fmt, longitude=is_longitude)
|
|
|
|
|
# @jit(cache=True, nopython=True) |
|
def convert(columns:list[str], latcol:int, loncol:int, latlon_format:str) -> list[str]:
    """Reformat, in place, the latitude and longitude cells of a CSV row.

    The latitude cell is handled first, then the longitude one; the given
    list itself is returned.
    """
    for index, as_longitude in ((latcol, False), (loncol, True)):
        columns[index] = format_any_coordinates(columns[index], latlon_format, is_longitude=as_longitude)
    return columns
|
|
|
|
|
def convert_carefully(columns:list[str], latcol:int, loncol:int, latlon_format:str) -> list[str]:
    """Same as convert, but checking the inputs first.

    Raises ValueError if a column index lies beyond the end of the row, or
    if the output format is not accepted by format_is_ok.
    """
    if len(columns) <= latcol:
        raise ValueError(f"Column encoding latitude would be {latcol+1}, but only {len(columns)} were found.")
    if len(columns) <= loncol:
        raise ValueError(f"Column encoding longitude would be {loncol+1}, but only {len(columns)} were found.")
    if format_is_ok(latlon_format):
        return convert(columns, latcol, loncol, latlon_format)
    raise ValueError(f"Given coordinate output format `{latlon_format}` is not a valid format.")
|
|
|
def run(infile:str, outfile:open, latcol:int, loncol:int, latlon_format:str, use_pandas:bool=False, use_chunked_pandas:bool=False, **csv_kwargs:dict) -> None:
    """Copy the CSV file `infile` into `outfile`, reformatting its coordinates.

    infile -- path of the CSV file to read.
    outfile -- writable text file object receiving the converted CSV.
    latcol, loncol -- indexes of the latitude and longitude columns.
    latlon_format -- template given to format_any_coordinates.
    use_pandas -- load the whole file with pandas instead of the csv module.
    use_chunked_pandas -- same, loading PANDAS_CHUNK_SIZE rows at a time;
                          takes precedence over use_pandas.
    csv_kwargs -- csv formatting parameters overriding the sniffed dialect;
                  only `delimiter` is forwarded to pandas.

    The dialect and the presence of a header are sniffed from the first ten
    lines.  Progress messages are written on stderr, so that `outfile` may
    be stdout.  Nothing is returned.
    """
    # sniff the CSV dialect
    with open(infile) as ifd:
        # lines keep their own newline: joining with '\n' would add blank lines
        sample = ''.join(islice(ifd, 10))  # take the first 10 lines as samples
    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(sample, ',;\t')
    if dialect.escapechar is None:
        dialect.escapechar = '\\'
    has_header = sniffer.has_header(sample)
    delimiter = csv_kwargs.get('delimiter', dialect.delimiter)

    def to_latitude(value):
        return format_any_coordinates(value, latlon_format, is_longitude=False)

    def to_longitude(value):
        return format_any_coordinates(value, latlon_format, is_longitude=True)

    # Every cell is kept as text: untouched columns are written back as they
    # were read, and the converted strings fit the dtype of their column.
    read_options = dict(sep=delimiter, header=0 if has_header else None, dtype=str, keep_default_na=False)
    # The row index is not part of the input, so it is not written either.
    write_options = dict(sep=delimiter, index=False)

    # read, convert and write
    if use_chunked_pandas:
        print('Using pandas to work on data. Loading chunks of input data…', file=sys.stderr)
        chunks = pandas.read_csv(infile, chunksize=PANDAS_CHUNK_SIZE, **read_options)
        for idx, chunk in enumerate(chunks):
            print(f"\r{idx:04d} conv lat…", end='', flush=True, file=sys.stderr)
            chunk.iloc[:, latcol] = chunk.iloc[:, latcol].apply(to_latitude)
            print(f"\r{idx:04d} conv lon…", end='', flush=True, file=sys.stderr)
            chunk.iloc[:, loncol] = chunk.iloc[:, loncol].apply(to_longitude)
            print(f"\r{idx:04d} write… ", end='', flush=True, file=sys.stderr)
            # outfile is an open file: chunks are appended by successive
            # writes, and only the first one carries the header.
            chunk.to_csv(outfile, header=has_header and idx == 0, **write_options)
            print(f"\r{idx+1:04d} load… ", end='', flush=True, file=sys.stderr)
        print('Done.', file=sys.stderr)
    elif use_pandas:
        print('Using pandas to work on data. Loading chunks of input data…', file=sys.stderr)
        frame = pandas.read_csv(infile, **read_options)
        print('Convert the latitudes…', file=sys.stderr)
        frame.iloc[:, latcol] = frame.iloc[:, latcol].apply(to_latitude)
        print('Convert the longitudes…', file=sys.stderr)
        frame.iloc[:, loncol] = frame.iloc[:, loncol].apply(to_longitude)
        print('Writing csv file to outfile…', file=sys.stderr)
        frame.to_csv(outfile, header=has_header, **write_options)
    else:
        with open(infile, newline='') as ifd:
            reader = csv.reader(ifd, dialect, **csv_kwargs)
            # same overrides as the reader, so that rows and header agree
            writer = csv.writer(outfile, dialect, **csv_kwargs)
            if has_header:  # rewrite the header
                header = next(ifd, None)
                if header is None:
                    return
                outfile.write(header)
            first_row = next(reader, None)
            if first_row is None:  # no data row at all
                return
            # convert the first line carefully, to catch errors.
            writer.writerow(convert_carefully(first_row, latcol, loncol, latlon_format))
            # then do it fast
            for line in reader:
                writer.writerow(convert(line, latcol, loncol, latlon_format))
|
|
|
|
|
def parse_cli() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    The module docstring is used, unwrapped, as the program description.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description=__doc__,
    )
    add = cli.add_argument

    # positional arguments
    add('csv', type=str, help='existing CSV file to convert')
    add('latcol', type=int, help='column in the CSV encoding the latitude')
    add('loncol', type=int, help='column in the CSV encoding the longitude')

    # options
    add('--outfile', type=str, default='-',
        help='name of the file to be written')
    add('--input-format', '-i', type=str, default=None,
        help='format of the latitude and longitude in the input file')
    add('--output-format', '-o', type=str, default=None,
        help='format of the latitude and longitude to use in the output file')
    add('--csv-delimiter', type=str, default=None,
        help='csv delimiter found in the input file')
    add('--latlon-format', '-f', type=str, default="%D° %M' %S\" %w",
        help='output format for latitude and longitude')
    add('--use-pandas', '-p', action='store_true',
        help='Use pandas to load and work on the data')
    add('--use-chunked-pandas', '-c', action='store_true',
        help='Use pandas chunks to load and work on the data')

    return cli.parse_args()
|
|
|
|
|
if __name__ == '__main__':
    args = parse_cli()

    # get user-specified dialect
    csv_kwargs = {}
    if args.csv_delimiter:
        csv_kwargs['delimiter'] = args.csv_delimiter

    # run the program
    options = (args.latcol, args.loncol, args.latlon_format, args.use_pandas, args.use_chunked_pandas)
    if args.outfile.strip() in ('', '-'):  # print to stdout
        run(args.csv, sys.stdout, *options, **csv_kwargs)
    else:
        # newline='' lets the csv writer control the line endings itself
        with open(args.outfile, 'w', newline='') as ofd:
            run(args.csv, ofd, *options, **csv_kwargs)