class GrpcClient:
    """Proxy around a gRPC stub that reports streamed responses as request events.

    Any attribute looked up on the client is resolved on the wrapped stub and
    returned as a generator function. Iterating that generator performs the
    call, fires ``events.request`` once per received message (or once with the
    error if the stream fails) and yields each message to the caller.
    """

    def __init__(self, stub):
        """Remember the stub instance and its class for later attribute lookups."""
        self._stub_class = stub.__class__
        self._stub = stub

    def __iter__(self):
        # NOTE(review): the class defines no __next__, so iter(client) raises
        # TypeError; the method is kept only so existing callers do not break.
        return self

    def __getattr__(self, name):
        """Return a generator function wrapping the stub attribute *name*.

        Raises:
            AttributeError: if the stub has no such attribute, or if the
                client's own private fields are requested before ``__init__``
                has populated them.
        """
        # __getattr__ only runs when normal lookup fails. If our own fields are
        # missing (e.g. during copy/unpickle, before __init__), reading them
        # below would re-enter __getattr__ forever, so fail fast instead.
        if name in ("_stub", "_stub_class"):
            raise AttributeError(name)
        func = self._stub_class.__getattribute__(self._stub, name)

        def wrapper(*args, **kwargs):
            # "start_time" is the wall-clock time at which the generator was
            # started; "response_time" below is measured per message.
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,
                "exception": None,
                "context": None,
                "response": None,
            }
            request_meta["responses"] = func(*args, **kwargs)
            print("***********")
            stream = iter(request_meta["responses"])
            while True:
                # Time the wait for the next message, not its serialisation.
                wait_started = time.perf_counter()
                try:
                    response = next(stream)
                except StopIteration:
                    return
                except grpc.RpcError as exc:
                    # Stream errors surface while advancing the iterator.
                    response = None
                    request_meta["exception"] = exc
                request_meta["response_time"] = (
                    time.perf_counter() - wait_started) * 1000
                request_meta["response"] = response
                if request_meta["exception"] is None:
                    # Serialise once and reuse it for output and for the size.
                    payload = MessageToJson(response)
                    print(payload)
                    print(len(payload))
                    request_meta["response_length"] = len(payload)  # each response
                else:
                    request_meta["response_length"] = 0
                events.request.fire(**request_meta)
                if request_meta["exception"] is not None:
                    # The failure has been reported; the stream is finished.
                    return
                yield response

        return wrapper
class GrpcUser(User):
    """Abstract user that owns an insecure gRPC channel and a wrapped stub.

    Subclasses set ``stub_class``; an instance of it is built on the channel
    opened to ``ADDRESS`` and exposed through ``self.client`` as a
    ``GrpcClient``.
    """

    abstract = True
    stub_class = None

    def __init__(self, environment):
        """Open the channel to ``ADDRESS`` and wrap a ``stub_class`` instance."""
        super().__init__(environment)
        self._channel = grpc.insecure_channel(ADDRESS)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)

    def stop(self, force=False):
        """Close the channel and stop the user.

        The flag is raised first and the channel is closed one second later.
        The parent is always called with ``force=True``; the ``force``
        argument is accepted for signature compatibility only.

        Returns:
            Whatever the parent ``stop`` returns, so that callers which branch
            on the result keep working.
        """
        self._channel_closed = True
        # NOTE(review): presumably a grace period for tasks that check
        # _channel_closed before issuing a call — confirm.
        time.sleep(1)
        self._channel.close()
        return super().stop(force=True)
class HelloGrpcUser(GrpcUser):
    """User that runs a StreamingRecognize call fed from ``test.wav``."""

    host = ADDRESS
    stub_class = stt_pb2_grpc.SpeechToTextServiceStub

    @task
    def streamingRecognize(self):
        """Run one streaming recognition call, then pause for one second.

        The call is skipped once the channel has been flagged as closed.
        """
        if not self._channel_closed:
            responses = self.client.StreamingRecognize(generate_requests(
                "test.wav"
            ))
            # The client returns a lazy generator: nothing is sent and no
            # request event is fired until it is iterated, so drain it here.
            for _ in responses:
                pass
        time.sleep(1)