Skip to content

Instantly share code, notes, and snippets.

@netmarkjp
Last active July 14, 2022 14:41
Show Gist options
  • Save netmarkjp/13025849ad89ac49514fcc1ab9256fc9 to your computer and use it in GitHub Desktop.
Save netmarkjp/13025849ad89ac49514fcc1ab9256fc9 to your computer and use it in GitHub Desktop.
wip-isucon11q-tranining-rust
alter table `isu_condition` add column (`condition_level` VARCHAR(20) NOT NULL);
-- alter table `isu_condition` add index (`jia_isu_uuid`, `timestamp`, `condition_level`);
alter table `isu_condition` add index (`jia_isu_uuid`, `timestamp`);
-- alter table `isu` add index (`character`);
DROP TABLE IF EXISTS `last_isu_condition`;
CREATE TABLE `last_isu_condition` (
`id` bigint(20) NOT NULL,
`jia_isu_uuid` char(36) NOT NULL,
`timestamp` datetime NOT NULL,
`is_sitting` tinyint(1) NOT NULL,
`condition` varchar(255) NOT NULL,
`message` varchar(255) NOT NULL,
`created_at` datetime(6) DEFAULT current_timestamp(6),
`condition_level` varchar(20) NOT NULL,
PRIMARY KEY (`jia_isu_uuid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ;
upstream app {
server 127.0.0.1:3000;
keepalive_requests 1000000; # nginx-1.16以降
keepalive 128;
}
server {
listen 443 ssl http2;
ssl_certificate /etc/nginx/certificates/tls-cert.pem;
ssl_certificate_key /etc/nginx/certificates/tls-key.pem;
location / {
client_body_buffer_size 256k;
# proxy_set_header Connection "close";
proxy_set_header Connection "";
proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_pass http://app;
}
location /assets/ {
root /home/isucon/webapp/public;
access_log off;
}
location /internal/icons/ {
root /home/isucon/webapp/public;
}
}
diff --git a/rust/src/main.rs b/rust/src/main.rs
index 67ad587..914f92d 100644
--- a/rust/src/main.rs
+++ b/rust/src/main.rs
@@ -5,7 +5,12 @@ use chrono::TimeZone as _;
use chrono::{DateTime, NaiveDateTime};
use futures::StreamExt as _;
use futures::TryStreamExt as _;
+// use tokio::time::interval;
use std::collections::{HashMap, HashSet};
+use std::io::{BufWriter, Write};
+use std::sync::{Arc, Mutex};
+// use std::time::Duration;
+use tokio::sync::mpsc;
const SESSION_NAME: &str = "isucondition_rust";
const CONDITION_LIMIT: usize = 20;
@@ -97,6 +102,7 @@ struct IsuCondition {
condition: String,
message: String,
created_at: DateTime<chrono::FixedOffset>,
+ condition_level: String,
}
impl sqlx::FromRow<'_, sqlx::mysql::MySqlRow> for IsuCondition {
fn from_row(row: &sqlx::mysql::MySqlRow) -> sqlx::Result<Self> {
@@ -115,10 +121,20 @@ impl sqlx::FromRow<'_, sqlx::mysql::MySqlRow> for IsuCondition {
condition: row.try_get("condition")?,
message: row.try_get("message")?,
created_at,
+ condition_level: row.try_get("condition_level")?,
})
}
}
+#[derive(Debug, Clone)]
+struct PostIsuCondition {
+ jia_isu_uuid: String,
+ timestamp: NaiveDateTime,
+ is_sitting: bool,
+ condition: String,
+ message: String,
+}
+
#[derive(Debug)]
struct MySQLConnectionEnv {
host: String,
@@ -200,7 +216,7 @@ struct GetIsuConditionResponse {
message: String,
}
-#[derive(Debug, serde::Serialize)]
+#[derive(Debug, serde::Serialize, Clone)]
struct TrendResponse {
character: String,
info: Vec<TrendCondition>,
@@ -208,7 +224,7 @@ struct TrendResponse {
critical: Vec<TrendCondition>,
}
-#[derive(Debug, serde::Serialize)]
+#[derive(Debug, serde::Serialize, Clone)]
struct TrendCondition {
#[serde(rename = "isu_id")]
id: i64,
@@ -251,6 +267,7 @@ async fn main() -> std::io::Result<()> {
.port(mysql_connection_env.port)
.database(&mysql_connection_env.db_name)
.username(&mysql_connection_env.user)
+ .statement_cache_capacity(8192)
.password(&mysql_connection_env.password),
)
.await
@@ -263,6 +280,153 @@ async fn main() -> std::io::Result<()> {
session_key.resize(32, 0);
}
+ // let flush_trigger: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
+ // let flush_trigger_wk = flush_trigger.clone();
+ // let _flush_trigger_worker = tokio::spawn(async move {
+ // let mut itv = interval(Duration::from_millis(300));
+ // loop {
+ // itv.tick().await;
+ // {
+ // let mut flush_trigger_wk = flush_trigger_wk.lock().unwrap();
+ // *flush_trigger_wk = true;
+ // }
+ // }
+ // });
+ // let flush_trigger_wk2 = flush_trigger.clone();
+
+ let (isu_condition_tx, mut isu_condition_rx): (
+ mpsc::Sender<PostIsuCondition>,
+ mpsc::Receiver<PostIsuCondition>,
+ ) = mpsc::channel(65535); // FIXME サイズは適当
+
+ // FIXME pool.acquire().await.unwrap().leak()がよくわからない
+ let mut thread_pool = pool.acquire().await.unwrap().leak();
+ let _bulk_insert_worker = tokio::spawn(async move {
+ let mut post_isu_conditions = Vec::new();
+ let mut updated_jia_isu_uuids: HashMap<String, NaiveDateTime> = HashMap::new();
+ // let mut trigger = false;
+ while let Some(val) = isu_condition_rx.recv().await {
+ updated_jia_isu_uuids.insert(val.jia_isu_uuid.clone(), val.timestamp.clone());
+ post_isu_conditions.push(val);
+ // // n秒経ったらflush。いれるとスコアが落ちる...
+ // {
+ // let mut flush_trigger_wk2 = flush_trigger_wk2.lock().unwrap();
+ // if post_isu_conditions.len() >= 8000 || *flush_trigger_wk2 {
+ // trigger = true;
+ // *flush_trigger_wk2 = false;
+ // }
+ // }
+ // if trigger {
+ if post_isu_conditions.len() >= 10000 {
+ let mut query_builder: sqlx::QueryBuilder<sqlx::MySql> = sqlx::QueryBuilder::new(
+ "INSERT INTO `isu_condition` (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `message`, `condition_level`) "
+ );
+
+ query_builder.push_values(post_isu_conditions.clone(), |mut b, cond| {
+ let condition_level =
+ calculate_condition_level(cond.condition.as_str()).unwrap();
+ b.push_bind(cond.jia_isu_uuid)
+ .push_bind(cond.timestamp)
+ .push_bind(cond.is_sitting)
+ .push_bind(cond.condition)
+ .push_bind(cond.message)
+ .push_bind(condition_level);
+ });
+
+ let ret = query_builder.build().execute(&mut thread_pool).await;
+ match ret {
+ Ok(_) => {}
+ Err(val) => {
+ log::error!("{:?}", val);
+ }
+ }
+
+ {
+ for (k, v) in updated_jia_isu_uuids.iter() {
+ sqlx::query(
+ "REPLACE INTO `last_isu_condition` SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` = ?",
+ )
+ .bind(k)
+ .bind(v)
+ .execute(&mut thread_pool)
+ .await
+ .unwrap();
+ }
+ }
+ updated_jia_isu_uuids.clear();
+ post_isu_conditions.clear();
+ }
+ }
+ });
+
+ let jia_isu_uuid_master: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+ {
+ let uuids: Vec<String> = sqlx::query_scalar("SELECT distinct jia_isu_uuid FROM `isu`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut jia_isu_uuid_master = jia_isu_uuid_master.lock().unwrap();
+ jia_isu_uuid_master.clear();
+ for uuid in uuids {
+ jia_isu_uuid_master.push(uuid.clone());
+ }
+ }
+ }
+
+ let jia_user_id_master: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+ {
+ let jia_user_ids: Vec<String> =
+ sqlx::query_scalar("SELECT DISTINCT `jia_user_id` FROM `user`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut jia_user_id_master = jia_user_id_master.lock().unwrap();
+ jia_user_id_master.clear();
+ for jia_user_id in jia_user_ids {
+ jia_user_id_master.push(jia_user_id.clone());
+ }
+ }
+ }
+
+ // key: jia_user_id, value: vec of jia_isu_uuid
+ let isu_jia_user_id_jia_isu_uuid_master: Arc<Mutex<HashMap<String, Vec<String>>>> =
+ Arc::new(Mutex::new(HashMap::new()));
+ {
+ let isu_jia_user_id_jia_isu_uuid_master = isu_jia_user_id_jia_isu_uuid_master.clone();
+
+ #[derive(sqlx::FromRow)]
+ struct IsuTmp {
+ jia_user_id: String,
+ jia_isu_uuid: String,
+ }
+ let tmps: Vec<IsuTmp> = sqlx::query_as("SELECT `jia_user_id`, `jia_isu_uuid` FROM `isu`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut isu_jia_user_id_jia_isu_uuid_master =
+ isu_jia_user_id_jia_isu_uuid_master.lock().unwrap();
+
+ isu_jia_user_id_jia_isu_uuid_master.clear();
+ for tmp in tmps {
+ let val: Option<&Vec<String>> =
+ isu_jia_user_id_jia_isu_uuid_master.get(&tmp.jia_user_id.clone());
+ if val.is_none() {
+ isu_jia_user_id_jia_isu_uuid_master
+ .insert(tmp.jia_user_id.clone(), vec![tmp.jia_isu_uuid.clone()]);
+ } else {
+ let val: &Vec<String> = val.unwrap();
+ let mut val: Vec<String> = (*val).clone();
+ val.push(tmp.jia_isu_uuid.clone());
+ }
+ }
+ }
+
let server = actix_web::HttpServer::new(move || {
actix_web::App::new()
.app_data(web::JsonConfig::default().error_handler(|err, _| {
@@ -273,7 +437,14 @@ async fn main() -> std::io::Result<()> {
}
}))
.app_data(web::Data::new(pool.clone()))
- .wrap(actix_web::middleware::Logger::default())
+ .app_data(web::Data::new(isu_condition_tx.clone()))
+ // masters
+ .app_data(web::Data::new((
+ jia_isu_uuid_master.clone(),
+ jia_user_id_master.clone(),
+ isu_jia_user_id_jia_isu_uuid_master.clone(),
+ )))
+ // .wrap(actix_web::middleware::Logger::default())
.wrap(
actix_session::CookieSession::signed(&session_key)
.secure(false)
@@ -438,22 +609,21 @@ where
async fn require_signed_in<'e, 'c, E>(
executor: E,
+ jia_user_id_master: &Arc<Mutex<Vec<String>>>,
session: actix_session::Session,
) -> actix_web::Result<String>
where
'c: 'e,
E: 'e + sqlx::Executor<'c, Database = sqlx::MySql>,
{
- if let Some(jia_user_id) = session.get("jia_user_id")? {
- let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM `user` WHERE `jia_user_id` = ?")
- .bind(&jia_user_id)
- .fetch_one(executor)
- .await
- .map_err(SqlxError)?;
- if count == 0 {
- Err(actix_web::error::ErrorUnauthorized("not found: user"))
- } else {
+ if let Some(jia_user_id) = session.get::<String>("jia_user_id")? {
+ // lockしてArc<Mutex<T>>から取り出し
+ let jia_user_id_master = jia_user_id_master.lock().unwrap();
+
+ if !jia_user_id.is_empty() && jia_user_id_master.contains(&jia_user_id) {
Ok(jia_user_id)
+ } else {
+ Err(actix_web::error::ErrorUnauthorized("you are not signed in"))
}
} else {
Err(actix_web::error::ErrorUnauthorized("you are not signed in"))
@@ -476,8 +646,16 @@ async fn get_jia_service_url(tx: &mut sqlx::Transaction<'_, sqlx::MySql>) -> sql
#[actix_web::post("/initialize")]
async fn post_initialize(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
request: web::Json<InitializeRequest>,
) -> actix_web::Result<HttpResponse> {
+ let jia_isu_uuid_master = &masters.get_ref().0;
+ let jia_user_id_master = &masters.get_ref().1;
+ let isu_jia_user_id_jia_isu_uuid_master = &masters.get_ref().2;
let status = tokio::process::Command::new("../sql/init.sh")
.status()
.await
@@ -498,6 +676,177 @@ async fn post_initialize(
.execute(pool.as_ref())
.await
.map_err(SqlxError)?;
+
+ {
+ let jia_isu_uuids: Vec<String> =
+ sqlx::query_scalar("SELECT DISTINCT `jia_isu_uuid` FROM `isu`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut jia_isu_uuid_master = jia_isu_uuid_master.lock().unwrap();
+ jia_isu_uuid_master.clear();
+ for jia_isu_uuid in jia_isu_uuids {
+ jia_isu_uuid_master.push(jia_isu_uuid.clone());
+ }
+ }
+ }
+
+ {
+ let jia_user_ids: Vec<String> =
+ sqlx::query_scalar("SELECT DISTINCT `jia_user_id` FROM `user`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut jia_user_id_master = jia_user_id_master.lock().unwrap();
+ jia_user_id_master.clear();
+ for jia_user_id in jia_user_ids {
+ jia_user_id_master.push(jia_user_id.clone());
+ }
+ }
+ }
+
+ // iconをクリア・ダンプ
+ // /internal/icons/{jia_isu_uuid}
+ // ファイルのありかは /home/isucon/public/internal/icons/{jia_isu_uuid}
+ let status = tokio::process::Command::new("install")
+ .args(&[
+ "-d",
+ "-m",
+ "777",
+ "/home/isucon/webapp/public/internal/icons",
+ ])
+ .status()
+ .await
+ .unwrap();
+ if !status.success() {
+ log::error!("exec install failed with exit code {:?}", status.code());
+ return Err(actix_web::error::ErrorInternalServerError(""));
+ }
+ let status = tokio::process::Command::new("rm")
+ .args(&["-rf", "/home/isucon/webapp/public/internal/icons"])
+ .status()
+ .await
+ .unwrap();
+ if !status.success() {
+ log::error!("exec rm -rf failed with exit code {:?}", status.code());
+ return Err(actix_web::error::ErrorInternalServerError(""));
+ }
+ let status = tokio::process::Command::new("install")
+ .args(&[
+ "-d",
+ "-m",
+ "777",
+ "/home/isucon/webapp/public/internal/icons",
+ ])
+ .status()
+ .await
+ .unwrap();
+ if !status.success() {
+ log::error!("exec install failed with exit code {:?}", status.code());
+ return Err(actix_web::error::ErrorInternalServerError(""));
+ }
+
+ #[derive(sqlx::FromRow)]
+ struct IsuIcon {
+ jia_isu_uuid: String,
+ image: Vec<u8>,
+ }
+ let isu_icons: Vec<IsuIcon> = sqlx::query_as("SELECT `jia_isu_uuid`, `image` FROM `isu`")
+ .fetch_all(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
+ for isu_icon in isu_icons {
+ let jia_isu_uuid = isu_icon.jia_isu_uuid;
+ let image = isu_icon.image;
+ let mut writer = BufWriter::new(std::fs::File::create(format!(
+ "/home/isucon/webapp/public/internal/icons/{jia_isu_uuid}.jpg"
+ ))?);
+ writer.write_all(image.as_ref()).unwrap();
+ writer.flush().unwrap();
+ }
+
+ {
+ #[derive(sqlx::FromRow)]
+ struct IsuTmp {
+ jia_user_id: String,
+ jia_isu_uuid: String,
+ }
+ let tmps: Vec<IsuTmp> = sqlx::query_as("SELECT `jia_user_id`, `jia_isu_uuid` FROM `isu`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut isu_jia_user_id_jia_isu_uuid_master =
+ isu_jia_user_id_jia_isu_uuid_master.lock().unwrap();
+
+ isu_jia_user_id_jia_isu_uuid_master.clear();
+ // let mut vvv = isu_jia_user_id_jia_isu_uuid_master.get(&String::from("a"));
+
+ for tmp in tmps {
+ let k: String = tmp.jia_user_id.clone();
+ let v: String = tmp.jia_isu_uuid.clone();
+ match isu_jia_user_id_jia_isu_uuid_master.get_mut(&k) {
+ None => {
+ isu_jia_user_id_jia_isu_uuid_master.insert(k, vec![v]);
+ }
+ Some(val) => {
+ (*val).push(v);
+ }
+ }
+ }
+ }
+
+ {
+ #[derive(sqlx::FromRow)]
+ struct Tmp {
+ id: i64,
+ condition: String,
+ }
+ let tmps: Vec<Tmp> = sqlx::query_as("SELECT `id`, `condition` FROM `isu_condition`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ for tmp in tmps {
+ sqlx::query("UPDATE `isu_condition` SET `condition_level` = ? WHERE `id` = ?")
+ .bind(calculate_condition_level(tmp.condition.as_str()).unwrap())
+ .bind(tmp.id)
+ .execute(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ }
+ }
+
+ {
+ let jia_isu_uuids: Vec<String> =
+ sqlx::query_scalar("SELECT DISTINCT `jia_isu_uuid` FROM `isu_condition`")
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+
+ for jia_isu_uuid in jia_isu_uuids {
+ let last_condition_id: Vec<i64> = sqlx::query_scalar(
+ "SELECT `id` FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` DESC LIMIT 1"
+ )
+ .bind(jia_isu_uuid)
+ .fetch_all(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+
+ sqlx::query(
+ "INSERT INTO `last_isu_condition` SELECT * FROM `isu_condition` WHERE `id` = ?",
+ )
+ .bind(last_condition_id[0])
+ .execute(&mut pool.acquire().await.unwrap())
+ .await
+ .unwrap();
+ }
+ }
+
Ok(HttpResponse::Ok().json(InitializeResponse {
language: "rust".to_owned(),
}))
@@ -512,9 +861,16 @@ struct Claims {
#[actix_web::post("/api/auth")]
async fn post_authentication(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
request: actix_web::HttpRequest,
session: actix_session::Session,
) -> actix_web::Result<HttpResponse> {
+ let jia_isu_uuid_master = &masters.get_ref().0;
+ let jia_user_id_master = &masters.get_ref().1;
let req_jwt = request
.headers()
.get("Authorization")
@@ -543,6 +899,12 @@ async fn post_authentication(
.await
.map_err(SqlxError)?;
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut jia_user_id_master = jia_user_id_master.lock().unwrap();
+ jia_user_id_master.push(jia_user_id.clone());
+ }
+
session.insert("jia_user_id", jia_user_id).map_err(|e| {
log::error!("failed to set cookie: {}", e);
e
@@ -565,9 +927,16 @@ async fn post_signout(session: actix_session::Session) -> actix_web::Result<Http
#[actix_web::get("/api/user/me")]
async fn get_me(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
+ let jia_isu_uuid_master = &masters.get_ref().0;
+ let jia_user_id_master = &masters.get_ref().1;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
Ok(HttpResponse::Ok().json(GetMeResponse { jia_user_id }))
}
@@ -575,9 +944,15 @@ async fn get_me(
#[actix_web::get("/api/isu")]
async fn get_isu_list(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
+ let jia_user_id_master = &masters.get_ref().1;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
let mut tx = pool.begin().await.map_err(SqlxError)?;
@@ -591,10 +966,9 @@ async fn get_isu_list(
let mut response_list = Vec::new();
for isu in isu_list {
let last_condition: Option<IsuCondition> = fetch_optional_as(
- sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` DESC LIMIT 1"
- ).bind(&isu.jia_isu_uuid),
- &mut tx
+ sqlx::query_as("SELECT * FROM `last_isu_condition` WHERE `jia_isu_uuid` = ?")
+ .bind(&isu.jia_isu_uuid),
+ &mut tx,
)
.await
.map_err(SqlxError)?;
@@ -636,10 +1010,18 @@ async fn get_isu_list(
#[actix_web::post("/api/isu")]
async fn post_isu(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
mut payload: actix_multipart::Multipart,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
+ let jia_isu_uuid_master = &masters.get_ref().0;
+ let jia_user_id_master = &masters.get_ref().1;
+ let isu_jia_user_id_jia_isu_uuid_master = &masters.get_ref().2;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
let mut jia_isu_uuid = None;
let mut isu_name = None;
@@ -699,6 +1081,37 @@ async fn post_isu(
}
result.map_err(SqlxError)?;
+ // オンメモリキャッシュに追加
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut jia_isu_uuid_master = jia_isu_uuid_master.lock().unwrap();
+ if !jia_isu_uuid_master.contains(&jia_isu_uuid) {
+ jia_isu_uuid_master.push(jia_isu_uuid.clone());
+ }
+ }
+
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let mut isu_jia_user_id_jia_isu_uuid_master =
+ isu_jia_user_id_jia_isu_uuid_master.lock().unwrap();
+ let k: String = jia_user_id.clone();
+ let v: String = jia_isu_uuid.clone();
+ match isu_jia_user_id_jia_isu_uuid_master.get_mut(&k) {
+ None => {
+ isu_jia_user_id_jia_isu_uuid_master.insert(k, vec![v]);
+ }
+ Some(val) => {
+ (*val).push(v);
+ }
+ }
+ }
+
+ let mut writer = BufWriter::new(std::fs::File::create(format!(
+ "/home/isucon/webapp/public/internal/icons/{jia_isu_uuid}.jpg"
+ ))?);
+ writer.write_all(image.as_ref()).unwrap();
+ writer.flush().unwrap();
+
let target_url = format!(
"{}/api/activate",
get_jia_service_url(&mut tx).await.map_err(SqlxError)?
@@ -764,10 +1177,16 @@ async fn post_isu(
#[actix_web::get("/api/isu/{jia_isu_uuid}")]
async fn get_isu_id(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
jia_isu_uuid: web::Path<String>,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
+ let jia_user_id_master = &masters.get_ref().1;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
let isu: Option<Isu> =
sqlx::query_as("SELECT * FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?")
@@ -788,24 +1207,36 @@ async fn get_isu_id(
#[actix_web::get("/api/isu/{jia_isu_uuid}/icon")]
async fn get_isu_icon(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
jia_isu_uuid: web::Path<String>,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
-
- let image: Option<Vec<u8>> = sqlx::query_scalar(
- "SELECT `image` FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?",
- )
- .bind(&jia_user_id)
- .bind(jia_isu_uuid.as_ref())
- .fetch_optional(pool.as_ref())
- .await
- .map_err(SqlxError)?;
-
- if let Some(image) = image {
- Ok(HttpResponse::Ok().body(image))
- } else {
- Err(actix_web::error::ErrorNotFound("not found: isu"))
+ let jia_isu_uuid_master = &masters.get_ref().0;
+ let jia_user_id_master = &masters.get_ref().1;
+ let isu_jia_user_id_jia_isu_uuid_master = &masters.get_ref().2;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
+
+ let isu_jia_user_id_jia_isu_uuid_master = isu_jia_user_id_jia_isu_uuid_master.lock().unwrap();
+ let k: String = jia_user_id.clone();
+ let v: String = jia_isu_uuid.clone();
+ match isu_jia_user_id_jia_isu_uuid_master.get(&k) {
+ None => Err(actix_web::error::ErrorNotFound("not found: isu")),
+ Some(val) => {
+ if (*val).contains(&v) {
+ Ok(HttpResponse::Ok()
+ .append_header((
+ "X-Accel-Redirect",
+ format!("/internal/icons/{jia_isu_uuid}.jpg"),
+ ))
+ .body(""))
+ } else {
+ Err(actix_web::error::ErrorNotFound("not found: isu"))
+ }
+ }
}
}
@@ -818,11 +1249,18 @@ struct GetIsuGraphQuery {
#[actix_web::get("/api/isu/{jia_isu_uuid}/graph")]
async fn get_isu_graph(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
jia_isu_uuid: web::Path<String>,
query: web::Query<GetIsuGraphQuery>,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
+ let jia_user_id_master = &masters.get_ref().1;
+ let isu_jia_user_id_jia_isu_uuid_master = &masters.get_ref().2;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
let date = match &query.datetime {
Some(datetime_str) => match datetime_str.parse() {
@@ -840,22 +1278,27 @@ async fn get_isu_graph(
}
};
- let mut tx = pool.begin().await.map_err(SqlxError)?;
-
- let count: i64 = fetch_one_scalar(
- sqlx::query_scalar(
- "SELECT COUNT(*) FROM `isu` WHERE `jia_user_id` = ? AND `jia_isu_uuid` = ?",
- )
- .bind(&jia_user_id)
- .bind(jia_isu_uuid.as_ref()),
- &mut tx,
- )
- .await
- .map_err(SqlxError)?;
- if count == 0 {
- return Err(actix_web::error::ErrorNotFound("not found: isu"));
+ {
+ let isu_jia_user_id_jia_isu_uuid_master =
+ isu_jia_user_id_jia_isu_uuid_master.lock().unwrap();
+ let k: String = jia_user_id.clone();
+ let v: String = jia_isu_uuid.clone();
+ match isu_jia_user_id_jia_isu_uuid_master.get(&k) {
+ None => {
+ return Err(actix_web::error::ErrorNotFound("not found: isu"));
+ }
+ Some(val) => {
+ if (*val).contains(&v) {
+ ""; // pass
+ } else {
+ return Err(actix_web::error::ErrorNotFound("not found: isu"));
+ }
+ }
+ }
}
+ let mut tx = pool.begin().await.map_err(SqlxError)?;
+
let res = generate_isu_graph_response(&mut tx, &jia_isu_uuid, date).await?;
tx.commit().await.map_err(SqlxError)?;
@@ -875,10 +1318,16 @@ async fn generate_isu_graph_response(
let mut start_time_in_this_hour =
DateTime::from_utc(NaiveDateTime::from_timestamp(0, 0), JST_OFFSET.fix());
+ let query_end_time = graph_date + chrono::Duration::hours(24);
+ let query_begin_time = graph_date
+ .duration_trunc(chrono::Duration::days(1))
+ .unwrap();
let mut rows = sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY `timestamp` ASC",
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` >= ? AND `timestamp` <= ? ORDER BY `timestamp` ASC",
)
.bind(jia_isu_uuid)
+ .bind(query_begin_time.naive_local())
+ .bind(query_end_time.naive_local())
.fetch(tx);
while let Some(row) = rows.next().await {
@@ -1030,11 +1479,17 @@ struct GetIsuConditionsQuery {
#[actix_web::get("/api/condition/{jia_isu_uuid}")]
async fn get_isu_conditions(
pool: web::Data<sqlx::MySqlPool>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
session: actix_session::Session,
jia_isu_uuid: web::Path<String>,
query: web::Query<GetIsuConditionsQuery>,
) -> actix_web::Result<HttpResponse> {
- let jia_user_id = require_signed_in(pool.as_ref(), session).await?;
+ let jia_user_id_master = &masters.get_ref().1;
+ let jia_user_id = require_signed_in(pool.as_ref(), jia_user_id_master, session).await?;
if jia_isu_uuid.is_empty() {
return Err(actix_web::error::ErrorBadRequest("missing: jia_isu_uuid"));
@@ -1112,44 +1567,59 @@ async fn get_isu_conditions_from_db(
limit: usize,
isu_name: &str,
) -> sqlx::Result<Vec<GetIsuConditionResponse>> {
- let conditions: Vec<IsuCondition> = if let Some(ref start_time) = start_time {
- sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` ORDER BY `timestamp` DESC",
- )
- .bind(jia_isu_uuid)
- .bind(end_time.naive_local())
- .bind(start_time.naive_local())
- .fetch_all(pool)
- } else {
- sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? ORDER BY `timestamp` DESC",
- )
- .bind(jia_isu_uuid)
- .bind(end_time.naive_local())
- .fetch_all(pool)
- }.await?;
-
let mut conditions_response = Vec::new();
- for c in conditions {
- if let Some(c_level) = calculate_condition_level(&c.condition) {
- if condition_level.contains(c_level) {
- conditions_response.push(GetIsuConditionResponse {
- jia_isu_uuid: c.jia_isu_uuid,
- isu_name: isu_name.to_owned(),
- timestamp: c.timestamp.timestamp(),
- is_sitting: c.is_sitting,
- condition: c.condition,
- condition_level: c_level,
- message: c.message,
- });
- }
+
+ let in_clause = condition_level
+ .iter()
+ .map(|a| format!("'{}'", *a))
+ .collect::<Vec<String>>()
+ .join(",");
+
+ let mut conditions: Vec<IsuCondition> = Vec::new();
+ match *start_time {
+ Some(start_time) => {
+ let query = format!(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND ? <= `timestamp` AND `condition_level` IN ({}) ORDER BY `timestamp` DESC LIMIT 20",
+ in_clause,
+ );
+ let query = query.as_str();
+ conditions = sqlx::query_as(query)
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .bind(start_time.naive_local())
+ .fetch_all(pool)
+ .await?;
+ }
+ None => {
+ let query = format!(
+ "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? AND `timestamp` < ? AND `condition_level` IN ({}) ORDER BY `timestamp` DESC LIMIT 20",
+ in_clause,
+ );
+ let query = query.as_str();
+ conditions = sqlx::query_as(query)
+ .bind(jia_isu_uuid)
+ .bind(end_time.naive_local())
+ .fetch_all(pool)
+ .await?;
}
}
- if conditions_response.len() > limit {
- conditions_response.truncate(limit);
+ for c in conditions {
+ let x = match c.condition_level.as_str() {
+ "info" => CONDITION_LEVEL_INFO,
+ "warning" => CONDITION_LEVEL_WARNING,
+ _ => CONDITION_LEVEL_CRITICAL,
+ };
+ conditions_response.push(GetIsuConditionResponse {
+ jia_isu_uuid: c.jia_isu_uuid,
+ isu_name: isu_name.to_owned(),
+ timestamp: c.timestamp.timestamp(),
+ is_sitting: c.is_sitting,
+ condition: c.condition,
+ condition_level: x,
+ message: c.message,
+ });
}
-
Ok(conditions_response)
}
@@ -1164,7 +1634,30 @@ fn calculate_condition_level(condition: &str) -> Option<&'static str> {
}
}
-// ISUの性格毎の最新のコンディション情報
+#[derive(sqlx::FromRow)]
+struct IsuPartialApiTrend {
+ id: i64,
+ jia_isu_uuid: String,
+}
+
+struct IsuConditionPartialApiTrend {
+ timestamp: DateTime<chrono::FixedOffset>,
+ condition_level: String,
+}
+impl sqlx::FromRow<'_, sqlx::mysql::MySqlRow> for IsuConditionPartialApiTrend {
+ fn from_row(row: &sqlx::mysql::MySqlRow) -> sqlx::Result<Self> {
+ use sqlx::Row as _;
+
+ let timestamp: NaiveDateTime = row.try_get("timestamp")?;
+ // DB の datetime 型は JST として解釈する
+ let timestamp = JST_OFFSET.from_local_datetime(&timestamp).unwrap();
+ Ok(Self {
+ timestamp,
+ condition_level: row.try_get("condition_level")?,
+ })
+ }
+}
+
#[actix_web::get("/api/trend")]
async fn get_trend(pool: web::Data<sqlx::MySqlPool>) -> actix_web::Result<HttpResponse> {
let character_list: Vec<String> =
@@ -1176,43 +1669,34 @@ async fn get_trend(pool: web::Data<sqlx::MySqlPool>) -> actix_web::Result<HttpRe
let mut res = Vec::new();
for character in character_list {
- let isu_list: Vec<Isu> = sqlx::query_as("SELECT * FROM `isu` WHERE `character` = ?")
- .bind(&character)
- .fetch_all(pool.as_ref())
- .await
- .map_err(SqlxError)?;
+ let isu_list: Vec<IsuPartialApiTrend> =
+ sqlx::query_as("SELECT `id`, `jia_isu_uuid` FROM `isu` WHERE `character` = ?")
+ .bind(&character)
+ .fetch_all(pool.as_ref())
+ .await
+ .map_err(SqlxError)?;
let mut character_info_isu_conditions = Vec::new();
let mut character_warning_isu_conditions = Vec::new();
let mut character_critical_isu_conditions = Vec::new();
for isu in isu_list {
- let conditions: Vec<IsuCondition> = sqlx::query_as(
- "SELECT * FROM `isu_condition` WHERE `jia_isu_uuid` = ? ORDER BY timestamp DESC",
+ let isu_last_condition: IsuConditionPartialApiTrend = sqlx::query_as(
+ "SELECT `timestamp`, `condition_level` FROM `last_isu_condition` WHERE `jia_isu_uuid` = ?",
)
.bind(&isu.jia_isu_uuid)
- .fetch_all(pool.as_ref())
+ .fetch_one(pool.as_ref())
.await
.map_err(SqlxError)?;
-
- if !conditions.is_empty() {
- let isu_last_condition = &conditions[0];
- let condition_level = calculate_condition_level(&isu_last_condition.condition);
- if condition_level.is_none() {
- log::error!("unexpected warn count");
- return Err(actix_web::error::ErrorInternalServerError(""));
- }
- let condition_level = condition_level.unwrap();
- let trend_condition = TrendCondition {
- id: isu.id,
- timestamp: isu_last_condition.timestamp.timestamp(),
- };
- match condition_level {
- "info" => character_info_isu_conditions.push(trend_condition),
- "warning" => character_warning_isu_conditions.push(trend_condition),
- "critical" => character_critical_isu_conditions.push(trend_condition),
- _ => {}
- };
- }
+ let trend_condition = TrendCondition {
+ id: isu.id,
+ timestamp: isu_last_condition.timestamp.timestamp(),
+ };
+ match isu_last_condition.condition_level.as_str() {
+ "info" => character_info_isu_conditions.push(trend_condition),
+ "warning" => character_warning_isu_conditions.push(trend_condition),
+ "critical" => character_critical_isu_conditions.push(trend_condition),
+ _ => {}
+ };
}
character_info_isu_conditions
@@ -1235,32 +1719,28 @@ async fn get_trend(pool: web::Data<sqlx::MySqlPool>) -> actix_web::Result<HttpRe
// ISUからのコンディションを受け取る
#[actix_web::post("/api/condition/{jia_isu_uuid}")]
async fn post_isu_condition(
- pool: web::Data<sqlx::MySqlPool>,
+ isu_condition_tx: web::Data<mpsc::Sender<PostIsuCondition>>,
+ masters: web::Data<(
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<Vec<String>>>,
+ Arc<Mutex<HashMap<String, Vec<String>>>>,
+ )>,
jia_isu_uuid: web::Path<String>,
req: web::Json<Vec<PostIsuConditionRequest>>,
) -> actix_web::Result<HttpResponse> {
- // TODO: 一定割合リクエストを落としてしのぐようにしたが、本来は全量さばけるようにすべき
- const DROP_PROBABILITY: f64 = 0.9;
- if rand::random::<f64>() <= DROP_PROBABILITY {
- log::warn!("drop post isu condition request");
- return Ok(HttpResponse::Accepted().finish());
- }
-
+ let jia_isu_uuid_master = &masters.get_ref().0;
+ let jia_user_id_master = &masters.get_ref().1;
if req.is_empty() {
return Err(actix_web::error::ErrorBadRequest("bad request body"));
}
- let mut tx = pool.begin().await.map_err(SqlxError)?;
-
- let count: i64 = fetch_one_scalar(
- sqlx::query_scalar("SELECT COUNT(*) FROM `isu` WHERE `jia_isu_uuid` = ?")
- .bind(jia_isu_uuid.as_ref()),
- &mut tx,
- )
- .await
- .map_err(SqlxError)?;
- if count == 0 {
- return Err(actix_web::error::ErrorNotFound("not found: isu"));
+ // オンメモリキャッシュを利用
+ {
+ // lockしてArc<Mutex<T>>から取り出し
+ let jia_isu_uuid_master = jia_isu_uuid_master.lock().unwrap();
+ if !jia_isu_uuid_master.contains(&jia_isu_uuid) {
+ return Err(actix_web::error::ErrorNotFound("not found: isu"));
+ }
}
for cond in req.iter() {
@@ -1273,20 +1753,17 @@ async fn post_isu_condition(
return Err(actix_web::error::ErrorBadRequest("bad request body"));
}
- sqlx::query(
- "INSERT INTO `isu_condition` (`jia_isu_uuid`, `timestamp`, `is_sitting`, `condition`, `message`) VALUES (?, ?, ?, ?, ?)",
- )
- .bind(jia_isu_uuid.as_ref())
- .bind(&timestamp.naive_local())
- .bind(&cond.is_sitting)
- .bind(&cond.condition)
- .bind(&cond.message)
- .execute(&mut tx)
- .await.map_err(SqlxError)?;
+ let _ = isu_condition_tx
+ .send(PostIsuCondition {
+ jia_isu_uuid: (jia_isu_uuid.clone()),
+ timestamp: (timestamp.naive_local()),
+ is_sitting: (cond.is_sitting.clone()),
+ condition: (cond.condition.clone()),
+ message: (cond.message.clone()),
+ })
+ .await;
}
- tx.commit().await.map_err(SqlxError)?;
-
Ok(HttpResponse::Accepted().finish())
}
user www-data;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" $request_time';
access_log /var/log/nginx/access.log main buffer=128k flush=3;
open_file_cache max=4096 inactive=60s;
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
http2_max_requests 1000000;
keepalive_requests 1000000;
types_hash_max_size 2048;
gzip on;
include /etc/nginx/conf.d/*.conf;
include /etc/nginx/sites-enabled/*.conf;
}
@netmarkjp
Copy link
Author

netmarkjp commented Jul 11, 2022

3924 -> 220279
(vagrant)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment