Last active
October 14, 2024 14:06
-
-
Save Akanoa/238abd9d0509fe4f67cd42de696cff5c to your computer and use it in GitHub Desktop.
Clodo Redis : le redis dans une map
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "clodo_redis" | |
version = "0.1.0" | |
edition = "2021" | |
[dependencies] | |
eyre = "0.6.12" | |
log = "0.4.22" | |
env_logger = "0.11.5" | |
tokio = { version = "1.40.0", default-features = false, features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync"] } | |
foundationdb = { version = "0.9.1", default-features = false, features = ["fdb-7_3", "embedded-fdb-include"] } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM foundationdb/foundationdb:7.3.43 as fdb-release | |
FROM foundationdb/foundationdb:7.3.43-debug as fdb | |
RUN yum update -y | |
RUN yum install gcc clang -y | |
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y | |
WORKDIR /opt/clodo_redis | |
COPY src/main.rs src/main.rs | |
COPY Cargo.toml . | |
RUN ${HOME}/.cargo/bin/cargo build --release | |
FROM gcc:14.2.0 as builder | |
RUN cat <<EOF > /script.sh | |
#!/usr/bin/bash | |
set -m | |
set -x | |
/var/fdb/scripts/fdb_single.bash& | |
echo Waiting 20 secondes the FDB initialisation... | |
sleep 20 | |
echo ...Done | |
/usr/bin/clodo_redis | |
EOF | |
COPY start.sh /var/fdb/scripts/fdb_single.bash | |
RUN mkdir -p /var/fdb/logs | |
RUN chmod +x /script.sh /var/fdb/scripts/fdb_single.bash | |
FROM scratch | |
VOLUME /var/fdb/data | |
ENV FDB_PORT 4500 | |
ENV FDB_CLUSTER_FILE /var/fdb/fdb.cluster | |
ENV FDB_NETWORKING_MODE host | |
ENV FDB_COORDINATOR "" | |
ENV FDB_COORDINATOR_PORT 4500 | |
ENV FDB_CLUSTER_FILE_CONTENTS "" | |
ENV FDB_PROCESS_CLASS unset | |
ENV RUST_LOG=info | |
ENV PATH /usr/bin/ | |
ENV LD_LIBRARY_PATH /lib/x86_64-linux-gnu:/lib64:/usr/lib | |
COPY --from=builder /script.sh /script.sh | |
COPY --from=fdb /usr/bin/bash /usr/bin/bash | |
COPY --from=fdb /usr/bin/sh /bin/sh | |
COPY --from=fdb /usr/bin/sleep /usr/bin/sleep | |
COPY --from=fdb /opt/clodo_redis/target/release/clodo_redis /usr/bin/clodo_redis | |
COPY --from=fdb /lib64/librt.so.1 /lib64/librt.so.1 | |
COPY --from=fdb /lib64/libdl.so.2 /lib64/libdl.so.2 | |
COPY --from=fdb /lib64/liblzma.so.5 /lib64/liblzma.so.5 | |
COPY --from=fdb /lib64/libz.so.1 /lib64/libz.so.1 | |
COPY --from=fdb /lib64/libm.so.6 /lib64/libm.so.6 | |
COPY --from=fdb /lib64/libpthread.so.0 /lib64/libpthread.so.0 | |
COPY --from=fdb /lib64/libc.so.6 /lib64/libc.so.6 | |
COPY --from=fdb /lib64/libtinfo.so.6 /lib64/libtinfo.so.6 | |
COPY --from=fdb /lib64/libgcc_s.so.1 /lib64/libgcc_s.so.1 | |
COPY --from=fdb /lib64/ld-linux-x86-64.so.2 /lib64/ld-linux-x86-64.so.2 | |
COPY --from=fdb-release /usr/lib/libfdb_c.so /usr/lib/libfdb_c.so | |
COPY --from=fdb-release /usr/bin/fdbserver /usr/bin/fdbserver | |
COPY --from=fdb-release /usr/bin/fdbcli /usr/bin/fdbcli | |
COPY --from=builder /var/fdb /var/fdb | |
ENTRYPOINT ["/script.sh"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/bash | |
set -Eeuo pipefail | |
set -m | |
# | |
# fdb_single.bash | |
# | |
# This source file is part of the FoundationDB open source project | |
# | |
# Copyright 2013-2024 Apple Inc. and the FoundationDB project authors | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
function create_cluster_file() { | |
FDB_CLUSTER_FILE=${FDB_CLUSTER_FILE:-/etc/foundationdb/fdb.cluster} | |
# mkdir -p "$(dirname "$FDB_CLUSTER_FILE")" | |
if [[ -n "$FDB_CLUSTER_FILE_CONTENTS" ]]; then | |
echo "$FDB_CLUSTER_FILE_CONTENTS" > "$FDB_CLUSTER_FILE" | |
elif [[ -n $FDB_COORDINATOR ]]; then | |
echo "$FDB_COORDINATOR" | |
coordinator_ip=$(dig +short "$FDB_COORDINATOR") | |
if [[ -z "$coordinator_ip" ]]; then | |
echo "Failed to look up coordinator address for $FDB_COORDINATOR" 1>&2 | |
exit 1 | |
fi | |
coordinator_port=${FDB_COORDINATOR_PORT:-4500} | |
echo "docker:docker@$coordinator_ip:$coordinator_port" > "$FDB_CLUSTER_FILE" | |
else | |
echo "FDB_COORDINATOR environment variable not defined" 1>&2 | |
exit 1 | |
fi | |
} | |
function create_server_environment() { | |
env_file=/var/fdb/.fdbenv | |
if [[ "$FDB_NETWORKING_MODE" == "host" ]]; then | |
public_ip=127.0.0.1 | |
elif [[ "$FDB_NETWORKING_MODE" == "container" ]]; then | |
public_ip='0.0.0.0' | |
else | |
echo "Unknown FDB Networking mode \"$FDB_NETWORKING_MODE\"" 1>&2 | |
exit 1 | |
fi | |
echo "export PUBLIC_IP=$public_ip" > $env_file | |
if [[ -z $FDB_COORDINATOR && -z "$FDB_CLUSTER_FILE_CONTENTS" ]]; then | |
FDB_CLUSTER_FILE_CONTENTS="docker:docker@$public_ip:$FDB_PORT" | |
fi | |
create_cluster_file | |
} | |
function start_fdb () { | |
create_server_environment | |
source /var/fdb/.fdbenv | |
echo "Starting FDB server on $PUBLIC_IP:$FDB_PORT" | |
fdbserver --listen-address 0.0.0.0:"$FDB_PORT" \ | |
--public-address "$PUBLIC_IP:$FDB_PORT" \ | |
--datadir /var/fdb/data \ | |
--logdir /var/fdb/logs \ | |
--locality-zoneid="localhost" \ | |
--locality-machineid="localhost" \ | |
--knob_disable_posix_kernel_aio=1 \ | |
--class "$FDB_PROCESS_CLASS" & | |
fdb_pid=$(jobs -p) | |
echo "fdbserver pid is: ${fdb_pid}" | |
} | |
function configure_fdb_single () { | |
echo "Configuring new single memory FDB database" | |
fdbcli --exec 'configure new single memory' | |
sleep 3 | |
fdbcli --exec 'status' | |
} | |
start_fdb | |
sleep 5 | |
configure_fdb_single | |
fg %1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use foundationdb::api::NetworkAutoStop; | |
use foundationdb::{Database, FdbBindingError}; | |
use log::{debug, error, info}; | |
use std::fmt::{Debug, Display, Formatter}; | |
use std::sync::LazyLock; | |
use tokio::io::{AsyncReadExt, AsyncWriteExt}; | |
use tokio::net::TcpStream; | |
use tokio::signal; | |
use tokio::signal::ctrl_c; | |
struct DatabaseGuard { | |
#[allow(unused)] | |
guard: NetworkAutoStop, | |
database: Database, | |
} | |
impl DatabaseGuard { | |
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, FdbBindingError> { | |
let result = self | |
.database | |
.run(|trx, _| async move { Ok(trx.get(key, false).await?) }) | |
.await?; | |
Ok(result.map(|value| value.to_vec())) | |
} | |
async fn set(&self, key: &[u8], value: &[u8]) -> Result<(), FdbBindingError> { | |
self.database | |
.run(|trx, _| async move { | |
trx.set(key, value); | |
Ok(()) | |
}) | |
.await | |
} | |
async fn del(&self, key: &[u8]) -> Result<bool, FdbBindingError> { | |
self.database | |
.run(|trx, _| async move { | |
if trx.get(key, false).await?.is_some() { | |
trx.clear(key); | |
return Ok(true); | |
} | |
Ok(false) | |
}) | |
.await | |
} | |
} | |
static DATABASE: LazyLock<DatabaseGuard> = LazyLock::new(|| { | |
let guard = unsafe { foundationdb::boot() }; | |
let database = Database::new(None).expect("Unable to create database"); | |
DatabaseGuard { guard, database } | |
}); | |
//*3\r\n $3\r\nSET\r\n $4\r\ntoto\r\n $4\r\ntata\r\n | |
//*2\r\n $3\r\nGET\r\n $4\r\ntoto\r\n | |
//*2\r\n $3\r\nDEL\r\n $4\r\ntoto\r\n | |
enum Request<'a> { | |
Set { key: &'a [u8], value: &'a [u8] }, | |
Get { key: &'a [u8] }, | |
Del { key: &'a [u8] }, | |
Unknown { command: &'a [u8] }, | |
} | |
impl Debug for Request<'_> { | |
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | |
write!(f, "{self}") | |
} | |
} | |
impl Display for Request<'_> { | |
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | |
match self { | |
Request::Set { key, value } => write!( | |
f, | |
"Set {{ key: {}, value: {} }}", | |
String::from_utf8_lossy(key), | |
String::from_utf8_lossy(value) | |
), | |
Request::Get { key } => write!(f, "Get {{ key: {} }}", String::from_utf8_lossy(key)), | |
Request::Del { key } => write!(f, "Del {{ key: {} }}", String::from_utf8_lossy(key)), | |
Request::Unknown { command } => { | |
write!(f, "Unknown command : {}", String::from_utf8_lossy(command)) | |
} | |
} | |
} | |
} | |
struct Splitter<'a> { | |
data: &'a [u8], | |
cursor: usize, | |
prev: usize, | |
} | |
impl<'a> Splitter<'a> { | |
fn new(data: &'a [u8]) -> Self { | |
Self { | |
data, | |
cursor: 0, | |
prev: 0, | |
} | |
} | |
} | |
impl<'a> Iterator for Splitter<'a> { | |
type Item = &'a [u8]; | |
fn next(&mut self) -> Option<Self::Item> { | |
loop { | |
// guard | |
if self.cursor == self.data.len() { | |
break None; | |
} | |
// check for fragment | |
if &self.data[self.cursor..self.cursor + 2] == b"\r\n" { | |
let fragment = &self.data[self.prev..self.cursor]; | |
self.cursor += 2; | |
self.prev = self.cursor; | |
break Some(fragment); | |
} | |
// fragment end not found yet | |
self.cursor += 1; | |
} | |
} | |
} | |
async fn reply(request: Request<'_>, stream: &mut TcpStream) { | |
let reply = match request { | |
Request::Set { key, value } => match DATABASE.set(key, value).await { | |
Ok(_) => "+OK\r\n".to_string(), | |
Err(err) => { | |
error!("FDB error : {err}"); | |
"-ERR Internal error".to_string() | |
} | |
}, | |
Request::Get { key } => match DATABASE.get(key).await { | |
Ok(None) => "_\r\n".to_string(), | |
Ok(Some(value)) => { | |
format!( | |
"${}\r\n{}\r\n", | |
value.len(), | |
String::from_utf8_lossy(&value) | |
) | |
} | |
Err(err) => { | |
error!("FDB error : {err}"); | |
"-ERR Internal error".to_string() | |
} | |
}, | |
Request::Del { key } => match DATABASE.del(key).await { | |
Ok(true) => ":1\r\n".to_string(), | |
Ok(false) => ":0\r\n".to_string(), | |
Err(err) => { | |
error!("FDB error : {err}"); | |
"-ERR Internal error".to_string() | |
} | |
}, | |
Request::Unknown { .. } => format!("-ERR {request}\r\n"), | |
}; | |
stream | |
.write_all(reply.as_bytes()) | |
.await | |
.expect("Unable to respond") | |
} | |
fn parse_request(buf: &[u8]) -> Request { | |
let mut splitter = Splitter::new(buf); | |
splitter.next().expect("Expect arity"); | |
splitter.next().expect("Expect command name len"); | |
match splitter.next() { | |
Some(b"set") | Some(b"SET") => { | |
splitter.next().expect("key len"); | |
let key = splitter.next().expect("key"); | |
splitter.next().expect("value len"); | |
let value = splitter.next().expect("value"); | |
Request::Set { key, value } | |
} | |
Some(b"get") | Some(b"GET") => { | |
splitter.next().expect("key len"); | |
let key = splitter.next().expect("key"); | |
Request::Get { key } | |
} | |
Some(b"del") | Some(b"DEL") => { | |
splitter.next().expect("key len"); | |
let key = splitter.next().expect("key"); | |
Request::Del { key } | |
} | |
Some(command) => Request::Unknown { command }, | |
None => panic!("Expected command"), | |
} | |
} | |
async fn handle_stream(mut stream: TcpStream) -> eyre::Result<()> { | |
info!("Handle new connection"); | |
let mut buf = [0_u8; 1024]; | |
loop { | |
let size = stream.read(&mut buf).await?; | |
if size == 0 { | |
// client disconnected | |
info!("Client disconnected"); | |
break; | |
} | |
debug!("Request size : {size}"); | |
debug!("Request : {}", String::from_utf8_lossy(&buf[..size])); | |
let request = parse_request(&buf[..size]); | |
debug!("Request: {request}"); | |
reply(request, &mut stream).await; | |
} | |
Ok(()) | |
} | |
#[tokio::main] | |
async fn main() -> eyre::Result<()> { | |
env_logger::init(); | |
let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); | |
tokio::spawn(async { | |
ctrl_c().await.expect("Unable to register signal"); | |
tx.send(()).expect("Unable to send shutdown"); | |
}); | |
let listener = tokio::net::TcpListener::bind("0.0.0.0:7000").await?; | |
info!("Start listening on port 7000"); | |
loop { | |
tokio::select! { | |
_ = &mut rx => { | |
break | |
}, | |
result = listener.accept() => { | |
if let Ok((stream, _)) = result { | |
info!("start"); | |
let _ = tokio::spawn(handle_stream(stream)).await; | |
info!("end"); | |
} | |
} | |
} | |
} | |
info!("stop"); | |
Ok(()) | |
} | |
#[test] | |
fn test_splitter() { | |
let buf = b"*3\r\n$3\r\nSET\r\n$4\r\ntoto\r\n$4\r\ntata\r\n"; | |
let mut splitter = Splitter::new(buf); | |
dbg!(splitter.next().map(String::from_utf8_lossy)); | |
dbg!(splitter.next().map(String::from_utf8_lossy)); | |
} | |
#[test] | |
fn test_parser() { | |
let buf = b"*3\r\n$3\r\nSET\r\n$4\r\ntoto\r\n$4\r\ntata\r\n"; | |
dbg!(parse_request(buf)); | |
let buf = b"*3\r\n$3\r\nGET\r\n$4\r\ntoto\r\n"; | |
dbg!(parse_request(buf)); | |
let buf = b"*3\r\n$3\r\nDEL\r\n$4\r\ntoto\r\n"; | |
dbg!(parse_request(buf)); | |
let buf = b"*3\r\n$4\r\nDEL8\r\n$4\r\ntoto\r\n"; | |
dbg!(parse_request(buf)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment