diff --git a/Dockerfile b/Dockerfile index 85ab3ee006c67afb7a60a43676d2eb6388217847..4e09b425ffcf235d1036df2148d4f5379390f0ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,6 +4,6 @@ RUN apt-get update && apt-get install -y python3.10-venv ENV VIRTUAL_ENV=/opt/dhruva-mt RUN python3 -m venv $VIRTUAL_ENV ENV PATH="$VIRTUAL_ENV/bin:$PATH" -RUN pip install -U ctranslate2 OpenNMT-py==1.2.0 git+https://github.com/vmujadia/tokenizer.git +RUN pip install -U ctranslate2 OpenNMT-py==1.2.0 git+https://github.com/vmujadia/tokenizer.git tenacity CMD ["tritonserver", "--model-repository=/models", "--cache-config=local,size=1048576"] EXPOSE 8000 diff --git a/triton_models/ssmt_template_model_repo/1/model.py b/triton_models/ssmt_template_model_repo/1/model.py index 7c636bb41ec0f825f12ffe035b389c5fba21a1a3..38ef5b7e211fba064bbec332a47fb40501e946b9 100644 --- a/triton_models/ssmt_template_model_repo/1/model.py +++ b/triton_models/ssmt_template_model_repo/1/model.py @@ -1,26 +1,120 @@ import os import json import numpy +from time import time from itertools import islice +from threading import Lock, Timer from ctranslate2 import Translator import triton_python_backend_utils as pb_utils +from tenacity import retry, wait_random_exponential + + +class DynamicModel(object): + def __init__( + self, path, device, device_index=None, timeout=5, timer_min_delta=0.01 + ): + self.model, self.model_path, self.model_device, self.model_device_index = ( + None, + path, + device, + device_index, + ) + self.model_lock, self.timer_lock = Lock(), Lock() + self.timeout, self.timer_min_delta = timeout, timer_min_delta + self.initialize() + + @retry(wait=wait_random_exponential(multiplier=0.5, max=10, exp_base=1.2)) + def initialize(self): + self.model = Translator( + self.model_path, + device=self.model_device, + intra_threads=1, + inter_threads=1, + device_index=self.model_device_index, + ) + self.timer = Timer(1, self.unload) + self.timer.start_time = time() + self.timer.start() + + def restart_timer(self): + with self.timer_lock: + if time() - self.timer.start_time >= self.timer_min_delta: + self.timer.cancel() + self.timer = Timer(self.timeout, self.unload) + self.timer.start_time = time() + self.timer.start() + + @retry(wait=wait_random_exponential(multiplier=0.5, max=20, exp_base=1.2)) + def load(self, reset_timer=True): + with self.timer_lock: + self.timer.cancel() + with self.model_lock: + self.model.load_model() + if reset_timer: + self.restart_timer() + + def unload(self): + with self.model_lock: + self.model.unload_model() + + @retry(wait=wait_random_exponential(multiplier=0.5, max=20, exp_base=1.2)) + def translate(self, *args, **kwargs): + if not self.model.model_is_loaded: + self.load(reset_timer=False) + results = list(self.model.translate_iterable(*args, **kwargs)) + self.restart_timer() + return results + class TritonPythonModel: def initialize(self, args): current_path = os.path.dirname(os.path.abspath(__file__)) self.model_config = json.loads(args["model_config"]) - self.device_id = int(json.loads(args['model_instance_device_id'])) - target_config = pb_utils.get_output_config_by_name(self.model_config, "OUTPUT_TEXT") + self.device_id = int(json.loads(args["model_instance_device_id"])) + target_config = pb_utils.get_output_config_by_name( + self.model_config, "OUTPUT_TEXT" + ) self.target_dtype = pb_utils.triton_string_to_numpy(target_config["data_type"]) - try: self.translator = Translator(f"{os.path.join(current_path, 'translator')}", device="cuda", intra_threads=1, inter_threads=1, device_index=[self.device_id]) - except: self.translator = Translator(f"{os.path.join(current_path, 'translator')}", device="cpu", intra_threads=4) + self.translator = DynamicModel( + f"{os.path.join(current_path, 'translator')}", + device="cuda", + device_index=[self.device_id], + ) def execute(self, requests): - source_list = [pb_utils.get_input_tensor_by_name(request, "INPUT_TEXT_TOKENIZED") for request in requests] + source_list = [ + pb_utils.get_input_tensor_by_name(request, "INPUT_TEXT_TOKENIZED") + for request in requests + ] bsize_list = [source.as_numpy().shape[0] for source in source_list] - src_sentences = [s[0].decode('utf-8').strip().split(' ') for source in source_list for s in source.as_numpy()] - tgt_sentences = [' '.join(result.hypotheses[0]).replace('@@ ', '') for result in self.translator.translate_iterable(src_sentences, max_batch_size=128, max_input_length=100, max_decoding_length=100)] - responses = [pb_utils.InferenceResponse(output_tensors=[pb_utils.Tensor("OUTPUT_TEXT", numpy.array([[s]for s in islice(tgt_sentences, bsize)], dtype='object').astype(self.target_dtype))]) for bsize in bsize_list] + src_sentences = [ + s[0].decode("utf-8").strip().split(" ") + for source in source_list + for s in source.as_numpy() + ] + tgt_sentences = [ + " ".join(result.hypotheses[0]).replace("@@ ", "") + for result in self.translator.translate( + src_sentences, + max_batch_size=128, + max_input_length=100, + max_decoding_length=100, + ) + ] + responses = [ + pb_utils.InferenceResponse( + output_tensors=[ + pb_utils.Tensor( + "OUTPUT_TEXT", + numpy.array( + [[s] for s in islice(tgt_sentences, bsize)], dtype="object" + ).astype(self.target_dtype), + ) + ] + ) + for bsize in bsize_list + ] return responses - def finalize(self): self.translator.unload_model() + def finalize(self): + self.translator.unload()