使用Protobuf实现客户端与服务器之间的通信协议层

电子说

1.3w人已加入

描述

本系列是关于用Rust构建一个KV Server的系列文章,内容包括用tokio做底层异步网络通讯、使用toml文件做配置、protobuf做传输协议、内存/RockDB做数据存储、事件通知、优雅关机、并发连接限制及测量监控等。
   在上一篇文章中,我们用tokio实现了客户端和服务器的基本框架并设置了toml格式的配置文件。在这一篇文章中,我们参考Redis的命令:GET、SET、PUBLISH和SUBSCRIBE,使用Protobuf来实现客户端与服务器之间的通信协议层。为了处理Protobuf,我们加入了post库。同时加入了tracing库用于日志处理。Cargo.toml如下:
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
[dependencies]anyhow = "1"tokio = { version = "1.19", features = ["full"] }serde = { version = "1", features = ["derive"] }toml = "0.5"tracing = "0.1"tracing-subscriber = "0.3"bytes = "1"prost = "0.11"
[build-dependencies]prost-build = "0.11"
     Protobuf在项目根目录下新建cmd.proto,加入如下代码:
 1syntax = "proto3";
 2
 3package cmd;
 4
 5// 命令请求
 6message CmdRequest {
 7    oneof req_data {
 8        Get get = 1;
 9        Set set = 2;
10        Publish publish = 3;
11        Subscribe subscribe = 4;
12        Unsubscribe unsubscribe = 5;
13    }
14}
15
16// 服务器的响应
17message CmdResponse {
18    uint32 status = 1;
19    string message = 2;
20    bytes value = 3;
21}
22
23// 请求值命令
24message Get {
25    string key = 1;
26}
27
28// 存储值命令
29message Set {
30    string key = 1;
31    bytes value = 2;
32    uint32 expire = 3;
33}
34
35// 向Topic发布值命令
36message Publish {
37    string topic = 1;
38    bytes value = 2;
39}
40
41// 订阅Topic命令
42message Subscribe {
43    string topic = 1;
44}
45
46// 取消订阅命令
47message Unsubscribe {
48    string topic = 1;
49    uint32 id = 2;
50}
 在src目录下创建pb目录,在根目录下创建build.rs文件,加入如下代码:
1fn main() {
2    let mut conf = prost_build::new();
3    conf.bytes(&["."]);
4    conf.type_attribute(".""#[derive(PartialOrd)]");
5    conf.out_dir("src/pb")
6        .compile_protos(&["cmd.proto"], &["."]) 
7        .unwrap();
8}
 在src/pb目录下已经自动生成了cmd.rs文件。在src/pb目录下创建mod.rs文件,加入如下代码:
 1use bytes::Bytes;
 2
 3use crate::{cmd_request::ReqData, CmdRequest, Get, Publish, Set, Subscribe, Unsubscribe};
 4
 5pub mod cmd;
 6
 7impl CmdRequest {
 8    // GET命令
 9    pub fn get(key: impl Into) -> Self {
10        Self {
11            req_data: Some(ReqData::Get(Get { key: key.into() })),
12        }
13    }
14
15    // SET命令
16    pub fn set(key: impl Into, value: Bytes, expire: u32) -> Self {
17        Self {
18            req_data: Some(ReqData::Set(Set {
19                key: key.into(),
20                value,
21                expire,
22            })),
23        }
24    }
25
26    // PUBLISH命令
27    pub fn publish(topic: impl Into, value: Bytes) -> Self {
28        Self {
29            req_data: Some(ReqData::Publish(Publish {
30                topic: topic.into(),
31                value,
32            })),
33        }
34    }
35
36    // 订阅命令
37    pub fn subscribe(topic: impl Into) -> Self {
38        Self {
39            req_data: Some(ReqData::Subscribe(Subscribe {
40                topic: topic.into(),
41            })),
42        }
43    }
44
45    // 解除订阅命令
46    pub fn unsubscribe(topic: impl Into, id: u32) -> Self {
47        Self {
48            req_data: Some(ReqData::Unsubscribe(Unsubscribe {
49                topic: topic.into(),
50                id,
51            })),
52        }
53    }
54}
55
56impl CmdResponse {
57    pub fn new(status: u32, message: String, value: Bytes) -> Self {
58        Self { 
59            status, 
60            message, 
61            value,
62        }
63    }
64}
在 src/lib.rs 中,引入pb模块:
1mod pb;
2pub use pb::*;
   客户端 & 服务器我们使用tokio-util库的Frame里的LengthDelimitedCodec(根据长度进行编解码)对protobuf协议进行封包解包。在Cargo.toml里加入tokio-util依赖:
  •  
  •  
  •  
  •  
  •  
[dependencies]......futures = "0.3"tokio-util = {version = "0.7", features = ["codec"]}......
   修改src/bin/kv_server.rs代码:
 1#[tokio::main]
 2async fn main() -> Result<(), Box{
 3    tracing_subscriber::init();
 4
 5    ......
 6
 7    loop {
 8        ......
 9
10        tokio::spawn(async move {
11            // 使用Frame的LengthDelimitedCodec进行编解码操作
12            let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
13            while let Some(Ok(mut buf)) = stream.next().await {
14                // 对客户端发来的protobuf请求命令进行拆包
15                let cmd_req = CmdRequest::decode(&buf[..]).unwrap();
16                info!("Receive a command: {:?}", cmd_req);
17
18                buf.clear();
19
20                // 对protobuf的请求响应进行封包,然后发送给客户端。
21                let cmd_res = CmdResponse::new(200"success".to_string(), Bytes::default());
22                cmd_res.encode(&mut buf).unwrap();
23                stream.send(buf.freeze()).await.unwrap();
24            }
25            info!("Client {:?} disconnected", addr);
26        });
27    }
28}
 修改src/bin/kv_client.rs代码:
 1#[tokio::main]
 2async fn main() -> Result<(), Box{
 3    tracing_subscriber::init();
 4
 5    ......
 6
 7    // 使用Frame的LengthDelimitedCodec进行编解码操作
 8    let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
 9    let mut buf = BytesMut::new();
10
11    // 创建GET命令
12    let cmd_get = CmdRequest::get("mykey");
13    cmd_get.encode(&mut buf).unwrap();
14
15    // 发送GET命令
16    stream.send(buf.freeze()).await.unwrap();
17    info!("Send info successed!");
18
19    // 接收服务器返回的响应
20    while let Some(Ok(buf)= stream.next().await {
21        let cmd_res = CmdResponse::decode(&buf[..]).unwrap();
22        info!("Receive a response: {:?}", cmd_res);
23    }
24
25    Ok(())
26}
   我们打开二个终端,分别输入以下命令:
  •  
  •  
RUST_LOG=info cargo run --bin kv_serverRUST_LOG=info cargo run --bin kv_client
   服务器执行结果:
  •  
  •  
  •  
INFO kv_server: Listening on 127.0.0.1:19999 ......INFO kv_server: Client: 127.0.0.1:50655 connectedINFO kv_server: Receive a command: CmdRequest { req_data: Some(Get(Get { key: "mykey" })) }
   客户端执行结果:
  •  
  •  
INFO kv_client: Send info successedINFO kv_client: Receive a response: CmdResponse { status: 200, message: "success", value: b"" }
 

 

服务器和客户端都正常处理了收到的请求和响应。  下一篇文章我们将在服务器端使用内存来存储客户端发送过来的数据。  完整代码:https://github.com/Justin02180218/kv_server_rust

 


  审核编辑:汤梓红


打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分