commit 9631daab5629640ac9eb5aebb1a6b33cf462675c
parent 6ac8212a30496653447ff3829bd3c927ecd48b96
Author: Stefan Koch <programming@stefan-koch.name>
Date: Sat, 6 Feb 2021 17:23:26 +0100
split computing handling and RabbitMQ interface
Diffstat:
3 files changed, 151 insertions(+), 138 deletions(-)
diff --git a/aetherscale/api/broker.py b/aetherscale/api/broker.py
@@ -0,0 +1,141 @@
+import logging
+import json
+from pathlib import Path
+import pika
+from typing import Any, Callable, Dict, Iterator
+
+from aetherscale import config
+from aetherscale import services
+from aetherscale.computing import ComputingHandler, RADVD_SERVICE_NAME
+import aetherscale.vpn.radvd
+
+EXCHANGE_NAME = 'computing'
+COMPETING_QUEUE = 'computing-competing'
+QUEUE_COMMANDS_MAP = {
+ '': ['list-vms', 'start-vm', 'stop-vm', 'delete-vm'],
+ COMPETING_QUEUE: ['create-vm'],
+}
+
+
+def noop_responder(_: Dict[str, Any]):
+ pass
+
+
+def create_rabbitmq_responder(ch, reply_to: str, correlation_id: str):
+ def rabbitmq_responder(message: Dict[str, Any]):
+ ch.basic_publish(
+ exchange='',
+ routing_key=reply_to,
+ properties=pika.BasicProperties(
+ correlation_id=correlation_id
+ ),
+ body=json.dumps(message))
+
+ return rabbitmq_responder
+
+
+def callback(ch, method, properties, body, handler: ComputingHandler):
+ command_fn: Dict[str, Callable[[Dict[str, Any]], Iterator[Any]]] = {
+ 'list-vms': handler.list_vms,
+ 'create-vm': handler.create_vm,
+ 'start-vm': handler.start_vm,
+ 'stop-vm': handler.stop_vm,
+ 'delete-vm': handler.delete_vm,
+ }
+
+ message = body.decode('utf-8')
+ logging.debug('Received message: ' + message)
+
+ data = json.loads(message)
+
+ try:
+ command = data['command']
+ except KeyError:
+ logging.error('No "command" specified in message')
+ return
+
+ try:
+ fn = command_fn[command]
+ except KeyError:
+ logging.error(f'Invalid command "{command}" specified')
+ return
+
+ if properties.reply_to:
+ responder = create_rabbitmq_responder(
+ ch, properties.reply_to, properties.correlation_id)
+ else:
+ responder = noop_responder
+
+ options = data.get('options', {})
+ try:
+ for response in fn(options):
+ # if a function wants to return a response
+ # set its execution status to success
+ resp_message = {
+ 'execution-info': {
+ 'status': 'success'
+ },
+ 'response': response,
+ }
+ responder(resp_message)
+ except Exception as e:
+ logging.exception('Unhandled exception')
+ resp_message = {
+ 'execution-info': {
+ 'status': 'error',
+ # TODO: Only ouput message if it is an exception generated
+ # by us
+ 'reason': str(e),
+ }
+ }
+ responder(resp_message)
+
+ ch.basic_ack(delivery_tag=method.delivery_tag)
+
+
+def run():
+ connection = pika.BlockingConnection(
+ pika.ConnectionParameters(host=config.RABBITMQ_HOST))
+ channel = connection.channel()
+
+ channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct')
+
+ # let rabbitmq define a name for the exclusive queue
+ result = channel.queue_declare(queue='', exclusive=True)
+ exclusive_queue_name = result.method.queue
+ # setup one queue that is shared by all consumers
+ channel.queue_declare(queue=COMPETING_QUEUE)
+
+ for queue, commands in QUEUE_COMMANDS_MAP.items():
+ if queue == '':
+ queue = exclusive_queue_name
+
+ for command in commands:
+ channel.queue_bind(
+ exchange=EXCHANGE_NAME, queue=queue, routing_key=command)
+
+ systemd_path = Path.home() / '.config/systemd/user'
+ service_manager = services.SystemdServiceManager(systemd_path)
+
+ # TODO: Setup or radvd does not belong here, we will remove it
+ # Guest VPNs have to handle IPv6 management on their own
+ radvd = aetherscale.vpn.radvd.Radvd(
+ config.AETHERSCALE_CONFIG_DIR / 'radvd.conf', config.VPN_48_PREFIX)
+ service_manager.install_simple_service(
+ radvd.get_start_command(), service_name=RADVD_SERVICE_NAME,
+ description='IPv6 Router Advertisment for VPNs')
+ service_manager.start_service(RADVD_SERVICE_NAME)
+
+ handler = ComputingHandler(radvd, service_manager)
+
+ bound_callback = lambda ch, method, properties, body: \
+ callback(ch, method, properties, body, handler)
+ channel.basic_consume(
+ queue=exclusive_queue_name, on_message_callback=bound_callback)
+ channel.basic_consume(
+ queue=COMPETING_QUEUE, on_message_callback=bound_callback)
+
+ try:
+ channel.start_consuming()
+ except KeyboardInterrupt:
+ print('Keyboard interrupt, stopping service')
diff --git a/aetherscale/computing.py b/aetherscale/computing.py
@@ -1,8 +1,6 @@
import logging
-import json
import os
from pathlib import Path
-import pika
import psutil
import random
import re
@@ -10,9 +8,8 @@ import shlex
import shutil
import string
import subprocess
-import sys
import tempfile
-from typing import List, Optional, Dict, Any, Callable, Tuple, Iterator
+from typing import List, Optional, Dict, Any, Tuple, Iterator
from aetherscale.paths import \
user_image_path, qemu_socket_monitor, qemu_socket_guest_agent, \
@@ -26,13 +23,6 @@ from .vpn.tinc import TincVirtualNetwork
import aetherscale.vpn.radvd
-EXCHANGE_NAME = 'computing'
-COMPETING_QUEUE = 'computing-competing'
-QUEUE_COMMANDS_MAP = {
- '': ['list-vms', 'start-vm', 'stop-vm', 'delete-vm'],
- COMPETING_QUEUE: ['create-vm'],
-}
-
RADVD_SERVICE_NAME = 'aetherscale-radvd.service'
logging.basicConfig(level=config.LOG_LEVEL)
@@ -484,128 +474,3 @@ def vm_id_from_systemd_unit(systemd_unit: str) -> str:
else:
raise ValueError(
f'{systemd_unit} is not a valid systemd unit file for a VM')
-
-
-def noop_responder(_: Dict[str, Any]):
- pass
-
-
-def create_rabbitmq_responder(ch, reply_to: str, correlation_id: str):
- def rabbitmq_responder(message: Dict[str, Any]):
- ch.basic_publish(
- exchange='',
- routing_key=reply_to,
- properties=pika.BasicProperties(
- correlation_id=correlation_id
- ),
- body=json.dumps(message))
-
- return rabbitmq_responder
-
-
-def callback(ch, method, properties, body, handler: ComputingHandler):
- command_fn: Dict[str, Callable[[Dict[str, Any]], Iterator[Any]]] = {
- 'list-vms': handler.list_vms,
- 'create-vm': handler.create_vm,
- 'start-vm': handler.start_vm,
- 'stop-vm': handler.stop_vm,
- 'delete-vm': handler.delete_vm,
- }
-
- message = body.decode('utf-8')
- logging.debug('Received message: ' + message)
-
- data = json.loads(message)
-
- try:
- command = data['command']
- except KeyError:
- logging.error('No "command" specified in message')
- return
-
- try:
- fn = command_fn[command]
- except KeyError:
- logging.error(f'Invalid command "{command}" specified')
- return
-
- if properties.reply_to:
- responder = create_rabbitmq_responder(
- ch, properties.reply_to, properties.correlation_id)
- else:
- responder = noop_responder
-
- options = data.get('options', {})
- try:
- for response in fn(options):
- # if a function wants to return a response
- # set its execution status to success
- resp_message = {
- 'execution-info': {
- 'status': 'success'
- },
- 'response': response,
- }
- responder(resp_message)
- except Exception as e:
- logging.exception('Unhandled exception')
- resp_message = {
- 'execution-info': {
- 'status': 'error',
- # TODO: Only ouput message if it is an exception generated
- # by us
- 'reason': str(e),
- }
- }
- responder(resp_message)
-
- ch.basic_ack(delivery_tag=method.delivery_tag)
-
-
-def run():
- connection = pika.BlockingConnection(
- pika.ConnectionParameters(host=config.RABBITMQ_HOST))
- channel = connection.channel()
-
- channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct')
-
- # let rabbitmq define a name for the exclusive queue
- result = channel.queue_declare(queue='', exclusive=True)
- exclusive_queue_name = result.method.queue
- # setup one queue that is shared by all consumers
- channel.queue_declare(queue=COMPETING_QUEUE)
-
- for queue, commands in QUEUE_COMMANDS_MAP.items():
- if queue == '':
- queue = exclusive_queue_name
-
- for command in commands:
- channel.queue_bind(
- exchange=EXCHANGE_NAME, queue=queue, routing_key=command)
-
- systemd_path = Path.home() / '.config/systemd/user'
- service_manager = services.SystemdServiceManager(systemd_path)
- radvd = aetherscale.vpn.radvd.Radvd(
- config.AETHERSCALE_CONFIG_DIR / 'radvd.conf', config.VPN_48_PREFIX)
- service_manager.install_simple_service(
- radvd.get_start_command(), service_name=RADVD_SERVICE_NAME,
- description='IPv6 Router Advertisment for VPNs')
- service_manager.start_service(RADVD_SERVICE_NAME)
-
- handler = ComputingHandler(radvd, service_manager)
-
- bound_callback = lambda ch, method, properties, body: \
- callback(ch, method, properties, body, handler)
- channel.basic_consume(
- queue=exclusive_queue_name, on_message_callback=bound_callback)
- channel.basic_consume(
- queue=COMPETING_QUEUE, on_message_callback=bound_callback)
-
- if not networking.Iproute2Network.check_device_existence('br0'):
- print('aetherscale expects a device br0 to exist', file=sys.stderr)
- sys.exit(1)
-
- try:
- channel.start_consuming()
- except KeyboardInterrupt:
- print('Keyboard interrupt, stopping service')
diff --git a/aetherscale/server.py b/aetherscale/server.py
@@ -2,7 +2,8 @@ import sys
from aetherscale import __version__
from aetherscale import dependencies
-import aetherscale.computing
+from aetherscale import networking
+import aetherscale.api.broker
import aetherscale.api.rest
@@ -15,7 +16,13 @@ def main():
if len(missing_deps) > 0:
help_text = dependencies.build_dependency_help_text(missing_deps)
print(help_text, file=sys.stderr)
+ sys.exit(1)
+
+ if not networking.Iproute2Network.check_device_existence('br0'):
+ print('aetherscale expects a device br0 to exist', file=sys.stderr)
+ sys.exit(1)
+
elif len(sys.argv) >= 2 and sys.argv[1] == 'http':
aetherscale.api.rest.run()
else:
- aetherscale.computing.run()
+ aetherscale.api.broker.run()