commit 3940a34efe6b7baaf72112fe32db610747845e87
parent c77caf74e5c27493749d1408d9b9aa47e742bb96
Author: Stefan Koch <programming@stefan-koch.name>
Date: Thu, 7 Jan 2021 18:08:52 +0100
send multiple response messages on VM creation
Diffstat:
1 file changed, 51 insertions(+), 30 deletions(-)
diff --git a/aetherscale/computing.py b/aetherscale/computing.py
@@ -11,7 +11,7 @@ import subprocess
import sys
import tempfile
import time
-from typing import List, Optional, Dict, Any, Callable, Tuple
+from typing import List, Optional, Dict, Any, Callable, Tuple, Iterator
from . import networking
from .qemu import image, runtime
@@ -103,7 +103,7 @@ class ComputingHandler:
self.established_vpns: Dict[str, TincVirtualNetwork] = {}
- def list_vms(self, _: Dict[str, Any]) -> List[Dict[str, Any]]:
+ def list_vms(self, _: Dict[str, Any]) -> Iterator[List[Dict[str, Any]]]:
vms = []
for proc in psutil.process_iter(['pid', 'name']):
@@ -128,13 +128,18 @@ class ComputingHandler:
vms.append(msg)
- return vms
+ yield vms
- def create_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ def create_vm(self, options: Dict[str, Any]) -> Iterator[Dict[str, str]]:
vm_id = ''.join(
random.choice(string.ascii_lowercase) for _ in range(8))
logging.info(f'Starting VM "{vm_id}"')
+ yield {
+ 'status': 'allocating',
+ 'vm-id': vm_id,
+ }
+
try:
image_name = os.path.basename(options['image'])
except KeyError:
@@ -200,12 +205,12 @@ class ComputingHandler:
self.service_manager.start_service(unit_name)
logging.info(f'Started VM "{vm_id}"')
- return {
+ yield {
'status': 'starting',
'vm-id': vm_id,
}
- def start_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ def start_vm(self, options: Dict[str, Any]) -> Iterator[Dict[str, str]]:
try:
vm_id = options['vm-id']
except KeyError:
@@ -230,9 +235,9 @@ class ComputingHandler:
'vm-id': vm_id,
}
- return response
+ yield response
- def stop_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ def stop_vm(self, options: Dict[str, Any]) -> Iterator[Dict[str, str]]:
try:
vm_id = options['vm-id']
except KeyError:
@@ -267,9 +272,9 @@ class ComputingHandler:
'vm-id': vm_id,
}
- return response
+ yield response
- def delete_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ def delete_vm(self, options: Dict[str, Any]) -> Iterator[Dict[str, str]]:
# TODO: Once all VMs of a VPN on a host have been deleted, we can delete
# the associated VPN
@@ -288,7 +293,7 @@ class ComputingHandler:
self.service_manager.uninstall_service(unit_name)
user_image.unlink()
- return {
+ yield {
'status': 'deleted',
'vm-id': vm_id,
}
@@ -415,8 +420,25 @@ def systemd_unit_name_for_vm(vm_id: str) -> str:
return f'aetherscale-vm-{vm_id}.service'
+def noop_responder(_: Dict[str, Any]):
+ pass
+
+
+def create_rabbitmq_responder(ch, reply_to: str, correlation_id: str):
+ def rabbitmq_responder(message: Dict[str, Any]):
+ ch.basic_publish(
+ exchange='',
+ routing_key=reply_to,
+ properties=pika.BasicProperties(
+ correlation_id=correlation_id
+ ),
+ body=json.dumps(message))
+
+ return rabbitmq_responder
+
+
def callback(ch, method, properties, body, handler: ComputingHandler):
- command_fn: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
+ command_fn: Dict[str, Callable[[Dict[str, Any]], Iterator[Any]]] = {
'list-vms': handler.list_vms,
'create-vm': handler.create_vm,
'start-vm': handler.start_vm,
@@ -441,17 +463,24 @@ def callback(ch, method, properties, body, handler: ComputingHandler):
logging.error(f'Invalid command "{command}" specified')
return
+ if properties.reply_to:
+ responder = create_rabbitmq_responder(
+ ch, properties.reply_to, properties.correlation_id)
+ else:
+ responder = noop_responder
+
options = data.get('options', {})
try:
- response = fn(options)
- # if a function wants to return a response
- # set its execution status to success
- resp_message = {
- 'execution-info': {
- 'status': 'success'
- },
- 'response': response,
- }
+ for response in fn(options):
+ # if a function wants to return a response
+ # set its execution status to success
+ resp_message = {
+ 'execution-info': {
+ 'status': 'success'
+ },
+ 'response': response,
+ }
+ responder(resp_message)
except Exception as e:
logging.exception('Unhandled exception')
resp_message = {
@@ -461,18 +490,10 @@ def callback(ch, method, properties, body, handler: ComputingHandler):
'reason': str(e),
}
}
+ responder(resp_message)
ch.basic_ack(delivery_tag=method.delivery_tag)
- if properties.reply_to:
- ch.basic_publish(
- exchange='',
- routing_key=properties.reply_to,
- properties=pika.BasicProperties(
- correlation_id=properties.correlation_id
- ),
- body=json.dumps(resp_message))
-
def run():
connection = pika.BlockingConnection(