电子说
Protobuf在项目根目录下新建cmd.proto,加入如下代码:[dependencies]
anyhow = "1"
tokio = { version = "1.19", features = ["full"] }
serde = { version = "1", features = ["derive"] }
toml = "0.5"
tracing = "0.1"
tracing-subscriber = "0.3"
bytes = "1"
prost = "0.11"
[build-dependencies]
prost-build = "0.11"
1syntax = "proto3";
2
3package cmd;
4
5// 命令请求
6message CmdRequest {
7 oneof req_data {
8 Get get = 1;
9 Set set = 2;
10 Publish publish = 3;
11 Subscribe subscribe = 4;
12 Unsubscribe unsubscribe = 5;
13 }
14}
15
16// 服务器的响应
17message CmdResponse {
18 uint32 status = 1;
19 string message = 2;
20 bytes value = 3;
21}
22
23// 请求值命令
24message Get {
25 string key = 1;
26}
27
28// 存储值命令
29message Set {
30 string key = 1;
31 bytes value = 2;
32 uint32 expire = 3;
33}
34
35// 向Topic发布值命令
36message Publish {
37 string topic = 1;
38 bytes value = 2;
39}
40
41// 订阅Topic命令
42message Subscribe {
43 string topic = 1;
44}
45
46// 取消订阅命令
47message Unsubscribe {
48 string topic = 1;
49 uint32 id = 2;
50}
在src目录下创建pb目录,在根目录下创建build.rs文件,加入如下代码:
1fn main() {
2 let mut conf = prost_build::new();
3 conf.bytes(&["."]);
4 conf.type_attribute(".", "#[derive(PartialOrd)]");
5 conf.out_dir("src/pb")
6 .compile_protos(&["cmd.proto"], &["."])
7 .unwrap();
8}
在src/pb目录下已经自动生成了cmd.rs文件。在src/pb目录下创建mod.rs文件,加入如下代码:
1use bytes::Bytes;
2
3use crate::{cmd_request::ReqData, CmdRequest, Get, Publish, Set, Subscribe, Unsubscribe};
4
5pub mod cmd;
6
7impl CmdRequest {
8 // GET命令
9 pub fn get(key: impl Into) -> Self {
10 Self {
11 req_data: Some(ReqData::Get(Get { key: key.into() })),
12 }
13 }
14
15 // SET命令
16 pub fn set(key: impl Into, value: Bytes, expire: u32) -> Self {
17 Self {
18 req_data: Some(ReqData::Set(Set {
19 key: key.into(),
20 value,
21 expire,
22 })),
23 }
24 }
25
26 // PUBLISH命令
27 pub fn publish(topic: impl Into, value: Bytes) -> Self {
28 Self {
29 req_data: Some(ReqData::Publish(Publish {
30 topic: topic.into(),
31 value,
32 })),
33 }
34 }
35
36 // 订阅命令
37 pub fn subscribe(topic: impl Into) -> Self {
38 Self {
39 req_data: Some(ReqData::Subscribe(Subscribe {
40 topic: topic.into(),
41 })),
42 }
43 }
44
45 // 解除订阅命令
46 pub fn unsubscribe(topic: impl Into, id: u32) -> Self {
47 Self {
48 req_data: Some(ReqData::Unsubscribe(Unsubscribe {
49 topic: topic.into(),
50 id,
51 })),
52 }
53 }
54}
55
56impl CmdResponse {
57 pub fn new(status: u32, message: String, value: Bytes) -> Self {
58 Self {
59 status,
60 message,
61 value,
62 }
63 }
64}
在 src/lib.rs 中,引入pb模块:
1mod pb;
2pub use pb::*;
客户端 & 服务器我们使用tokio-util库的Frame里的LengthDelimitedCodec(根据长度进行编解码)对protobuf协议进行封包解包。在Cargo.toml里加入tokio-util依赖:修改src/bin/kv_server.rs代码:[dependencies]
......
futures = "0.3"
tokio-util = {version = "0.7", features = ["codec"]}
......
1#[tokio::main]
2async fn main() -> Result<(), Box> {
3 tracing_subscriber::init();
4
5 ......
6
7 loop {
8 ......
9
10 tokio::spawn(async move {
11 // 使用Frame的LengthDelimitedCodec进行编解码操作
12 let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
13 while let Some(Ok(mut buf)) = stream.next().await {
14 // 对客户端发来的protobuf请求命令进行拆包
15 let cmd_req = CmdRequest::decode(&buf[..]).unwrap();
16 info!("Receive a command: {:?}", cmd_req);
17
18 buf.clear();
19
20 // 对protobuf的请求响应进行封包,然后发送给客户端。
21 let cmd_res = CmdResponse::new(200, "success".to_string(), Bytes::default());
22 cmd_res.encode(&mut buf).unwrap();
23 stream.send(buf.freeze()).await.unwrap();
24 }
25 info!("Client {:?} disconnected", addr);
26 });
27 }
28}
修改src/bin/kv_client.rs代码:
1#[tokio::main]
2async fn main() -> Result<(), Box> {
3 tracing_subscriber::init();
4
5 ......
6
7 // 使用Frame的LengthDelimitedCodec进行编解码操作
8 let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
9 let mut buf = BytesMut::new();
10
11 // 创建GET命令
12 let cmd_get = CmdRequest::get("mykey");
13 cmd_get.encode(&mut buf).unwrap();
14
15 // 发送GET命令
16 stream.send(buf.freeze()).await.unwrap();
17 info!("Send info successed!");
18
19 // 接收服务器返回的响应
20 while let Some(Ok(buf)) = stream.next().await {
21 let cmd_res = CmdResponse::decode(&buf[..]).unwrap();
22 info!("Receive a response: {:?}", cmd_res);
23 }
24
25 Ok(())
26}
我们打开二个终端,分别输入以下命令:服务器执行结果:RUST_LOG=info cargo run --bin kv_server
RUST_LOG=info cargo run --bin kv_client
客户端执行结果:INFO kv_server: Listening on 127.0.0.1:19999 ......
INFO kv_server: Client: 127.0.0.1:50655 connected
INFO kv_server: Receive a command: CmdRequest { req_data: Some(Get(Get { key: "mykey" })) }
INFO kv_client: Send info successed!
INFO kv_client: Receive a response: CmdResponse { status: 200, message: "success", value: b"" }
服务器和客户端都正常处理了收到的请求和响应。 下一篇文章我们将在服务器端使用内存来存储客户端发送过来的数据。 完整代码:https://github.com/Justin02180218/kv_server_rust
审核编辑:汤梓红
全部0条评论
快来发表一下你的评论吧 !