在服务器端使用内存来存储客户端发送过来的数据

电子说

1.2w人已加入

描述

在这篇文章中,我们将在服务器端使用内存来存储客户端发送过来的数据。在实现数据存储之前,我们先在客户端使用Clap库来解析命令行参数,并封装成命令发送给服务器。

Clap解析命令行参数 在Cargo.toml文件中加入clap依赖:

 

clap = {version = "3.1", features = ["derive"]}

在src目录下新建args.rs文件,写入以下代码:
 1use clap::Parser;
 2
 3#[derive(Debug, Parser)]
 4#[clap(name = "kv_client")]
 5pub enum ClientArgs {
 6    Get {
 7        #[clap(long)]
 8        key: String,
 9    },
10    Set {
11        #[clap(long)]
12        key: String,
13        #[clap(long)]
14        value: String,
15    },
16    Publish {
17        #[clap(long)]
18        topic: String,
19        #[clap(long)]
20        value: String,
21    },
22    Subscribe {
23        #[clap(long)]
24        topic: String,
25    },
26    Unsubscribe {
27        #[clap(long)]
28        topic: String,
29        #[clap(long)]
30        id: u32,
31    }
32}
在src/lib.rs中加入以下代码:
1mod args;
2pub use args::*;
修改src/bin/kv_client.rs代码:
 1#[tokio::main]
 2async fn main() -> Result<(), Box> {
 3    ......
 4
 5    let client_args = ClientArgs::parse();
 6
 7    // 解析命令行参数,生成命令
 8    let cmd = process_args(client_args).await?;
 9    // 命令编码
10    cmd.encode(&mut buf).unwrap();
11    // 发送命令
12    stream.send(buf.freeze()).await.unwrap();
13    info!("Send command successed!");
14
15    loop {
16        tokio::select! {
17            Some(Ok(buf)) = stream.next() => {
18                let cmd_res = CmdResponse::decode(&buf[..]).unwrap();
19                info!("Receive a response: {:?}", cmd_res);
20            }
21        }
22    }
23}
 
 1// 生成CmdRequest命令
 2async fn process_args(client_args: ClientArgs) -> Result> {
 3    match client_args {
 4        // 生成 GET 命令
 5        ClientArgs::Get{key} => {
 6            Ok(CmdRequest::get(key))
 7        },
 8        // 生成 SET 命令
 9        ClientArgs::Set{key, value} => {
10            Ok(CmdRequest::set(key, value.into()))
11        },
12        // 生成 PUBLISH 命令
13        ClientArgs::Publish{ topic, value } => {
14            Ok(CmdRequest::publish(topic, value.into()))
15        },
16        // 生成 SUBSCRIBE 命令
17        ClientArgs::Subscribe{ topic } => {
18            Ok(CmdRequest::subscribe(topic))
19        },
20        // 生成 UNSUBSCRIBE 命令
21        ClientArgs::Unsubscribe{topic, id} => {
22            Ok(CmdRequest::unsubscribe(topic, id))
23        }
24    }
25}

 

打开一个终端,启动kv_sever。打开另一个终端执行以下命令来测试客户端:

 

RUST_LOG=info cargo run --bin kv_client get --key mykeyRUST_LOG=info cargo run --bin kv_client set --key mykey --value myvalue

 

服务器和客户端都正常处理了收到的请求和响应。

内存存储

我们使用dashmap crate在内存中存储数据,dashmap是一个快速的并发map。

我们先创建src/storage目录,再创建src/storage/mod.rs文件,然后在src/lib.rs文件中引入storage模块。

在src/storage/mod.rs文件中定义一个storage trait,以便于以后不同存储方式的扩展,代码如下:

 

1use std::Error;
2
3use bytes::Bytes;
4
5pub trait Storage {
6    fn get(&self, key: &str) -> Result, Box>;
7    fn set(&self, key: &str, value: Bytes) -> Result<(), Box>;
8}

 

在src/storage目录下创建mem_storage.rs文件:

 

 1#[derive(Clone, Debug, Default)]
 2pub struct MemStorage {
 3    map: DashMap
 4}
 5
 6impl MemStorage {
 7    pub fn new() -> Self {
 8        Self {
 9            map: Default::default(),
10        }
11    }
12}
13
14impl Storage for MemStorage {
15    fn get(&self, key: &str) -> Result, Box> {
16        Ok(self.map.get(key).map(|v| v.value().clone()))
17    }
18
19    fn set(&self, key: &str, value: Bytes) -> Result, Box> {
20        self.map.insert(key.to_string(), value.clone());
21        Ok(Some(value))
22    }
23}

 

修改kv_server.rs代码:

 

 1async fn main() -> Result<(), Box> {
 2    ......
 3
 4    // 初始化内存存储
 5    let storage = Arc::new(MemStorage::new());
 6
 7    loop {
 8         ......
 9
10        let stor = storage.clone();
11
12        tokio::spawn(async move {
13            // 使用Frame的LengthDelimitedCodec进行编解码操作
14            let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
15            while let Some(Ok(mut buf)) = stream.next().await {
16                // 对客户端发来的protobuf请求命令进行拆包
17                let cmd_req = CmdRequest::decode(&buf[..]).unwrap();
18                info!("Receive a command: {:?}", cmd_req);
19
20                // 处理请求命令
21                let cmd_res = process_cmd(cmd_req, &stor).await.unwrap();
22
23                buf.clear();
24
25                // 对protobuf的请求响应进行封包,然后发送给客户端。
26                cmd_res.encode(&mut buf).unwrap();
27                stream.send(buf.freeze()).await.unwrap();
28            }
29            info!("Client {:?} disconnected", addr);
30        });
31    }
32}
33
34// 处理请求命令,返回Response
35async fn process_cmd(req: CmdRequest, storage: &MemStorage) -> Result> {
36    match req {
37        // 处理 GET 命令
38        CmdRequest{
39            req_data:Get(Get {key})),
40        } => {
41            let value = storage.get(&key)?;
42            Ok(CmdResponse::new(200, "get success".to_string(), value.unwrap_or_default()))
43        }, 
44        // 处理 SET 命令
45        CmdRequest{
46            req_data:Set(Set {key, value})),
47        } => {
48            let value = storage.set(&key, value)?;
49            Ok(CmdResponse::new(200, "set success".to_string(), value.unwrap_or_default()))
50        }, 
51        _ => Err("Invalid command".into())
52    }
53}

 

测试

1,打开一个终端,运行kv_server:

 

RUST_LOG=info cargo run --bin kv_server

 

2,打开一个终端,运行kv_client,执行set命令:

 

RUST_LOG=info cargo run --bin kv_client set --key mykey --value myvalue

3,打开一个终端,运行kv_client,执行get命令:
RUST_LOG=info cargo run --bin kv_client get --key mykey

 

执行结果:

 

INFO kv_client: Send command successed!

INFO kv_client: Receive a response: CmdResponse { status: 200, message: "get success", value: b"myvalue" }

 




审核编辑:刘清

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分