33 lines
		
	
	
		
			777 B
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			33 lines
		
	
	
		
			777 B
		
	
	
	
		
			Python
		
	
	
	
	
	
| import time
 | |
| import random
 | |
| from uuid import UUID
 | |
| 
 | |
| from celery import shared_task
 | |
| from django_eventstream import send_event
 | |
| 
 | |
| from deployment.models import Deployment, Type, Status
 | |
| 
 | |
| 
 | |
| @shared_task
 | |
| def deploy(deployment_id: UUID):
 | |
|     deploy = Deployment.objects.get(id=deployment_id)
 | |
| 
 | |
|     deploy.status = Status.RUNNING.name
 | |
|     deploy.save()
 | |
| 
 | |
|     match deploy.type:
 | |
|         case Type.SLIM.name:
 | |
|             time.sleep(10)
 | |
|         case Type.MEDIUM.name:
 | |
|             time.sleep(60)
 | |
|         case Type.LARGE.name:
 | |
|             time.sleep(120)
 | |
| 
 | |
|     deploy.status = (
 | |
|         Status.FAILED.name if random.randint(0, 10) % 2 != 0 else Status.SUCCESS.name
 | |
|     )
 | |
|     deploy.save()
 | |
| 
 | |
|     send_event("deployment", "message", {"id": deploy.id, "status": deploy.status})
 | |
|     print("event sent")
 |