Created
January 10, 2024 16:56
-
-
Save simolus3/0ae5a63d6bf499c53aeb7b75701d8f5e to your computer and use it in GitHub Desktop.
1BRC (One Billion Row Challenge) solution adapted from https://github.com/osaxma/1brc_dart to read the input file through mmap.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:convert';
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'dart:typed_data';

import 'package:ffi/ffi.dart';
/// Binding for POSIX `mmap(2)`:
/// `void *mmap(void *addr, size_t length, int prot, int flags, int fd,
/// off_t offset)`.
///
/// The symbol is resolved by name through `@Native` (no asset id is given).
/// On failure the C function returns `MAP_FAILED`, i.e. `(void *) -1`, not
/// NULL, so callers must compare against that value.
// `off_t` is a signed type; `Long` matches it on LP64 and non-LFS ILP32
// targets, whereas `Size` would declare it unsigned.
@Native<Pointer<Void> Function(Pointer<Void>, Size, Int, Int, Int, Long)>()
external Pointer<Void> mmap(
    Pointer<Void> addr, int length, int prot, int flags, int fd, int offset);
/// Binding for POSIX `open(2)`:
/// `int open(const char *pathname, int flags, ...)`.
///
/// [mode] is passed as the C `flags` argument (0 is `O_RDONLY` on Linux and
/// macOS). The optional variadic `mode_t` argument is not bound, so this
/// binding cannot be used to create files. Returns a file descriptor, or a
/// negative value on failure.
@Native<Int Function(Pointer<Utf8>, Int)>()
external int open(Pointer<Utf8> path, int mode);
/// Number of rows in the full 1BRC input (not referenced elsewhere in this
/// file).
const rows = 1 * 1000 * 1000 * 1000;

/// Upper bound on the length of one record, including its trailing newline.
///
/// NOTE(review): presumably 100 bytes of station name + `;` + `-99.9` + `\n`
/// per the 1BRC rules; the original "see README" refers to the README of
/// osaxma/1brc_dart. Used as the read-ahead past a chunk's end so the line
/// straddling the boundary can be finished.
const maxBytesPerRow = 107;

/// ASCII code of `;`, which separates the station name from the temperature.
const semiColonCodeUnit = 59;

/// ASCII code of `\n`, which terminates every record.
const newLineCodeUnit = 10;
/// Running temperature statistics for a single weather station.
///
/// Measurements are folded in by updating [sum], [count], [minimum] and
/// [maximum]; [average] is derived from [sum] and [count].
class Data {
  /// Station name this entry aggregates.
  final String name;

  /// Sum of all measurements folded in so far.
  double sum = 0;

  /// Number of measurements folded in so far (kept as a double).
  double count = 0;

  /// Smallest measurement seen; `double.infinity` until the first one.
  double minimum = double.infinity;

  /// Largest measurement seen; `double.negativeInfinity` until the first one.
  double maximum = double.negativeInfinity;

  /// Mean of all measurements; NaN while [count] is zero (0 / 0).
  double get average => sum / count;

  Data(this.name);

  /// Folds the statistics of [data], which must describe the same station,
  /// into this entry. [data] itself is left unmodified.
  void merge(Data data) {
    assert(data.name == name);
    sum += data.sum;
    count += data.count;
    maximum = max(data.maximum, maximum);
    minimum = min(data.minimum, minimum);
  }

  /// Formats this entry as `name=min/mean/max` (values are not rounded).
  @override
  String toString() {
    return '$name=$minimum/$average/$maximum';
  }

