创建新信息#
这是一个如何用Flower在服务器和客户端之间创建新类型的信息的简要指导。
假设我们在脚本code:`server.py`和code:`numpy_client.py`中有以下的示例函数...
在服务器端:
def example_request(self, client: ClientProxy) -> Tuple[str, int]:
question = "Could you find the sum of the list, Bob?"
l = [1, 2, 3]
return client.request(question, l)
在客户端:
def example_response(self, question: str, l: List[int]) -> Tuple[str, int]:
response = "Here you go Alice!"
answer = sum(question)
return response, answer
现在让我们来看看,为了让服务器和客户端之间的这个简单的函数正常工作,我们需要实现哪些功能!
协议缓冲区的信息类型#
The first thing we need to do is to define a message type for the RPC system in transport.proto
.
Note that we have to do it for both the request and response messages. For more details on the syntax of proto3, please see the official documentation.
在 ServerMessage
代码块中:
message ExampleIns{
string question=1;
repeated int64 l=2;
}
oneof msg {
ReconnectIns reconnect_ins = 1;
GetPropertiesIns get_properties_ins = 2;
GetParametersIns get_parameters_ins = 3;
FitIns fit_ins = 4;
EvaluateIns evaluate_ins = 5;
ExampleIns example_ins = 6;
}
在 ClientMessage 代码块中:
message ExampleRes{
string response = 1;
int64 answer = 2;
}
oneof msg {
DisconnectRes disconnect_res = 1;
GetPropertiesRes get_properties_res = 2;
GetParametersRes get_parameters_res = 3;
FitRes fit_res = 4;
EvaluateRes evaluate_res = 5;
ExampleRes examples_res = 6;
}
确保在 oneof msg
中也添加一个新创建的消息类型字段。
完成后,我们将使用:
$ python -m flwr_tool.protoc
如果编译成功,你应该会看到以下信息:
Writing mypy to flwr/proto/transport_pb2.pyi
Writing mypy to flwr/proto/transport_pb2_grpc.pyi
序列化和反序列化函数#
下一步是添加函数,以便将 Python 数据类型序列化和反序列化为我们定义的 RPC 消息类型或从我们定义的 RPC 消息类型反序列化和反序列化 Python 数据类型。您应该在 serde.py
中添加这些函数。
四种函数:
def example_msg_to_proto(question: str, l: List[int]) -> ServerMessage.ExampleIns:
return ServerMessage.ExampleIns(question=question, l=l)
def example_msg_from_proto(msg: ServerMessage.ExampleIns) -> Tuple[str, List[int]]:
return msg.question, msg.l
def example_res_to_proto(response: str, answer: int) -> ClientMessage.ExampleRes:
return ClientMessage.ExampleRes(response=response, answer=answer)
def example_res_from_proto(res: ClientMessage.ExampleRes) -> Tuple[str, int]:
return res.response, res.answer
从服务器发送信息#
现在,在客户端代理类(例如 grpc_client_proxy.py
)中使用刚才创建的 serde 函数编写请求函数:
def request(self, question: str, l: List[int]) -> Tuple[str, int]:
request_msg = serde.example_msg_to_proto(question, l)
client_msg: ClientMessage = self.bridge.request(
ServerMessage(example_ins=request_msg)
)
response, answer = serde.example_res_from_proto(client_msg.examples_res)
return response, answer
由客户端接收信息#
最后一步 修改 message_handler.py
中的代码,检查信息的字段并调用 example_response
函数。记住使用 serde 函数!
在句柄函数内:
if server_msg.HasField("example_ins"):
return _example_response(client, server_msg.example_ins), 0, True
并增加一个新函数:
def _example_response(client: Client, msg: ServerMessage.ExampleIns) -> ClientMessage:
question,l = serde.evaluate_ins_from_proto(msg)
response, answer = client.example_response(question,l)
example_res = serde.example_res_to_proto(response,answer)
return ClientMessage(examples_res=example_res)
希望您在运行程序时能得到预期的结果!
('Here you go Alice!', 6)