"""Views for listing, creating, deploying, aborting and deleting deployments."""

from urllib.parse import urlencode
from uuid import uuid4

from celery.contrib.abortable import AbortableAsyncResult
from celery.result import AsyncResult
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.core.paginator import Paginator
from django.http import (
    HttpResponseBadRequest,
    HttpResponseForbidden,
    HttpResponseRedirect,
    HttpResponseServerError,
)
from django.shortcuts import get_object_or_404, render

from deployment.channels import Event
from deployment.forms import DeploymentForm
from deployment.models import Deployment, Status
from deployment.tasks import deploy as launch_deploy


def _task_progress(result):
    """Return the ``progress`` value stored for a Celery task, or 0.

    ``result.info`` is only a mapping once the task has published metadata.
    It is ``None`` while no result is stored (e.g. the task has not started)
    and an exception instance when the task failed; neither supports
    ``.get``, so both cases fall back to 0 instead of raising.
    """
    info = result.info
    if isinstance(info, dict):
        return info.get("progress", 0)
    return 0


def check_user_credits(user: User) -> bool:
    """Return whether *user* is allowed to create another deployment.

    Superusers are always allowed. Any other user needs a related
    ``deploymentuser`` record: a negative ``credits`` value means no limit,
    otherwise the user must own fewer deployments than ``credits``.
    """
    if user.is_superuser:
        return True
    try:
        dep_user = user.deploymentuser
    except (ObjectDoesNotExist, AttributeError):
        # The related record is missing, or the user object has no such
        # attribute at all: treat both as "no credits".
        return False
    if dep_user.credits < 0:
        return True
    # COUNT(*) in the database rather than loading every row to call len().
    return dep_user.credits > Deployment.objects.filter(user=user).count()


def index(request):
    """Render the paginated board of the requesting user's deployments."""
    deployments = Deployment.objects.filter(user=request.user.id).order_by(
        "-created_at"
    )
    paginator = Paginator(deployments, 10)
    page_obj = paginator.get_page(request.GET.get("page"))
    context = {
        "page_obj": page_obj,
        # A range (unlike a generator) can be iterated more than once by the
        # template; it yields the same 1..num_pages sequence.
        "range_pages": range(1, page_obj.paginator.num_pages + 1),
        "url": f"events/{request.user.id}/",
        "can_create": check_user_credits(request.user),
    }
    return render(request, "deployment/board.html", context)


def deploy(request, deployment_id):
    """Queue a deployment on POST, then redirect back to the board.

    Only deployments in the READY or FAILED state can be launched; any other
    state yields a 400 response. A ``page`` query parameter, when present,
    is carried over to the redirect target.
    """
    deployment = get_object_or_404(Deployment, id=deployment_id)

    if request.method == "POST":
        if deployment.status not in (Status.READY.name, Status.FAILED.name):
            return HttpResponseBadRequest("deployment is undeployable")
        # override previous errors
        if deployment.error:
            deployment.error = None
        deployment.status = Status.PENDING.name
        deployment.save()
        launch_deploy.delay(deployment_id)

    if page := request.GET.get("page", ""):
        # Encode the user-supplied value so it cannot smuggle extra query
        # parameters or a fragment into the redirect URL.
        return HttpResponseRedirect(f"/deployment?{urlencode({'page': page})}")
    return HttpResponseRedirect("/deployment")


def abort(request, deployment_id):
    """Abort a RUNNING or PENDING deployment on POST, then show its details.

    The Celery task is flagged as aborted and revoked, the deployment is
    marked FAILED with an "aborted by" error, and both events are emitted.
    Any other state yields a 400 response.
    """
    deployment = get_object_or_404(Deployment, id=deployment_id)

    if request.method == "POST":
        if deployment.status not in (Status.RUNNING.name, Status.PENDING.name):
            return HttpResponseBadRequest("deployment is unabortable")

        # NOTE(review): deploy() does not store the id returned by delay();
        # confirm task_id is populated before a PENDING deployment gets here.
        res = AbortableAsyncResult(str(deployment.task_id))
        # Read the progress before aborting. A task that has not started
        # yet has no metadata, so this must not assume a dict.
        progress = _task_progress(res)
        res.abort()
        res.revoke(terminate=True)

        deployment.status = Status.FAILED.name
        deployment.error = f"aborted by {request.user}"
        deployment.task_id = None
        deployment.save()

        Event.send_details(deployment, progress)
        Event.send(deployment)

    return HttpResponseRedirect(f"/deployment/{deployment.id}")


def details(request, deployment_id):
    """Show a deployment's details page, or delete the deployment on POST.

    Deletion is refused with a 400 response while the deployment is RUNNING
    or PENDING, and a failure while deleting is reported as a 500 response.
    """
    deployment = get_object_or_404(Deployment, id=deployment_id)

    if request.method == "POST":
        if deployment.status == Status.RUNNING.name:
            return HttpResponseBadRequest("deployment is running")
        if deployment.status == Status.PENDING.name:
            return HttpResponseBadRequest("deployment is pending")
        try:
            deployment.delete()
        except Exception as e:
            return HttpResponseServerError(e)
        return HttpResponseRedirect("/deployment")

    if deployment.status == Status.RUNNING.name:
        # Fetch the running task's progress from the Celery result store
        # (Redis, per the original comment). Only the rendered page uses it.
        res = AsyncResult(str(deployment.task_id))
        deployment.progress = _task_progress(res)

    return render(
        request,
        "deployment/details.html",
        {
            "deployment": deployment,
            "url": f"events/{request.user.id}/{deployment.id}/",
        },
    )


def create(request):
    """Create a deployment from the posted form, then redirect to the board.

    Responds with 403 when the user has no credits left, 400 when the form
    is invalid, and 500 when the record cannot be created. Non-POST requests
    are simply redirected.
    """
    if request.method == "POST":
        if not check_user_credits(request.user):
            return HttpResponseForbidden(
                "unable to launch deployment, contact administrator for support"
            )

        form = DeploymentForm(request.POST)
        if not form.is_valid():
            return HttpResponseBadRequest(
                f"deployment creation inputs are invalid: {form.errors}"
            )

        try:
            Deployment.objects.create(
                user=request.user, id=uuid4(), **form.cleaned_data
            )
        except Exception as e:
            return HttpResponseServerError(e)

    return HttpResponseRedirect("/deployment")