import json
import os
from itertools import islice
from threading import Lock, Timer
from time import time

import numpy
import triton_python_backend_utils as pb_utils
from ctranslate2 import Translator
from tenacity import retry, wait_random_exponential


class DynamicModel(object):
    """A CTranslate2 ``Translator`` whose weights are unloaded from the device
    after ``timeout`` seconds of inactivity and transparently reloaded on the
    next translation request.

    A ``threading.Timer`` implements the inactivity countdown; ``timer_lock``
    guards timer replacement and ``model_lock`` serializes load/unload against
    the timer thread.
    """

    def __init__(
        self, path, device, device_index=None, timeout=5, timer_min_delta=0.01
    ):
        """
        Args:
            path: Filesystem path of the CTranslate2 model directory.
            device: Device string passed to ``Translator`` (e.g. ``"cuda"``).
            device_index: Optional device index (or list of indices).
            timeout: Seconds of inactivity before the model is unloaded.
            timer_min_delta: Minimum age (seconds) the current timer must have
                before ``restart_timer`` replaces it; throttles timer churn
                under bursts of rapid successive requests.
        """
        self.model = None
        self.model_path = path
        self.model_device = device
        self.model_device_index = device_index
        self.model_lock, self.timer_lock = Lock(), Lock()
        self.timeout, self.timer_min_delta = timeout, timer_min_delta
        self.initialize()

    @retry(wait=wait_random_exponential(multiplier=0.5, max=10, exp_base=1.2))
    def initialize(self):
        """Create the Translator and arm a short (1 s) unload timer so the
        model does not occupy device memory before any request has arrived.

        Retried with randomized exponential backoff on failure.
        """
        self.model = Translator(
            self.model_path,
            device=self.model_device,
            intra_threads=1,
            inter_threads=1,
            device_index=self.model_device_index,
        )
        # Deliberately short fuse: unload soon after construction; the first
        # translate() call reloads the weights on demand.
        self.timer = Timer(1, self.unload)
        self.timer.start_time = time()
        self.timer.start()

    def restart_timer(self):
        """Push the inactivity deadline ``timeout`` seconds into the future.

        A timer younger than ``timer_min_delta`` is left untouched to avoid
        cancelling and recreating a Timer thread on every request in a burst.
        """
        with self.timer_lock:
            if time() - self.timer.start_time >= self.timer_min_delta:
                self.timer.cancel()
                self.timer = Timer(self.timeout, self.unload)
                self.timer.start_time = time()
                self.timer.start()

    @retry(wait=wait_random_exponential(multiplier=0.5, max=20, exp_base=1.2))
    def load(self, reset_timer=True):
        """Load the model weights onto the device.

        Args:
            reset_timer: When True, re-arm the inactivity timer afterwards.
        """
        with self.timer_lock:
            self.timer.cancel()
        with self.model_lock:
            self.model.load_model()
        if reset_timer:
            self.restart_timer()

    def unload(self):
        """Release the model weights from the device (runs as the timer
        callback, hence the lock against concurrent load/translate)."""
        with self.model_lock:
            self.model.unload_model()

    @retry(wait=wait_random_exponential(multiplier=0.5, max=20, exp_base=1.2))
    def translate(self, *args, **kwargs):
        """Translate, lazily reloading the model if it was unloaded.

        Returns:
            The fully materialized list of CTranslate2 translation results.
            Re-arms the inactivity timer on success.
        """
        if not self.model.model_is_loaded:
            self.load(reset_timer=False)
        results = list(self.model.translate_iterable(*args, **kwargs))
        self.restart_timer()
        return results


class TritonPythonModel:
    """Triton python-backend model wrapping a dynamically (un)loaded
    CTranslate2 translator."""

    def initialize(self, args):
        """Parse the Triton model configuration and build the DynamicModel on
        the GPU assigned to this instance."""
        current_path = os.path.dirname(os.path.abspath(__file__))
        self.model_config = json.loads(args["model_config"])
        self.device_id = int(json.loads(args["model_instance_device_id"]))
        target_config = pb_utils.get_output_config_by_name(
            self.model_config, "OUTPUT_TEXT"
        )
        self.target_dtype = pb_utils.triton_string_to_numpy(
            target_config["data_type"]
        )
        self.translator = DynamicModel(
            f"{os.path.join(current_path, 'translator')}",
            device="cuda",
            device_index=[self.device_id],
        )

    def execute(self, requests):
        """Translate a batch of requests.

        All requests are flattened into one token batch, translated in a
        single call, and the translations are re-partitioned per request.
        """
        # Hoist as_numpy() so each request's tensor is materialized once.
        batches = [
            pb_utils.get_input_tensor_by_name(
                request, "INPUT_TEXT_TOKENIZED"
            ).as_numpy()
            for request in requests
        ]
        bsize_list = [batch.shape[0] for batch in batches]
        src_sentences = [
            s[0].decode("utf-8").strip().split(" ")
            for batch in batches
            for s in batch
        ]
        tgt_sentences = [
            " ".join(result.hypotheses[0]).replace("@@ ", "")
            for result in self.translator.translate(
                src_sentences,
                max_batch_size=128,
                max_input_length=100,
                max_decoding_length=100,
            )
        ]
        # BUG FIX: islice over the *list* restarted at index 0 for every
        # request, so with multiple batched requests each response received
        # the first ``bsize`` sentences. Consuming one shared iterator hands
        # each request its own consecutive slice.
        tgt_iter = iter(tgt_sentences)
        responses = [
            pb_utils.InferenceResponse(
                output_tensors=[
                    pb_utils.Tensor(
                        "OUTPUT_TEXT",
                        numpy.array(
                            [[s] for s in islice(tgt_iter, bsize)],
                            dtype="object",
                        ).astype(self.target_dtype),
                    )
                ]
            )
            for bsize in bsize_list
        ]
        return responses

    def finalize(self):
        """Release device memory on model teardown.

        BUG FIX: also cancel any pending inactivity timer so its non-daemon
        thread does not outlive the model and fire unload() a second time.
        """
        with self.translator.timer_lock:
            self.translator.timer.cancel()
        self.translator.unload()