Skip to content

Instantly share code, notes, and snippets.

@Akanoa
Last active October 14, 2024 14:06
Show Gist options
  • Save Akanoa/238abd9d0509fe4f67cd42de696cff5c to your computer and use it in GitHub Desktop.
Save Akanoa/238abd9d0509fe4f67cd42de696cff5c to your computer and use it in GitHub Desktop.
Clodo Redis : le redis dans une map
[package]
name = "clodo_redis"
version = "0.1.0"
edition = "2021"
[dependencies]
eyre = "0.6.12"
log = "0.4.22"
env_logger = "0.11.5"
tokio = { version = "1.40.0", default-features = false, features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync"] }
foundationdb = { version = "0.9.1", default-features = false, features = ["fdb-7_3", "embedded-fdb-include"] }
FROM foundationdb/foundationdb:7.3.43 as fdb-release
FROM foundationdb/foundationdb:7.3.43-debug as fdb
RUN yum update -y
RUN yum install gcc clang -y
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
WORKDIR /opt/clodo_redis
COPY src/main.rs src/main.rs
COPY Cargo.toml .
RUN ${HOME}/.cargo/bin/cargo build --release
FROM gcc:14.2.0 as builder
RUN cat <<EOF > /script.sh
#!/usr/bin/bash
set -m
set -x
/var/fdb/scripts/fdb_single.bash&
echo Waiting 20 secondes the FDB initialisation...
sleep 20
echo ...Done
/usr/bin/clodo_redis
EOF
COPY start.sh /var/fdb/scripts/fdb_single.bash
RUN mkdir -p /var/fdb/logs
RUN chmod +x /script.sh /var/fdb/scripts/fdb_single.bash
FROM scratch
VOLUME /var/fdb/data
ENV FDB_PORT 4500
ENV FDB_CLUSTER_FILE /var/fdb/fdb.cluster
ENV FDB_NETWORKING_MODE host
ENV FDB_COORDINATOR ""
ENV FDB_COORDINATOR_PORT 4500
ENV FDB_CLUSTER_FILE_CONTENTS ""
ENV FDB_PROCESS_CLASS unset
ENV RUST_LOG=info
ENV PATH /usr/bin/
ENV LD_LIBRARY_PATH /lib/x86_64-linux-gnu:/lib64:/usr/lib
COPY --from=builder /script.sh /script.sh
COPY --from=fdb /usr/bin/bash /usr/bin/bash
COPY --from=fdb /usr/bin/sh /bin/sh
COPY --from=fdb /usr/bin/sleep /usr/bin/sleep
COPY --from=fdb /opt/clodo_redis/target/release/clodo_redis /usr/bin/clodo_redis
COPY --from=fdb /lib64/librt.so.1 /lib64/librt.so.1
COPY --from=fdb /lib64/libdl.so.2 /lib64/libdl.so.2
COPY --from=fdb /lib64/liblzma.so.5 /lib64/liblzma.so.5
COPY --from=fdb /lib64/libz.so.1 /lib64/libz.so.1
COPY --from=fdb /lib64/libm.so.6 /lib64/libm.so.6
COPY --from=fdb /lib64/libpthread.so.0 /lib64/libpthread.so.0
COPY --from=fdb /lib64/libc.so.6 /lib64/libc.so.6
COPY --from=fdb /lib64/libtinfo.so.6 /lib64/libtinfo.so.6
COPY --from=fdb /lib64/libgcc_s.so.1 /lib64/libgcc_s.so.1
COPY --from=fdb /lib64/ld-linux-x86-64.so.2 /lib64/ld-linux-x86-64.so.2
COPY --from=fdb-release /usr/lib/libfdb_c.so /usr/lib/libfdb_c.so
COPY --from=fdb-release /usr/bin/fdbserver /usr/bin/fdbserver
COPY --from=fdb-release /usr/bin/fdbcli /usr/bin/fdbcli
COPY --from=builder /var/fdb /var/fdb
ENTRYPOINT ["/script.sh"]
#!/usr/bin/bash
set -Eeuo pipefail
set -m
#
# fdb_single.bash
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
function create_cluster_file() {
FDB_CLUSTER_FILE=${FDB_CLUSTER_FILE:-/etc/foundationdb/fdb.cluster}
# mkdir -p "$(dirname "$FDB_CLUSTER_FILE")"
if [[ -n "$FDB_CLUSTER_FILE_CONTENTS" ]]; then
echo "$FDB_CLUSTER_FILE_CONTENTS" > "$FDB_CLUSTER_FILE"
elif [[ -n $FDB_COORDINATOR ]]; then
echo "$FDB_COORDINATOR"
coordinator_ip=$(dig +short "$FDB_COORDINATOR")
if [[ -z "$coordinator_ip" ]]; then
echo "Failed to look up coordinator address for $FDB_COORDINATOR" 1>&2
exit 1
fi
coordinator_port=${FDB_COORDINATOR_PORT:-4500}
echo "docker:docker@$coordinator_ip:$coordinator_port" > "$FDB_CLUSTER_FILE"
else
echo "FDB_COORDINATOR environment variable not defined" 1>&2
exit 1
fi
}
function create_server_environment() {
env_file=/var/fdb/.fdbenv
if [[ "$FDB_NETWORKING_MODE" == "host" ]]; then
public_ip=127.0.0.1
elif [[ "$FDB_NETWORKING_MODE" == "container" ]]; then
public_ip='0.0.0.0'
else
echo "Unknown FDB Networking mode \"$FDB_NETWORKING_MODE\"" 1>&2
exit 1
fi
echo "export PUBLIC_IP=$public_ip" > $env_file
if [[ -z $FDB_COORDINATOR && -z "$FDB_CLUSTER_FILE_CONTENTS" ]]; then
FDB_CLUSTER_FILE_CONTENTS="docker:docker@$public_ip:$FDB_PORT"
fi
create_cluster_file
}
function start_fdb () {
create_server_environment
source /var/fdb/.fdbenv
echo "Starting FDB server on $PUBLIC_IP:$FDB_PORT"
fdbserver --listen-address 0.0.0.0:"$FDB_PORT" \
--public-address "$PUBLIC_IP:$FDB_PORT" \
--datadir /var/fdb/data \
--logdir /var/fdb/logs \
--locality-zoneid="localhost" \
--locality-machineid="localhost" \
--knob_disable_posix_kernel_aio=1 \
--class "$FDB_PROCESS_CLASS" &
fdb_pid=$(jobs -p)
echo "fdbserver pid is: ${fdb_pid}"
}
function configure_fdb_single () {
echo "Configuring new single memory FDB database"
fdbcli --exec 'configure new single memory'
sleep 3
fdbcli --exec 'status'
}
start_fdb
sleep 5
configure_fdb_single
fg %1
use foundationdb::api::NetworkAutoStop;
use foundationdb::{Database, FdbBindingError};
use log::{debug, error, info};
use std::fmt::{Debug, Display, Formatter};
use std::sync::LazyLock;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::signal;
use tokio::signal::ctrl_c;
struct DatabaseGuard {
#[allow(unused)]
guard: NetworkAutoStop,
database: Database,
}
impl DatabaseGuard {
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, FdbBindingError> {
let result = self
.database
.run(|trx, _| async move { Ok(trx.get(key, false).await?) })
.await?;
Ok(result.map(|value| value.to_vec()))
}
async fn set(&self, key: &[u8], value: &[u8]) -> Result<(), FdbBindingError> {
self.database
.run(|trx, _| async move {
trx.set(key, value);
Ok(())
})
.await
}
async fn del(&self, key: &[u8]) -> Result<bool, FdbBindingError> {
self.database
.run(|trx, _| async move {
if trx.get(key, false).await?.is_some() {
trx.clear(key);
return Ok(true);
}
Ok(false)
})
.await
}
}
static DATABASE: LazyLock<DatabaseGuard> = LazyLock::new(|| {
let guard = unsafe { foundationdb::boot() };
let database = Database::new(None).expect("Unable to create database");
DatabaseGuard { guard, database }
});
//*3\r\n $3\r\nSET\r\n $4\r\ntoto\r\n $4\r\ntata\r\n
//*2\r\n $3\r\nGET\r\n $4\r\ntoto\r\n
//*2\r\n $3\r\nDEL\r\n $4\r\ntoto\r\n
enum Request<'a> {
Set { key: &'a [u8], value: &'a [u8] },
Get { key: &'a [u8] },
Del { key: &'a [u8] },
Unknown { command: &'a [u8] },
}
impl Debug for Request<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{self}")
}
}
impl Display for Request<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Request::Set { key, value } => write!(
f,
"Set {{ key: {}, value: {} }}",
String::from_utf8_lossy(key),
String::from_utf8_lossy(value)
),
Request::Get { key } => write!(f, "Get {{ key: {} }}", String::from_utf8_lossy(key)),
Request::Del { key } => write!(f, "Del {{ key: {} }}", String::from_utf8_lossy(key)),
Request::Unknown { command } => {
write!(f, "Unknown command : {}", String::from_utf8_lossy(command))
}
}
}
}
struct Splitter<'a> {
data: &'a [u8],
cursor: usize,
prev: usize,
}
impl<'a> Splitter<'a> {
fn new(data: &'a [u8]) -> Self {
Self {
data,
cursor: 0,
prev: 0,
}
}
}
impl<'a> Iterator for Splitter<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<Self::Item> {
loop {
// guard
if self.cursor == self.data.len() {
break None;
}
// check for fragment
if &self.data[self.cursor..self.cursor + 2] == b"\r\n" {
let fragment = &self.data[self.prev..self.cursor];
self.cursor += 2;
self.prev = self.cursor;
break Some(fragment);
}
// fragment end not found yet
self.cursor += 1;
}
}
}
async fn reply(request: Request<'_>, stream: &mut TcpStream) {
let reply = match request {
Request::Set { key, value } => match DATABASE.set(key, value).await {
Ok(_) => "+OK\r\n".to_string(),
Err(err) => {
error!("FDB error : {err}");
"-ERR Internal error".to_string()
}
},
Request::Get { key } => match DATABASE.get(key).await {
Ok(None) => "_\r\n".to_string(),
Ok(Some(value)) => {
format!(
"${}\r\n{}\r\n",
value.len(),
String::from_utf8_lossy(&value)
)
}
Err(err) => {
error!("FDB error : {err}");
"-ERR Internal error".to_string()
}
},
Request::Del { key } => match DATABASE.del(key).await {
Ok(true) => ":1\r\n".to_string(),
Ok(false) => ":0\r\n".to_string(),
Err(err) => {
error!("FDB error : {err}");
"-ERR Internal error".to_string()
}
},
Request::Unknown { .. } => format!("-ERR {request}\r\n"),
};
stream
.write_all(reply.as_bytes())
.await
.expect("Unable to respond")
}
fn parse_request(buf: &[u8]) -> Request {
let mut splitter = Splitter::new(buf);
splitter.next().expect("Expect arity");
splitter.next().expect("Expect command name len");
match splitter.next() {
Some(b"set") | Some(b"SET") => {
splitter.next().expect("key len");
let key = splitter.next().expect("key");
splitter.next().expect("value len");
let value = splitter.next().expect("value");
Request::Set { key, value }
}
Some(b"get") | Some(b"GET") => {
splitter.next().expect("key len");
let key = splitter.next().expect("key");
Request::Get { key }
}
Some(b"del") | Some(b"DEL") => {
splitter.next().expect("key len");
let key = splitter.next().expect("key");
Request::Del { key }
}
Some(command) => Request::Unknown { command },
None => panic!("Expected command"),
}
}
async fn handle_stream(mut stream: TcpStream) -> eyre::Result<()> {
info!("Handle new connection");
let mut buf = [0_u8; 1024];
loop {
let size = stream.read(&mut buf).await?;
if size == 0 {
// client disconnected
info!("Client disconnected");
break;
}
debug!("Request size : {size}");
debug!("Request : {}", String::from_utf8_lossy(&buf[..size]));
let request = parse_request(&buf[..size]);
debug!("Request: {request}");
reply(request, &mut stream).await;
}
Ok(())
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
env_logger::init();
let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async {
ctrl_c().await.expect("Unable to register signal");
tx.send(()).expect("Unable to send shutdown");
});
let listener = tokio::net::TcpListener::bind("0.0.0.0:7000").await?;
info!("Start listening on port 7000");
loop {
tokio::select! {
_ = &mut rx => {
break
},
result = listener.accept() => {
if let Ok((stream, _)) = result {
info!("start");
let _ = tokio::spawn(handle_stream(stream)).await;
info!("end");
}
}
}
}
info!("stop");
Ok(())
}
#[test]
fn test_splitter() {
let buf = b"*3\r\n$3\r\nSET\r\n$4\r\ntoto\r\n$4\r\ntata\r\n";
let mut splitter = Splitter::new(buf);
dbg!(splitter.next().map(String::from_utf8_lossy));
dbg!(splitter.next().map(String::from_utf8_lossy));
}
#[test]
fn test_parser() {
let buf = b"*3\r\n$3\r\nSET\r\n$4\r\ntoto\r\n$4\r\ntata\r\n";
dbg!(parse_request(buf));
let buf = b"*3\r\n$3\r\nGET\r\n$4\r\ntoto\r\n";
dbg!(parse_request(buf));
let buf = b"*3\r\n$3\r\nDEL\r\n$4\r\ntoto\r\n";
dbg!(parse_request(buf));
let buf = b"*3\r\n$4\r\nDEL8\r\n$4\r\ntoto\r\n";
dbg!(parse_request(buf));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment