Rewrote parts of the code

This commit is contained in:
techmetx11 2024-05-01 18:24:56 +01:00
parent 5364ea9f9c
commit 6bd9b67176
No known key found for this signature in database
GPG key ID: 20E0C88A0E7E5AF2
5 changed files with 320 additions and 126 deletions

54
Cargo.lock generated
View file

@ -196,6 +196,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -203,6 +218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -211,6 +227,34 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -229,10 +273,16 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
@ -392,11 +442,13 @@ dependencies = [
name = "inv_sig_helper_rust"
version = "0.1.0"
dependencies = [
"futures",
"lazy-regex",
"regex",
"reqwest",
"rquickjs",
"tokio",
"tokio-util",
"tub",
]
@ -1059,7 +1111,9 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"pin-project-lite",
"tokio",
"tracing",

View file

@ -8,7 +8,9 @@ edition = "2021"
[dependencies]
regex = "1.10.4"
rquickjs = {version = "0.6.0", features=["futures", "parallel"]}
tokio = { version = "1.37.0", features = ["full", "net", "macros", "rt-multi-thread", "io-std", "io-util"] }
tokio = { version = "1.37.0", features = ["full", "net", "macros", "rt-multi-thread", "io-std", "io-util", "mio"] }
reqwest = "0.12.4"
lazy-regex = "3.1.0"
tub = "0.3.7"
tokio-util = { version = "0.7.10", features=["futures-io", "futures-util", "codec"]}
futures = "0.3.30"

View file

@ -1,3 +1,4 @@
use futures::{Sink, SinkExt, Stream, StreamExt};
use rquickjs::{async_with, AsyncContext, AsyncRuntime};
use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism};
use tokio::{io::AsyncWriteExt, runtime::Handle, sync::Mutex, task::block_in_place};
@ -5,6 +6,7 @@ use tub::Pool;
use crate::{
consts::NSIG_FUNCTION_NAME,
opcode::OpcodeResponse,
player::{fetch_update, FetchUpdateStatus},
};
@ -109,44 +111,27 @@ impl GlobalState {
}
}
macro_rules! write_failure {
($s:ident, $r:ident) => {
$s.write_u32($r).await;
$s.write_u16(0x0000).await;
};
}
pub async fn process_fetch_update<W>(
state: Arc<GlobalState>,
stream: Arc<Mutex<W>>,
request_id: u32,
) where
W: tokio::io::AsyncWrite + Unpin + Send,
W: SinkExt<OpcodeResponse> + Unpin + Send,
{
let cloned_writer = stream.clone();
let mut writer;
let global_state = state.clone();
let status = fetch_update(global_state).await;
match fetch_update(global_state).await {
Ok(_x) => {
writer = cloned_writer.lock().await;
writer.write_u32(request_id).await;
// sync code to tell the client the player had updated
writer.write_u16(0xF44F).await;
println!("Successfully updated the player");
}
Err(FetchUpdateStatus::PlayerAlreadyUpdated) => {
writer = cloned_writer.lock().await;
writer.write_u32(request_id).await;
writer.write_u16(0xFFFF).await;
}
Err(_x) => {
writer = cloned_writer.lock().await;
writer.write_u32(request_id).await;
writer.write_u16(0).await;
}
}
let mut writer = cloned_writer.lock().await;
writer
.send(OpcodeResponse {
opcode: JobOpcode::ForceUpdate,
request_id,
update_status: status,
signature: Default::default(),
signature_timestamp: Default::default(),
})
.await;
}
pub async fn process_decrypt_n_signature<W>(
@ -155,7 +140,7 @@ pub async fn process_decrypt_n_signature<W>(
stream: Arc<Mutex<W>>,
request_id: u32,
) where
W: tokio::io::AsyncWrite + Unpin + Send,
W: SinkExt<OpcodeResponse> + Unpin + Send,
{
let cloned_writer = stream.clone();
let global_state = state.clone();
@ -179,7 +164,13 @@ pub async fn process_decrypt_n_signature<W>(
println!("JavaScript interpreter error (nsig code): {}", n);
}
writer = cloned_writer.lock().await;
write_failure!(writer, request_id);
writer.send(OpcodeResponse {
opcode: JobOpcode::DecryptNSignature,
request_id,
update_status: Ok(Default::default()),
signature: String::new(),
signature_timestamp: Default::default()
}).await;
return;
}
}
@ -190,7 +181,7 @@ pub async fn process_decrypt_n_signature<W>(
let mut call_string: String = String::new();
call_string += NSIG_FUNCTION_NAME;
call_string += "(\"";
call_string += &sig;
call_string += &sig.replace("\"", "\\\"");
call_string += "\")";
let decrypted_string = match ctx.eval::<String,String>(call_string) {
@ -202,19 +193,26 @@ pub async fn process_decrypt_n_signature<W>(
println!("JavaScript interpreter error (nsig code): {}", n);
}
writer = cloned_writer.lock().await;
write_failure!(writer, request_id);
writer.send(OpcodeResponse {
opcode: JobOpcode::DecryptNSignature,
request_id,
update_status: Ok(Default::default()),
signature: String::new(),
signature_timestamp: Default::default()
}).await;
return;
}
};
writer = cloned_writer.lock().await;
writer.write_u32(request_id).await;
writer.write_u16(u16::try_from(decrypted_string.len()).unwrap()).await;
writer.write_all(decrypted_string.as_bytes()).await;
println!("Decrypted signature: {}", decrypted_string);
writer.send(OpcodeResponse {
opcode: JobOpcode::DecryptNSignature,
request_id,
update_status: Ok(Default::default()),
signature: decrypted_string,
signature_timestamp: Default::default()
}).await;
})
.await;
}
@ -225,7 +223,7 @@ pub async fn process_decrypt_signature<W>(
stream: Arc<Mutex<W>>,
request_id: u32,
) where
W: tokio::io::AsyncWrite + Unpin + Send,
W: SinkExt<OpcodeResponse> + Unpin + Send,
{
let cloned_writer = stream.clone();
let global_state = state.clone();
@ -248,7 +246,13 @@ pub async fn process_decrypt_signature<W>(
println!("JavaScript interpreter error (sig code): {}", n);
}
writer = cloned_writer.lock().await;
write_failure!(writer, request_id);
writer.send(OpcodeResponse {
opcode: JobOpcode::DecryptSignature,
request_id,
update_status: Ok(Default::default()),
signature: String::new(),
signature_timestamp: Default::default()
}).await;
return;
}
}
@ -260,7 +264,7 @@ pub async fn process_decrypt_signature<W>(
let mut call_string: String = String::new();
call_string += sig_function_name;
call_string += "(\"";
call_string += &sig;
call_string += &sig.replace("\"", "\\\"");
call_string += "\")";
drop(player_info);
@ -274,19 +278,26 @@ pub async fn process_decrypt_signature<W>(
println!("JavaScript interpreter error (sig code): {}", n);
}
writer = cloned_writer.lock().await;
write_failure!(writer, request_id);
writer.send(OpcodeResponse {
opcode: JobOpcode::DecryptSignature,
request_id,
update_status: Ok(Default::default()),
signature: String::new(),
signature_timestamp: Default::default()
}).await;
return;
}
};
writer = cloned_writer.lock().await;
writer.write_u32(request_id).await;
writer.write_u16(u16::try_from(decrypted_string.len()).unwrap()).await;
writer.write_all(decrypted_string.as_bytes()).await;
println!("Decrypted signature: {}", decrypted_string);
writer.send(OpcodeResponse {
opcode: JobOpcode::DecryptSignature,
request_id,
update_status: Ok(Default::default()),
signature: decrypted_string,
signature_timestamp: Default::default(),
}).await;
})
.await;
}
@ -296,7 +307,7 @@ pub async fn process_get_signature_timestamp<W>(
stream: Arc<Mutex<W>>,
request_id: u32,
) where
W: tokio::io::AsyncWrite + Unpin + Send,
W: SinkExt<OpcodeResponse> + Unpin + Send,
{
let cloned_writer = stream.clone();
let global_state = state.clone();
@ -305,7 +316,13 @@ pub async fn process_get_signature_timestamp<W>(
let timestamp = player_info.signature_timestamp;
let mut writer = cloned_writer.lock().await;
writer.write_u32(request_id).await;
writer.write_u64(timestamp).await;
writer
.send(OpcodeResponse {
opcode: JobOpcode::GetSignatureTimestamp,
request_id,
update_status: Ok(Default::default()),
signature: String::new(),
signature_timestamp: timestamp,
})
.await;
}

View file

@ -1,17 +1,25 @@
mod consts;
mod jobs;
mod opcode;
mod player;
use ::futures::StreamExt;
use consts::DEFAULT_SOCK_PATH;
use jobs::{process_decrypt_n_signature, process_fetch_update, GlobalState, JobOpcode};
use opcode::OpcodeDecoder;
use player::fetch_update;
use std::{env::args, io::Error, sync::Arc};
use std::{env::args, future, io::Error, sync::Arc};
use tokio::{
fs::remove_file,
io::{self, AsyncReadExt, BufReader, BufWriter},
net::{UnixListener, UnixStream},
io::{
self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, BufReader,
BufWriter, Interest, Ready,
},
net::{TcpListener, UnixListener, UnixStream},
sync::Mutex,
task::{futures, spawn_blocking},
};
use tokio_util::codec::{Decoder, Framed, FramedRead, FramedWrite};
use crate::jobs::{process_decrypt_signature, process_get_signature_timestamp};
@ -32,10 +40,6 @@ macro_rules! eof_fail {
match $res {
Ok(value) => value,
Err(e) => {
if (e.kind() == io::ErrorKind::UnexpectedEof) {
$stream.get_ref().readable().await?;
continue;
}
println!("An error occurred while parsing the current request: {}", e);
break;
}
@ -54,11 +58,11 @@ async fn main() {
// have to please rust
let state: Arc<GlobalState> = Arc::new(GlobalState::new());
let socket = match UnixListener::bind(socket_url) {
let socket: UnixListener = match UnixListener::bind(socket_url) {
Ok(x) => x,
Err(x) => {
if x.kind() == std::io::ErrorKind::AddrInUse {
remove_file(socket_url).await.unwrap();
remove_file(socket_url);
UnixListener::bind(socket_url).unwrap()
} else {
println!("Error occurred while trying to bind: {}", x);
@ -78,81 +82,82 @@ async fn main() {
let (socket, _addr) = socket.accept().await.unwrap();
let cloned_state = state.clone();
tokio::spawn(async {
tokio::spawn(async move {
process_socket(cloned_state, socket).await;
});
}
}
async fn process_socket(state: Arc<GlobalState>, socket: UnixStream) -> Result<(), Error> {
async fn process_socket(state: Arc<GlobalState>, socket: UnixStream) {
let (rd, wr) = socket.into_split();
let wrapped_readstream = Arc::new(Mutex::new(BufReader::new(rd)));
let wrapped_writestream = Arc::new(Mutex::new(BufWriter::new(wr)));
let decoder = OpcodeDecoder {};
let cloned_readstream = wrapped_readstream.clone();
let mut inside_readstream = cloned_readstream.lock().await;
let sink = FramedWrite::new(wr, decoder);
let mut stream = FramedRead::new(rd, decoder);
loop {
inside_readstream.get_ref().readable().await?;
let arc_sink = Arc::new(Mutex::new(sink));
while let Some(opcode_res) = stream.next().await {
match opcode_res {
Ok(opcode) => {
println!("Received job: {}", opcode.opcode);
let cloned_writestream = wrapped_writestream.clone();
let opcode_byte: u8 = eof_fail!(inside_readstream.read_u8().await, inside_readstream);
let opcode: JobOpcode = opcode_byte.into();
let request_id: u32 = eof_fail!(inside_readstream.read_u32().await, inside_readstream);
println!("Received job: {}", opcode);
match opcode {
JobOpcode::ForceUpdate => {
let cloned_state = state.clone();
let cloned_stream = cloned_writestream.clone();
tokio::spawn(async move {
process_fetch_update(cloned_state, cloned_stream, request_id).await;
});
match opcode.opcode {
JobOpcode::ForceUpdate => {
let cloned_state = state.clone();
let cloned_sink = arc_sink.clone();
tokio::spawn(async move {
process_fetch_update(cloned_state, cloned_sink, opcode.request_id)
.await;
});
}
JobOpcode::DecryptNSignature => {
let cloned_state = state.clone();
let cloned_sink = arc_sink.clone();
tokio::spawn(async move {
process_decrypt_n_signature(
cloned_state,
opcode.signature,
cloned_sink,
opcode.request_id,
)
.await;
});
}
JobOpcode::DecryptSignature => {
let cloned_state = state.clone();
let cloned_sink = arc_sink.clone();
tokio::spawn(async move {
process_decrypt_signature(
cloned_state,
opcode.signature,
cloned_sink,
opcode.request_id,
)
.await;
});
}
JobOpcode::GetSignatureTimestamp => {
let cloned_state = state.clone();
let cloned_sink = arc_sink.clone();
tokio::spawn(async move {
process_get_signature_timestamp(
cloned_state,
cloned_sink,
opcode.request_id,
)
.await;
});
}
_ => {
continue;
}
}
}
JobOpcode::DecryptNSignature => {
let sig_size: usize = usize::from(eof_fail!(
inside_readstream.read_u16().await,
inside_readstream
));
let mut buf = vec![0u8; sig_size];
break_fail!(inside_readstream.read_exact(&mut buf).await);
let str = break_fail!(String::from_utf8(buf));
let cloned_state = state.clone();
let cloned_stream = cloned_writestream.clone();
tokio::spawn(async move {
process_decrypt_n_signature(cloned_state, str, cloned_stream, request_id).await;
});
Err(x) => {
println!("I/O error: {:?}", x);
break;
}
JobOpcode::DecryptSignature => {
let sig_size: usize = usize::from(eof_fail!(
inside_readstream.read_u16().await,
inside_readstream
));
let mut buf = vec![0u8; sig_size];
break_fail!(inside_readstream.read_exact(&mut buf).await);
let str = break_fail!(String::from_utf8(buf));
let cloned_state = state.clone();
let cloned_stream = cloned_writestream.clone();
tokio::spawn(async move {
process_decrypt_signature(cloned_state, str, cloned_stream, request_id).await;
});
}
JobOpcode::GetSignatureTimestamp => {
let cloned_state = state.clone();
let cloned_stream = cloned_writestream.clone();
tokio::spawn(async move {
process_get_signature_timestamp(cloned_state, cloned_stream, request_id).await;
});
}
_ => {}
}
}
Ok(())
}

116
src/opcode.rs Normal file
View file

@ -0,0 +1,116 @@
use std::io::ErrorKind;
use tokio_util::{
bytes::{Buf, BufMut},
codec::{Decoder, Encoder},
};
use crate::{jobs::JobOpcode, player::FetchUpdateStatus};
#[derive(Copy, Clone)]
pub struct OpcodeDecoder {}
pub struct Opcode {
pub opcode: JobOpcode,
pub request_id: u32,
pub signature: String,
}
pub struct OpcodeResponse {
pub opcode: JobOpcode,
pub request_id: u32,
pub update_status: Result<(), FetchUpdateStatus>,
pub signature: String,
pub signature_timestamp: u64,
}
impl Decoder for OpcodeDecoder {
type Item = Opcode;
type Error = std::io::Error;
fn decode(
&mut self,
src: &mut tokio_util::bytes::BytesMut,
) -> Result<Option<Self::Item>, Self::Error> {
if 5 > src.len() {
return Ok(None);
}
let opcode_byte: u8 = src[0];
let opcode: JobOpcode = opcode_byte.into();
let request_id: u32 = u32::from_be_bytes(src[1..5].try_into().unwrap());
match opcode {
JobOpcode::ForceUpdate | JobOpcode::GetSignatureTimestamp => {
src.advance(5);
Ok(Some(Opcode {
opcode,
request_id,
signature: Default::default(),
}))
}
JobOpcode::DecryptSignature | JobOpcode::DecryptNSignature => {
if 7 > src.len() {
return Ok(None);
}
let sig_size: u16 = (src[5] as u16) << 8 | (src[6] as u16);
if usize::from(sig_size) > src.len() {
return Ok(None);
}
let sig: String =
match String::from_utf8(src[7..(usize::from(sig_size) + 7)].to_vec()) {
Ok(x) => x,
Err(x) => {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
x.utf8_error(),
));
}
};
src.advance(7 + sig.len());
Ok(Some(Opcode {
opcode,
request_id,
signature: sig,
}))
}
_ => Err(std::io::Error::new(ErrorKind::InvalidInput, "")),
}
}
}
impl Encoder<OpcodeResponse> for OpcodeDecoder {
type Error = std::io::Error;
fn encode(
&mut self,
item: OpcodeResponse,
dst: &mut tokio_util::bytes::BytesMut,
) -> Result<(), Self::Error> {
dst.put_u32(item.request_id);
match item.opcode {
JobOpcode::ForceUpdate => match item.update_status {
Ok(_x) => dst.put_u16(0xF44F),
Err(FetchUpdateStatus::PlayerAlreadyUpdated) => dst.put_u16(0xFFFF),
Err(_x) => dst.put_u16(0x0000),
},
JobOpcode::DecryptSignature | JobOpcode::DecryptNSignature => {
dst.put_u16(u16::try_from(item.signature.len()).unwrap());
if !item.signature.is_empty() {
dst.put_slice(item.signature.as_bytes());
}
}
JobOpcode::GetSignatureTimestamp => {
dst.put_u64(item.signature_timestamp);
}
_ => {}
}
Ok(())
}
}