  /// Renders each entry of [data] on its own line, in iteration order.
  static String dataToString(Iterable<Data> data) {
    final buff = StringBuffer();
    for (final entry in data) {
      buff.writeln(entry);
    }
    return buff.toString();
  }
}
/// Runs the challenge on the measurements file named by the single
/// command-line argument.
///
/// Prints one `name=min/mean/max` line per station, sorted by name, followed
/// by the elapsed time. The file is mapped once with [mmap]. Worker isolates
/// receive only the raw mapping address and a byte range; this works because
/// all isolates run inside this process. The worker count comes from
/// `-Disolates=N` (default 24).
Future<void> main(List<String> args) async {
  const isolates = int.fromEnvironment('isolates', defaultValue: 24);
  final filePath = args.single;

  final sw = Stopwatch()..start();
  final totalBytes = File(filePath).lengthSync();
  if (totalBytes == 0) {
    // mmap fails (EINVAL) for a zero-length mapping; an empty file simply
    // has no stations.
    sw.stop();
    print('');
    print('took ${sw.elapsed}');
    return;
  }

  // Never start more workers than there are bytes, so no chunk is empty
  // (empty chunks would otherwise re-count the first line). Also guard
  // against a non-positive -Disolates value.
  final workerCount = max(1, min(isolates, totalBytes));
  final bytesPerWorker = totalBytes ~/ workerCount;
  final remainder = totalBytes % workerCount;

  final nativePath = filePath.toNativeUtf8();
  final fd = open(nativePath, 0 /* O_RDONLY */);
  malloc.free(nativePath);
  if (fd < 0) {
    throw FileSystemException('open failed', filePath);
  }

  final ptr =
      mmap(nullptr, totalBytes, 1 /* PROT_READ */, 2 /* MAP_PRIVATE */, fd, 0);
  // mmap signals failure with MAP_FAILED, i.e. (void *) -1 — not NULL.
  final mapFailed = Pointer<Void>.fromAddress(-1).address;
  if (ptr.address == mapFailed || ptr.address == 0) {
    throw FileSystemException('mmap failed', filePath);
  }
  // NOTE(review): fd and the mapping are never released; they stay valid
  // until the process exits, which is acceptable for this one-shot program.

  // Only ints are captured by the worker closures below: Isolate.run copies
  // the closure (and everything it captures) to the new isolate.
  final address = ptr.address;
  final lastByte = totalBytes - 1;
  final futures = <Future<Map<String, Data>>>[];
  for (var i = 0; i < workerCount; i++) {
    final start = i * bytesPerWorker;
    // Inclusive end offset; the last chunk also absorbs the remainder.
    final end =
        start + bytesPerWorker - 1 + (i == workerCount - 1 ? remainder : 0);
    futures.add(Isolate.run(() => computeChunk(start, end, lastByte, address)));
  }

  final merged = mergeData(await Future.wait(futures));
  // 1BRC expects stations in sorted order.
  final names = merged.keys.toList()..sort();
  final buff = StringBuffer();
  for (final name in names) {
    buff.writeln(merged[name]);
  }
  sw.stop();
  print(buff.toString());
  print('took ${sw.elapsed}');
}
/// Aggregates the records of one chunk of the memory-mapped input file.
///
/// Parameters:
/// * [startByte] and [endByte] are the inclusive absolute byte offsets of the
///   chunk.
/// * [fileLength] is the absolute offset of the file's last byte; the caller
///   in this file passes `totalBytes - 1`.
/// * [baseAddress] is the address at which the whole file is mapped.
///
/// Records have the form `<name>;<temperature>\n`, and names are decoded as
/// UTF-8.
///
/// Chunk boundaries rarely fall on line boundaries, so every line is assigned
/// to exactly one chunk:
/// * a chunk that does not start the file skips everything up to and
///   including the first newline at or after [startByte], because that line
///   is finished by the previous chunk;
/// * a chunk that does not end the file reads past [endByte] (at most
///   [maxBytesPerRow] bytes, clamped to the file) up to and including the
///   first newline after [endByte].
///
/// Lines longer than [maxBytesPerRow] that cross a boundary are not
/// supported. Returns an empty map for an empty range. Throws
/// [FormatException] for a non-empty line without `;`, a malformed
/// temperature, or invalid UTF-8 in a name.
Future<Map<String, Data>> computeChunk(
    int startByte, int endByte, int fileLength, int baseAddress) async {
  final cities = <String, Data>{};
  final chunkLength = endByte - startByte + 1;
  if (chunkLength <= 0) return cities;

  final isLastChunk = endByte >= fileLength;
  // Non-last chunks need read-ahead to finish the straddling line, but must
  // never read past the end of the mapping.
  final viewLength = isLastChunk
      ? chunkLength
      : min(chunkLength + maxBytesPerRow, fileLength - startByte + 1);
  final bytes = Pointer<Uint8>.fromAddress(baseAddress + startByte)
      .asTypedList(viewLength);

  // Effective start: skip the partial line owned by the previous chunk.
  var lineStart = 0;
  if (startByte != 0) {
    final firstNewline = bytes.indexOf(newLineCodeUnit);
    // No newline at all: the chunk lies inside a single line that the
    // previous chunk finishes.
    if (firstNewline < 0) return cities;
    lineStart = firstNewline + 1;
  }

  // Effective end (exclusive): just past the first newline after endByte,
  // so the boundary line is processed here and skipped by the next chunk.
  var stop = viewLength;
  if (!isLastChunk) {
    final boundaryNewline = bytes.indexOf(newLineCodeUnit, chunkLength);
    if (boundaryNewline >= 0) stop = boundaryNewline + 1;
  }

  var separator = -1; // index of the last ';' in the current line
  for (var i = lineStart; i < stop; i++) {
    final b = bytes[i];
    if (b == semiColonCodeUnit) {
      separator = i;
    } else if (b == newLineCodeUnit) {
      _addRecord(cities, bytes, lineStart, separator, i);
      lineStart = i + 1;
      separator = -1;
    }
  }

  // A final record without a trailing newline can only occur when the view
  // ends at the end of the file.
  final reachesEof = startByte + viewLength - 1 >= fileLength;
  if (reachesEof && lineStart < stop) {
    _addRecord(cities, bytes, lineStart, separator, stop);
  }
  return cities;
}

/// Parses the line `bytes[lineStart, lineEnd)` (newline excluded) as
/// `<name>;<temperature>` and folds it into [cities].
///
/// [separator] is the index of the line's last `;`, or -1 if it has none.
/// Empty lines are ignored.
void _addRecord(Map<String, Data> cities, Uint8List bytes, int lineStart,
    int separator, int lineEnd) {
  if (lineEnd == lineStart) return;
  if (separator < 0) {
    throw FormatException(
        'Missing ";" in record',
        utf8.decode(Uint8List.sublistView(bytes, lineStart, lineEnd),
            allowMalformed: true));
  }
  final name = utf8.decode(Uint8List.sublistView(bytes, lineStart, separator));
  final temp = double.parse(String.fromCharCodes(
      Uint8List.sublistView(bytes, separator + 1, lineEnd)));
  final data = cities.putIfAbsent(name, () => Data(name));
  data
    ..sum = data.sum + temp
    ..maximum = max(data.maximum, temp)
    ..minimum = min(data.minimum, temp)
    ..count = data.count + 1;
}
/// Combines the per-chunk maps in [data] into one map keyed by station name.
///
/// Fresh [Data] objects are created for the result and the inputs are folded
/// into them with [Data.merge], so the input maps are not modified. The
/// result's key order follows first appearance across [data], in list order.
Map<String, Data> mergeData(List<Map<String, Data>> data) {
  final merged = <String, Data>{};
  for (final chunk in data) {
    for (final MapEntry(key: name, value: stats) in chunk.entries) {
      merged.putIfAbsent(name, () => Data(name)).merge(stats);
    }
  }
  return merged;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment