Commit bfd804f7 authored by Nikhilesh Bhatnagar

dynamic model loading prototype

parent 734aca61
@@ -4,6 +4,6 @@ RUN apt-get update && apt-get install -y python3.10-venv
 ENV VIRTUAL_ENV=/opt/dhruva-mt
 RUN python3 -m venv $VIRTUAL_ENV
 ENV PATH="$VIRTUAL_ENV/bin:$PATH"
-RUN pip install -U ctranslate2 OpenNMT-py==1.2.0 git+https://github.com/vmujadia/tokenizer.git
+RUN pip install -U ctranslate2 OpenNMT-py==1.2.0 git+https://github.com/vmujadia/tokenizer.git tenacity
 CMD ["tritonserver", "--model-repository=/models", "--cache-config=local,size=1048576"]
 EXPOSE 8000
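The Dockerfile change only adds tenacity, the retry library that backs the new DynamicModel class below. As a minimal sketch of the backoff it configures (not part of the commit; an explicit stop is added here so the demo terminates, whereas the commit's decorators set no stop and therefore retry indefinitely):

from tenacity import retry, stop_after_attempt, wait_random_exponential

# Each attempt sleeps a random time drawn from a window that widens
# roughly as multiplier * exp_base**attempt, capped at max seconds.
@retry(
    wait=wait_random_exponential(multiplier=0.5, max=10, exp_base=1.2),
    stop=stop_after_attempt(5),
)
def flaky_load():
    raise RuntimeError("model directory not ready")

With these settings the individual waits stay under 10 seconds; after five failed attempts tenacity raises RetryError.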
 import os
 import json
 import numpy
+from time import time
 from itertools import islice
+from threading import Lock, Timer
 from ctranslate2 import Translator
 import triton_python_backend_utils as pb_utils
+from tenacity import retry, wait_random_exponential
+
+class DynamicModel(object):
+    def __init__(
+        self, path, device, device_index=None, timeout=5, timer_min_delta=0.01
+    ):
+        self.model, self.model_path, self.model_device, self.model_device_index = (
+            None,
+            path,
+            device,
+            device_index,
+        )
+        self.model_lock, self.timer_lock = Lock(), Lock()
+        self.timeout, self.timer_min_delta = timeout, timer_min_delta
+        self.initialize()
+
+    @retry(wait=wait_random_exponential(multiplier=0.5, max=10, exp_base=1.2))
+    def initialize(self):
+        self.model = Translator(
+            self.model_path,
+            device=self.model_device,
+            intra_threads=1,
+            inter_threads=1,
+            device_index=self.model_device_index,
+        )
+        self.timer = Timer(1, self.unload)
+        self.timer.start_time = time()
+        self.timer.start()
+
+    def restart_timer(self):
+        with self.timer_lock:
+            if time() - self.timer.start_time >= self.timer_min_delta:
+                self.timer.cancel()
+                self.timer = Timer(self.timeout, self.unload)
+                self.timer.start_time = time()
+                self.timer.start()
+
+    @retry(wait=wait_random_exponential(multiplier=0.5, max=20, exp_base=1.2))
+    def load(self, reset_timer=True):
+        with self.timer_lock:
+            self.timer.cancel()
+        with self.model_lock:
+            self.model.load_model()
+        if reset_timer:
+            self.restart_timer()
+
+    def unload(self):
+        with self.model_lock:
+            self.model.unload_model()
+
+    @retry(wait=wait_random_exponential(multiplier=0.5, max=20, exp_base=1.2))
+    def translate(self, *args, **kwargs):
+        if not self.model.model_is_loaded:
+            self.load(reset_timer=False)
+        results = list(self.model.translate_iterable(*args, **kwargs))
+        self.restart_timer()
+        return results
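A minimal usage sketch of the class above (not part of the commit; "./translator" stands in for any CTranslate2 model directory, and the BPE tokens are placeholders):

model = DynamicModel("./translator", device="cpu", device_index=0)
# translate() reloads the weights if the idle timer already evicted them,
# runs translate_iterable(), then re-arms the timer.
results = model.translate([["Hel@@", "lo"]], max_batch_size=8)
print(" ".join(results[0].hypotheses[0]).replace("@@ ", ""))
# After ~5 idle seconds (the timeout default) the Timer thread calls
# unload(); the next translate() call transparently loads the model again.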
 class TritonPythonModel:
     def initialize(self, args):
         current_path = os.path.dirname(os.path.abspath(__file__))
         self.model_config = json.loads(args["model_config"])
-        self.device_id = int(json.loads(args['model_instance_device_id']))
-        target_config = pb_utils.get_output_config_by_name(self.model_config, "OUTPUT_TEXT")
+        self.device_id = int(json.loads(args["model_instance_device_id"]))
+        target_config = pb_utils.get_output_config_by_name(
+            self.model_config, "OUTPUT_TEXT"
+        )
         self.target_dtype = pb_utils.triton_string_to_numpy(target_config["data_type"])
-        try: self.translator = Translator(f"{os.path.join(current_path, 'translator')}", device="cuda", intra_threads=1, inter_threads=1, device_index=[self.device_id])
-        except: self.translator = Translator(f"{os.path.join(current_path, 'translator')}", device="cpu", intra_threads=4)
+        self.translator = DynamicModel(
+            f"{os.path.join(current_path, 'translator')}",
+            device="cuda",
+            device_index=[self.device_id],
+        )

     def execute(self, requests):
-        source_list = [pb_utils.get_input_tensor_by_name(request, "INPUT_TEXT_TOKENIZED") for request in requests]
+        source_list = [
+            pb_utils.get_input_tensor_by_name(request, "INPUT_TEXT_TOKENIZED")
+            for request in requests
+        ]
         bsize_list = [source.as_numpy().shape[0] for source in source_list]
-        src_sentences = [s[0].decode('utf-8').strip().split(' ') for source in source_list for s in source.as_numpy()]
+        src_sentences = [
+            s[0].decode("utf-8").strip().split(" ")
+            for source in source_list
+            for s in source.as_numpy()
+        ]
-        tgt_sentences = [' '.join(result.hypotheses[0]).replace('@@ ', '') for result in self.translator.translate_iterable(src_sentences, max_batch_size=128, max_input_length=100, max_decoding_length=100)]
+        tgt_sentences = [
+            " ".join(result.hypotheses[0]).replace("@@ ", "")
+            for result in self.translator.translate(
+                src_sentences,
+                max_batch_size=128,
+                max_input_length=100,
+                max_decoding_length=100,
+            )
+        ]
-        responses = [pb_utils.InferenceResponse(output_tensors=[pb_utils.Tensor("OUTPUT_TEXT", numpy.array([[s] for s in islice(tgt_sentences, bsize)], dtype='object').astype(self.target_dtype))]) for bsize in bsize_list]
+        responses = [
+            pb_utils.InferenceResponse(
+                output_tensors=[
+                    pb_utils.Tensor(
+                        "OUTPUT_TEXT",
+                        numpy.array(
+                            [[s] for s in islice(tgt_sentences, bsize)], dtype="object"
+                        ).astype(self.target_dtype),
+                    )
+                ]
+            )
+            for bsize in bsize_list
+        ]
         return responses

-    def finalize(self): self.translator.unload_model()
+    def finalize(self):
+        self.translator.unload()
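One pre-existing wrinkle that this commit leaves as-is: in execute(), tgt_sentences is a list, and islice over a list restarts from index 0 on every call, so when several requests are batched every response receives the first bsize translations instead of its own slice. A short demonstration, with the one-line fix of consuming a single shared iterator:

from itertools import islice

tgt_sentences = ["a", "b", "c", "d"]
bsize_list = [2, 2]
# Slicing the list directly restarts at the beginning each time:
print([list(islice(tgt_sentences, b)) for b in bsize_list])  # [['a', 'b'], ['a', 'b']]
# Consuming one shared iterator yields consecutive, non-overlapping slices:
it = iter(tgt_sentences)
print([list(islice(it, b)) for b in bsize_list])  # [['a', 'b'], ['c', 'd']]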