Last active
May 10, 2018 22:03
-
-
Save MatthieuLemoine/0c72d866b9e3ec3b81b6d3216d2c6803 to your computer and use it in GitHub Desktop.
Parse RATP stops data & dump in an Algolia index
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { promises: fs, createReadStream } = require('fs'); | |
const { Transform } = require('stream'); | |
const path = require('path'); | |
const uuid = require('uuid/v4'); | |
const ProgressBar = require('ascii-progress'); | |
const algolia = require('algoliasearch'); | |
const { ALGOLIA_APP_ID, ALGOLIA_API_KEY } = process.env; | |
const client = algolia(ALGOLIA_APP_ID, ALGOLIA_API_KEY); | |
const INDEX = 'ratp_stops'; | |
const DATA_DIRECTORY = path.join(__dirname, 'RATP_GTFS_LINES'); | |
const STOPS_FILENAME = 'stops.txt'; | |
const ROUTES_FILENAME = 'routes.txt'; | |
const TRIPS_FILENAME = 'trips.txt'; | |
const STOPS_TIMES_FILENAME = 'stop_times.txt'; | |
// There are 12049 RATP stops but the free Algolia plan only allows 10000 records | |
// So we need to exclude some lines | |
const shouldBeExclude = name => | |
name.includes('N') || | |
name.includes('V') || | |
[ | |
'Amibus', | |
'Choisyb', | |
'Monast', | |
'Montbus', | |
'Montmar', | |
'Orlybus', | |
'Pc1', | |
'Pc3', | |
'Roissyb', | |
'Subb', | |
'Tillbus', | |
'Tim', | |
'Tub', | |
'Tuc', | |
'Tuvim', | |
'Tvm', | |
].includes(name); | |
const writeInIndex = lines => | |
new Promise((resolve, reject) => { | |
const stops = lines.reduce( | |
(acc, line) => | |
shouldBeExclude(line.name) | |
? acc | |
: acc.concat( | |
line.stops.map(stop => ({ | |
action: 'addObject', | |
indexName: INDEX, | |
body: { | |
...stop, | |
line: line.name, | |
type: line.type, | |
// To avoid duplicates on ratp lines update | |
objectID: stop.providerId, | |
}, | |
})), | |
), | |
[], | |
); | |
console.log(`${stops.length} entities will be written in index`); | |
client.batch(stops, (err, content) => { | |
if (err) return reject(err); | |
return resolve(content); | |
}); | |
}); | |
const join = parentDir => dirname => path.join(parentDir, dirname); | |
const getSplitStream = () => | |
new Transform({ | |
objectMode: true, | |
transform(chunk, encoding, cb) { | |
this.last = (this.last || '') + chunk; | |
const lines = this.last.split(/\r?\n/); | |
this._last = lines.pop(); | |
lines.forEach(line => this.push(line)); | |
cb(); | |
}, | |
}); | |
const getLines = async () => { | |
const linesDir = (await fs.readdir(DATA_DIRECTORY, { encoding: 'utf8' })).map( | |
join(DATA_DIRECTORY), | |
); | |
const bar = new ProgressBar({ | |
total: linesDir.length, | |
schema: ' |:bar| :current/:total :percent :elapseds :etas :name', | |
}); | |
return Promise.all( | |
linesDir.map(async lineDir => { | |
// Get routes | |
const routes = (await fs.readFile( | |
path.join(lineDir, ROUTES_FILENAME), | |
'utf8', | |
)) | |
.split(/\r?\n/) | |
.slice(1, -1) | |
.reduce((map, item) => { | |
const [ | |
route_id, | |
agency_id, | |
route_short_name, | |
route_long_name, | |
route_desc, | |
route_type, | |
route_url, | |
route_color, | |
route_text_color, | |
] = item.split(','); | |
return { | |
...map, | |
[route_id]: { | |
routeId: route_id, | |
way: route_long_name.includes('Aller') ? 'outward' : 'return', | |
}, | |
}; | |
}, {}); | |
// Map tripId to routeId | |
const mapTripsRoutes = {}; | |
const streamTrips = createReadStream(path.join(lineDir, TRIPS_FILENAME), { | |
encoding: 'utf8', | |
}).pipe(getSplitStream()); | |
for await (const line of streamTrips) { | |
const [ | |
route_id, | |
service_id, | |
trip_id, | |
trip_headsign, | |
trip_short_name, | |
direction_id, | |
shape_id, | |
] = line.split(','); | |
mapTripsRoutes[trip_id] = route_id; | |
} | |
// Map stops to routeId | |
const mapStationsRoutes = {}; | |
const streamTimes = createReadStream( | |
path.join(lineDir, STOPS_TIMES_FILENAME), | |
{ | |
encoding: 'utf8', | |
}, | |
).pipe(getSplitStream()); | |
for await (const line of streamTimes) { | |
const [ | |
trip_id, | |
arrival_time, | |
departure_time, | |
stop_id, | |
stop_sequence, | |
stop_headsign, | |
shape_dist_traveled, | |
] = line.split(','); | |
mapStationsRoutes[stop_id] = mapTripsRoutes[trip_id]; | |
} | |
const stream = createReadStream(path.join(lineDir, STOPS_FILENAME), { | |
encoding: 'utf8', | |
}).pipe(getSplitStream()); | |
const stops = []; | |
const mapStopIndex = new Map(); | |
for await (const line of stream) { | |
const [ | |
stop_id, | |
stop_code, | |
stop_name, | |
stop_desc, | |
stop_lat, | |
stop_lon, | |
location_type, | |
parent_station, | |
] = line.split(','); | |
// First line, drop headers | |
if (stop_id !== 'stop_id') { | |
const routeId = mapStationsRoutes[stop_id]; | |
if (mapStopIndex.has(stop_name)) { | |
const index = mapStopIndex.get(stop_name); | |
stops[index].locations.push({ | |
description: stop_desc, | |
latitude: stop_lat, | |
longitude: stop_lon, | |
way: routeId ? routes[routeId].way : null, | |
// ?? | |
locationType: location_type, | |
}); | |
} else { | |
stops.push({ | |
id: uuid(), | |
name: stop_name, | |
locations: [ | |
{ | |
description: stop_desc, | |
latitude: stop_lat, | |
longitude: stop_lon, | |
way: routeId ? routes[routeId].way : null, | |
// ?? | |
locationType: location_type, | |
}, | |
], | |
// ?? | |
parentStation: parent_station, | |
providerId: stop_id, | |
// ?? | |
providerCode: stop_code, | |
}); | |
mapStopIndex.set(stop_name, stops.length - 1); | |
} | |
} | |
} | |
const [, , type, name] = path.basename(lineDir).split('_'); | |
bar.tick({ | |
name: `${type} ${name}`, | |
}); | |
// Line | |
return { | |
id: uuid(), | |
name, | |
type, | |
stops, | |
providerId: `${type}_${name}`, | |
}; | |
}), | |
); | |
}; | |
getLines() | |
// Dump lines in case of indexing failure | |
.then(async lines => { | |
await fs.writeFile( | |
path.join(__dirname, 'lines.json'), | |
JSON.stringify({ lines }, null, 2), | |
'utf8', | |
); | |
return lines; | |
}) | |
// // Uncomment to start from local dump | |
// fs | |
// .readFile(path.join(__dirname, 'lines.json'), 'utf8') | |
// .then(data => JSON.parse(data).lines) | |
.then(writeInIndex) | |
.then(result => { | |
console.log('Done'); | |
console.log(`${result.objectIDs.length} entities created in index`); | |
}) | |
.catch(err => | |
fs.writeFile( | |
path.join(__dirname, 'err.json'), | |
JSON.stringify(err, 2, null), | |
'utf8', | |
), | |
); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment