import time import random from uuid import UUID from celery import shared_task from celery.contrib.abortable import AbortableTask from deployment.channels import Event from deployment.models import Deployment, Type, Status class DeploymentTask(AbortableTask): def try_exec(self, stop: int): if stop == 0: stop = 1 for i in range(1, stop): time.sleep(5 * stop - 2 * i) if random.randint(0, 10) == 10: raise Exception("unable to perform the deployment") yield i def run_slim(self): progress = 95 self.update_state(meta={"progress": progress}) time.sleep(5) status = Status.SUCCESS.name progress = 100 self.update_state(meta={"progress": progress}) self.deploy.status = status Event.send_details(self.deploy, progress) def run_medium(self): progress = 62 self.update_state(meta={"progress": progress}) time.sleep(10) if random.randint(0, 10) % 2 != 0: status = Status.FAILED.name self.deploy.error = "arg.. no chance" else: status = Status.SUCCESS.name progress = 100 self.update_state(meta={"progress": progress}) self.deploy.status = status Event.send_details(self.deploy, progress) def run_large(self): progress = 0 try: for i in self.try_exec(6): progress = i * 20 self.update_state(meta={"progress": progress}) if i != 5: Event.send_details(self.deploy, progress) except Exception as e: self.deploy.status = Status.FAILED.name self.deploy.error = e else: self.deploy.status = Status.SUCCESS.name Event.send_details(self.deploy, progress) @property def progress(self): return self.request.get("meta", {}).get("progress", 0) def launch(self, deployment_id: UUID): progress = 0 self.update_state(meta={"progress": progress}) self.deploy = Deployment.objects.get(id=deployment_id) self.deploy.task_id = self.request.id self.deploy.status = Status.RUNNING.name self.deploy.save() Event.send(self.deploy) Event.send_details(self.deploy, 0) match self.deploy.type: case Type.SLIM.name: self.run_slim() case Type.MEDIUM.name: self.run_medium() case Type.LARGE.name: self.run_large() self.deploy.task_id = None self.deploy.save() Event.send(self.deploy) @shared_task(base=DeploymentTask, bind=True, ignore_result=True) def deploy(self, deployment_id: UUID): self.launch(deployment_id)