commit 154c08bb218d616b08b38d2520d2008e86ccffe9
parent 720ddbbce85e1c2cccc202ec5c8f6bc4094c9937
Author: Stefan Koch <programming@stefan-koch.name>
Date: Mon, 14 Dec 2020 22:26:12 +0100
write unit tests for computing
Diffstat:
11 files changed, 519 insertions(+), 267 deletions(-)
diff --git a/aetherscale/computing.py b/aetherscale/computing.py
@@ -15,17 +15,15 @@ import time
from typing import List, Optional, Dict, Any, Callable
from . import interfaces
-from . import execution
from .qemu import image, runtime
-from .config import LOG_LEVEL, RABBITMQ_HOST
+from .qemu.exceptions import QemuException
+from . import config
+from . import services
VDE_FOLDER = '/tmp/vde.ctl'
VDE_TAP_INTERFACE = 'tap-vde'
-BASE_IMAGE_FOLDER = Path('base_images')
-USER_IMAGE_FOLDER = Path('user_images')
-
EXCHANGE_NAME = 'computing'
COMPETING_QUEUE = 'computing-competing'
QUEUE_COMMANDS_MAP = {
@@ -33,11 +31,11 @@ QUEUE_COMMANDS_MAP = {
COMPETING_QUEUE: ['create-vm'],
}
-logging.basicConfig(level=LOG_LEVEL)
+logging.basicConfig(level=config.LOG_LEVEL)
def user_image_path(vm_id: str) -> Path:
- return USER_IMAGE_FOLDER / f'{vm_id}.qcow2'
+ return config.USER_IMAGE_FOLDER / f'{vm_id}.qcow2'
def qemu_socket_monitor(vm_id: str) -> Path:
@@ -49,7 +47,8 @@ def qemu_socket_guest_agent(vm_id: str) -> Path:
def create_user_image(vm_id: str, image_name: str) -> Path:
- base_image = BASE_IMAGE_FOLDER / f'{image_name}.qcow2'
+ base_image = config.BASE_IMAGE_FOLDER / f'{image_name}.qcow2'
+ print(base_image)
if not base_image.is_file():
raise IOError(f'Image "{image_name}" does not exist')
@@ -59,162 +58,211 @@ def create_user_image(vm_id: str, image_name: str) -> Path:
'qemu-img', 'create', '-f', 'qcow2',
'-b', str(base_image.absolute()), '-F', 'qcow2', str(user_image)])
if create_img_result.returncode != 0:
- raise runtime.QemuException(f'Could not create image for VM "{vm_id}"')
+ raise QemuException(f'Could not create image for VM "{vm_id}"')
return user_image
-def list_vms(_: Dict[str, Any]) -> List[Dict[str, Any]]:
- vms = []
-
- for proc in psutil.process_iter(['pid', 'name']):
- if proc.name().startswith('vm-'):
- vm_id = proc.name()[3:]
-
- socket_file = qemu_socket_guest_agent(vm_id)
- hint = None
- ip_addresses = []
- try:
- fetcher = runtime.GuestAgentIpAddress(socket_file)
- ip_addresses = fetcher.fetch_ip_addresses()
- except runtime.QemuException:
- hint = 'Could not retrieve IP address for guest'
-
- msg = {
- 'vm-id': vm_id,
- 'ip-addresses': ip_addresses,
- }
- if hint:
- msg['hint'] = hint
-
- vms.append(msg)
-
- return vms
-
-
-def create_vm(options: Dict[str, Any]) -> Dict[str, str]:
- vm_id = ''.join(
- random.choice(string.ascii_lowercase) for _ in range(8))
- logging.info(f'Starting VM "{vm_id}"')
-
- try:
- image_name = os.path.basename(options['image'])
- except KeyError:
- raise ValueError('Image not specified')
-
- try:
- user_image = create_user_image(vm_id, image_name)
- except (OSError, runtime.QemuException):
- raise
-
- if 'init-script' in options:
- with image.guestmount(user_image) as guest_fs:
- image.install_startup_script(options['init-script'], guest_fs)
-
- mac_addr = interfaces.create_mac_address()
- logging.debug(f'Assigning MAC address "{mac_addr}" to VM "{vm_id}"')
-
- qemu_config = QemuStartupConfig(
- vm_id=vm_id,
- hda_image=user_image,
- mac_addr=mac_addr,
- vde_folder=Path(VDE_FOLDER))
- unit_name = systemd_unit_name_for_vm(vm_id)
- create_qemu_systemd_unit(unit_name, qemu_config)
- execution.start_systemd_unit(unit_name)
-
- logging.info(f'Started VM "{vm_id}"')
- return {
- 'status': 'starting',
- 'vm-id': vm_id,
- }
-
-
-def start_vm(options: Dict[str, Any]) -> Dict[str, str]:
- try:
- vm_id = options['vm-id']
- except KeyError:
- raise ValueError('VM ID not specified')
+@dataclass
+class QemuStartupConfig:
+ vm_id: str
+ hda_image: Path
+ mac_addr: str
+ vde_folder: Path
- unit_name = systemd_unit_name_for_vm(vm_id)
- if not execution.systemd_unit_exists(unit_name):
- raise RuntimeError('VM does not exist')
- elif execution.systemctl_is_running(unit_name):
- response = {
+class ComputingHandler:
+ def __init__(self, service_manager: services.ServiceManager):
+ self.service_manager = service_manager
+
+ def list_vms(self, _: Dict[str, Any]) -> List[Dict[str, Any]]:
+ vms = []
+
+ for proc in psutil.process_iter(['pid', 'name']):
+ if proc.name().startswith('vm-'):
+ vm_id = proc.name()[3:]
+
+ socket_file = qemu_socket_guest_agent(vm_id)
+ hint = None
+ ip_addresses = []
+ try:
+ fetcher = runtime.GuestAgentIpAddress(socket_file)
+ ip_addresses = fetcher.fetch_ip_addresses()
+ except QemuException:
+ hint = 'Could not retrieve IP address for guest'
+
+ msg = {
+ 'vm-id': vm_id,
+ 'ip-addresses': ip_addresses,
+ }
+ if hint:
+ msg['hint'] = hint
+
+ vms.append(msg)
+
+ return vms
+
+ def create_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ vm_id = ''.join(
+ random.choice(string.ascii_lowercase) for _ in range(8))
+ logging.info(f'Starting VM "{vm_id}"')
+
+ try:
+ image_name = os.path.basename(options['image'])
+ except KeyError:
+ raise ValueError('Image not specified')
+
+ try:
+ user_image = create_user_image(vm_id, image_name)
+ except (OSError, QemuException):
+ raise
+
+ if 'init-script' in options:
+ with image.guestmount(user_image) as guest_fs:
+ image.install_startup_script(options['init-script'], guest_fs)
+
+ mac_addr = interfaces.create_mac_address()
+ logging.debug(f'Assigning MAC address "{mac_addr}" to VM "{vm_id}"')
+
+ qemu_config = QemuStartupConfig(
+ vm_id=vm_id,
+ hda_image=user_image,
+ mac_addr=mac_addr,
+ vde_folder=Path(VDE_FOLDER))
+ unit_name = systemd_unit_name_for_vm(vm_id)
+ self._create_qemu_systemd_unit(unit_name, qemu_config)
+ self.service_manager.start_service(unit_name)
+
+ logging.info(f'Started VM "{vm_id}"')
+ return {
'status': 'starting',
'vm-id': vm_id,
- 'hint': f'VM "{vm_id}" was already started',
}
- else:
- execution.start_systemd_unit(unit_name)
- execution.enable_systemd_unit(unit_name)
- response = {
- 'status': 'starting',
- 'vm-id': vm_id,
- }
+ def start_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ try:
+ vm_id = options['vm-id']
+ except KeyError:
+ raise ValueError('VM ID not specified')
- return response
+ unit_name = systemd_unit_name_for_vm(vm_id)
+ if not self.service_manager.service_exists(unit_name):
+ raise RuntimeError('VM does not exist')
+ elif self.service_manager.service_is_running(unit_name):
+ response = {
+ 'status': 'starting',
+ 'vm-id': vm_id,
+ 'hint': f'VM "{vm_id}" was already started',
+ }
+ else:
+ self.service_manager.start_service(unit_name)
+ self.service_manager.enable_service(unit_name)
-def stop_vm(options: Dict[str, Any]) -> Dict[str, str]:
- try:
- vm_id = options['vm-id']
- except KeyError:
- raise ValueError('VM ID not specified')
+ response = {
+ 'status': 'starting',
+ 'vm-id': vm_id,
+ }
- kill_flag = bool(options.get('kill', False))
- stop_status = 'killed' if kill_flag else 'stopped'
+ return response
- unit_name = systemd_unit_name_for_vm(vm_id)
+ def stop_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ try:
+ vm_id = options['vm-id']
+ except KeyError:
+ raise ValueError('VM ID not specified')
- if not execution.systemd_unit_exists(unit_name):
- raise RuntimeError('VM does not exist')
- elif not execution.systemctl_is_running(unit_name):
- response = {
- 'status': stop_status,
- 'vm-id': vm_id,
- 'hint': f'VM "{vm_id}" was not running',
- }
- else:
- execution.disable_systemd_unit(unit_name)
+ kill_flag = bool(options.get('kill', False))
+ stop_status = 'killed' if kill_flag else 'stopped'
- if kill_flag:
- execution.stop_systemd_unit(unit_name)
- else:
- qemu_socket = qemu_socket_monitor(vm_id)
- qm = runtime.QemuMonitor(qemu_socket, protocol=qemu.QemuProtocol.QMP)
- qm.execute('system_powerdown')
+ unit_name = systemd_unit_name_for_vm(vm_id)
- response = {
- 'status': stop_status,
- 'vm-id': vm_id,
- }
+ if not self.service_manager.service_exists(unit_name):
+ raise RuntimeError('VM does not exist')
+ elif not self.service_manager.service_is_running(unit_name):
+ response = {
+ 'status': stop_status,
+ 'vm-id': vm_id,
+ 'hint': f'VM "{vm_id}" was not running',
+ }
+ else:
+ self.service_manager.disable_service(unit_name)
+
+ if kill_flag:
+ self.service_manager.stop_service(unit_name)
+ else:
+ qemu_socket = qemu_socket_monitor(vm_id)
+ qm = runtime.QemuMonitor(
+ qemu_socket, protocol=runtime.QemuProtocol.QMP)
+ qm.execute('system_powerdown')
+
+ response = {
+ 'status': stop_status,
+ 'vm-id': vm_id,
+ }
- return response
+ return response
+ def delete_vm(self, options: Dict[str, Any]) -> Dict[str, str]:
+ try:
+ vm_id = options['vm-id']
+ except KeyError:
+ raise ValueError('VM ID not specified')
-def delete_vm(options: Dict[str, Any]) -> Dict[str, str]:
- try:
- vm_id = options['vm-id']
- except KeyError:
- raise ValueError('VM ID not specified')
+ # force kill stop when a VM is deleted
+ options['kill'] = True
+ self.stop_vm(options)
- # force kill stop when a VM is deleted
- options['kill'] = True
- stop_vm(options)
+ unit_name = systemd_unit_name_for_vm(vm_id)
+ user_image = user_image_path(vm_id)
- unit_name = systemd_unit_name_for_vm(vm_id)
- user_image = user_image_path(vm_id)
+ self.service_manager.uninstall_service(unit_name)
+ user_image.unlink()
- execution.delete_systemd_unit(unit_name)
- user_image.unlink()
+ return {
+ 'status': 'deleted',
+ 'vm-id': vm_id,
+ }
- return {
- 'status': 'deleted',
- 'vm-id': vm_id,
- }
+ def _create_qemu_systemd_unit(
+ self, unit_name: str, qemu_config: QemuStartupConfig):
+ hda_quoted = shlex.quote(str(qemu_config.hda_image.absolute()))
+ device_quoted = shlex.quote(
+ f'virtio-net-pci,netdev=pubnet,mac={qemu_config.mac_addr}')
+ netdev_quoted = shlex.quote(
+ f'vde,id=pubnet,sock={str(qemu_config.vde_folder)}')
+ name_quoted = shlex.quote(
+ f'qemu-vm-{qemu_config.vm_id},process=vm-{qemu_config.vm_id}')
+
+ qemu_monitor_path = qemu_socket_monitor(qemu_config.vm_id)
+ socket_quoted = shlex.quote(f'unix:{qemu_monitor_path},server,nowait')
+
+ qga_monitor_path = qemu_socket_guest_agent(qemu_config.vm_id)
+ qga_chardev_quoted = shlex.quote(
+ f'socket,path={qga_monitor_path},server,nowait,id=qga0')
+
+ command = \
+ f'qemu-system-x86_64 -m 4096 -accel kvm -hda {hda_quoted} ' \
+ f'-device {device_quoted} -netdev {netdev_quoted} ' \
+ f'-name {name_quoted} ' \
+ '-nographic ' \
+ f'-qmp {socket_quoted} ' \
+ f'-chardev {qga_chardev_quoted} ' \
+ '-device virtio-serial ' \
+ '-device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0'
+
+ with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
+ f.write('[Unit]\n')
+ f.write(f'Description=aetherscale VM {qemu_config.vm_id}\n')
+ f.write('\n')
+ f.write('[Service]\n')
+ f.write(f'ExecStart={command}\n')
+ f.write('\n')
+ f.write('[Install]\n')
+ f.write('WantedBy=default.target\n')
+
+ self.service_manager.install_service(Path(f.name), unit_name)
+ os.remove(f.name)
def get_process_for_vm(vm_id: str) -> Optional[psutil.Process]:
@@ -229,61 +277,13 @@ def systemd_unit_name_for_vm(vm_id: str) -> str:
return f'aetherscale-vm-{vm_id}.service'
-@dataclass
-class QemuStartupConfig:
- vm_id: str
- hda_image: Path
- mac_addr: str
- vde_folder: Path
-
-
-def create_qemu_systemd_unit(
- unit_name: str, qemu_config: QemuStartupConfig):
- hda_quoted = shlex.quote(str(qemu_config.hda_image.absolute()))
- device_quoted = shlex.quote(
- f'virtio-net-pci,netdev=pubnet,mac={qemu_config.mac_addr}')
- netdev_quoted = shlex.quote(
- f'vde,id=pubnet,sock={str(qemu_config.vde_folder)}')
- name_quoted = shlex.quote(
- f'qemu-vm-{qemu_config.vm_id},process=vm-{qemu_config.vm_id}')
-
- qemu_monitor_path = qemu_socket_monitor(qemu_config.vm_id)
- socket_quoted = shlex.quote(f'unix:{qemu_monitor_path},server,nowait')
-
- qga_monitor_path = qemu_socket_guest_agent(qemu_config.vm_id)
- qga_chardev_quoted = shlex.quote(
- f'socket,path={qga_monitor_path},server,nowait,id=qga0')
-
- command = f'qemu-system-x86_64 -m 4096 -accel kvm -hda {hda_quoted} ' \
- f'-device {device_quoted} -netdev {netdev_quoted} ' \
- f'-name {name_quoted} ' \
- '-nographic ' \
- f'-qmp {socket_quoted} ' \
- f'-chardev {qga_chardev_quoted} ' \
- '-device virtio-serial ' \
- '-device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0'
-
- with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
- f.write('[Unit]\n')
- f.write(f'Description=aetherscale VM {qemu_config.vm_id}\n')
- f.write('\n')
- f.write('[Service]\n')
- f.write(f'ExecStart={command}\n')
- f.write('\n')
- f.write('[Install]\n')
- f.write('WantedBy=default.target\n')
-
- execution.copy_systemd_unit(Path(f.name), unit_name)
- os.remove(f.name)
-
-
-def callback(ch, method, properties, body):
+def callback(ch, method, properties, body, handler: ComputingHandler):
command_fn: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
- 'list-vms': list_vms,
- 'create-vm': create_vm,
- 'start-vm': start_vm,
- 'stop-vm': stop_vm,
- 'delete-vm': delete_vm,
+ 'list-vms': handler.list_vms,
+ 'create-vm': handler.create_vm,
+ 'start-vm': handler.start_vm,
+ 'stop-vm': handler.stop_vm,
+ 'delete-vm': handler.delete_vm,
}
message = body.decode('utf-8')
@@ -337,7 +337,7 @@ def callback(ch, method, properties, body):
def run():
connection = pika.BlockingConnection(
- pika.ConnectionParameters(host=RABBITMQ_HOST))
+ pika.ConnectionParameters(host=config.RABBITMQ_HOST))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct')
@@ -356,10 +356,16 @@ def run():
channel.queue_bind(
exchange=EXCHANGE_NAME, queue=queue, routing_key=command)
+ systemd_path = Path.home() / '.config/systemd/user'
+ service_manager = services.SystemdServiceManager(systemd_path)
+ handler = ComputingHandler(service_manager)
+
+ bound_callback = lambda ch, method, properties, body: \
+ callback(ch, method, properties, body, handler)
channel.basic_consume(
- queue=exclusive_queue_name, on_message_callback=callback)
+ queue=exclusive_queue_name, on_message_callback=bound_callback)
channel.basic_consume(
- queue=COMPETING_QUEUE, on_message_callback=callback)
+ queue=COMPETING_QUEUE, on_message_callback=bound_callback)
# a TAP interface for VDE must already have been created
if not interfaces.check_device_existence(VDE_TAP_INTERFACE):
@@ -369,13 +375,13 @@ def run():
sys.exit(1)
logging.info('Bringing up VDE networking')
- execution.copy_systemd_unit(
+ service_manager.install_service(
Path('data/systemd/aetherscale-vde.service'),
'aetherscale-vde.service')
- execution.start_systemd_unit('aetherscale-vde.service')
+ service_manager.start_service('aetherscale-vde.service')
# Give systemd a bit time to start VDE
time.sleep(0.5)
- if not execution.systemctl_is_running('aetherscale-vde.service'):
+ if not service_manager.service_is_running('aetherscale-vde.service'):
logging.error('Failed to start VDE networking.')
sys.exit(1)
diff --git a/aetherscale/config.py b/aetherscale/config.py
@@ -1,6 +1,11 @@
import logging
import os
+from pathlib import Path
+
LOG_LEVEL = os.getenv('LOG_LEVEL', default=logging.WARNING)
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', default='localhost')
+
+BASE_IMAGE_FOLDER = Path(os.getenv('BASE_IMAGE_FOLDER', default='base_images'))
+USER_IMAGE_FOLDER = Path(os.getenv('USER_IMAGE_FOLDER', default='user_images'))
diff --git a/aetherscale/execution.py b/aetherscale/execution.py
@@ -16,57 +16,3 @@ def run_command_chain(commands: List[List[str]]) -> bool:
return True
-def systemd_unit_path(unit_name: str) -> Path:
- systemd_unit_dir = Path().home() / '.config/systemd/user'
- return systemd_unit_dir / unit_name
-
-
-def copy_systemd_unit(unit_file: Path, unit_name: str):
- if '.' not in unit_name:
- raise ValueError('Unit name must contain the suffix, e.g. .service')
-
- target_unit_path = systemd_unit_path(unit_name)
- target_unit_path.parent.mkdir(parents=True, exist_ok=True)
-
- shutil.copyfile(unit_file, target_unit_path)
-
- # Reload system
- subprocess.run(['systemctl', '--user', 'daemon-reload'])
-
-
-def delete_systemd_unit(unit_name: str):
- systemd_unit_path(unit_name).unlink(missing_ok=True)
-
-
-def start_systemd_unit(unit_name: str) -> bool:
- return run_command_chain([
- ['systemctl', '--user', 'start', unit_name],
- ])
-
-
-def stop_systemd_unit(unit_name: str) -> bool:
- return run_command_chain([
- ['systemctl', '--user', 'stop', unit_name],
- ])
-
-
-def enable_systemd_unit(unit_name: str) -> bool:
- return run_command_chain([
- ['systemctl', '--user', 'enable', unit_name],
- ])
-
-
-def disable_systemd_unit(unit_name: str) -> bool:
- return run_command_chain([
- ['systemctl', '--user', 'disable', unit_name],
- ])
-
-
-def systemctl_is_running(unit_name: str) -> bool:
- result = subprocess.run([
- 'systemctl', '--user', 'is-active', '--quiet', unit_name])
- return result.returncode == 0
-
-
-def systemd_unit_exists(unit_name: str) -> bool:
- return systemd_unit_path(unit_name).is_file()
diff --git a/aetherscale/qemu/exceptions.py b/aetherscale/qemu/exceptions.py
@@ -0,0 +1,2 @@
+class QemuException(Exception):
+ pass
diff --git a/aetherscale/qemu/image.py b/aetherscale/qemu/image.py
@@ -8,6 +8,7 @@ import tempfile
from typing import List, TextIO, Iterator
from aetherscale.execution import run_command_chain
+from aetherscale.qemu.exceptions import QemuException
import aetherscale.timing
@@ -20,8 +21,12 @@ def guestmount(image_path: Path) -> Iterator[Path]:
logging.debug(f'Mounting {image_path} at {mount_dir}')
try:
- run_command_chain([
+ success = run_command_chain([
['guestmount', '-a', str(image_path.absolute()), '-i', mount_dir]])
+
+ if not success:
+ raise QemuException(f'Could not mount image {image_path}')
+
yield Path(mount_dir)
finally:
logging.debug(f'Unmounting {mount_dir}')
diff --git a/aetherscale/qemu/runtime.py b/aetherscale/qemu/runtime.py
@@ -6,9 +6,7 @@ import random
import socket
from typing import Any, Dict, Optional
-
-class QemuException(Exception):
- pass
+from aetherscale.qemu.exceptions import QemuException
class QemuProtocol(enum.Enum):
diff --git a/aetherscale/services.py b/aetherscale/services.py
@@ -0,0 +1,102 @@
+from abc import ABC, abstractmethod
+from pathlib import Path
+import shutil
+import subprocess
+
+from aetherscale.execution import run_command_chain
+
+
+class ServiceManager(ABC):
+ @abstractmethod
+ def install_service(self, config_file: Path, service_name: str) -> bool:
+ """Installs a service on the system for possible activation"""
+
+ @abstractmethod
+ def uninstall_service(self, service_name: str) -> bool:
+ """Removes a service from the system once it's no longer needed"""
+
+ @abstractmethod
+ def start_service(self, service_name: str) -> bool:
+ """Start a service"""
+
+ @abstractmethod
+ def stop_service(self, service_name: str) -> bool:
+ """Stop a service"""
+
+ @abstractmethod
+ def enable_service(self, service_name: str) -> bool:
+ """Enable a service so that it will be auto-started on reboots"""
+
+ @abstractmethod
+ def disable_service(self, service_name: str) -> bool:
+ """Remove a service from autostart"""
+
+ @abstractmethod
+ def service_is_running(self, service_name: str) -> bool:
+ """Check whether a service is currently running"""
+
+ @abstractmethod
+ def service_exists(self, service_name: str) -> bool:
+ """Check whether a service is currently installed"""
+
+
+class SystemdServiceManager(ServiceManager):
+ def __init__(self, unit_folder: Path):
+ self.unit_folder = unit_folder
+
+ def install_service(self, config_file: Path, service_name: str) -> bool:
+ if '.' not in service_name:
+ raise ValueError('Unit name must contain the suffix, e.g. .service')
+
+ target_unit_path = self._systemd_unit_path(service_name)
+ target_unit_path.parent.mkdir(parents=True, exist_ok=True)
+
+ try:
+ shutil.copyfile(config_file, target_unit_path)
+ except OSError:
+ return False
+
+ # Reload system
+ r = subprocess.run(['systemctl', '--user', 'daemon-reload'])
+ return r.returncode == 0
+
+ def uninstall_service(self, service_name: str) -> bool:
+ if '.' not in service_name:
+ raise ValueError('Unit name must contain the suffix, e.g. .service')
+
+ try:
+ self._systemd_unit_path(service_name).unlink(missing_ok=True)
+ return True
+ except OSError:
+ return False
+
+ def start_service(self, service_name: str) -> bool:
+ return run_command_chain([
+ ['systemctl', '--user', 'start', service_name],
+ ])
+
+ def stop_service(self, service_name: str) -> bool:
+ return run_command_chain([
+ ['systemctl', '--user', 'stop', service_name],
+ ])
+
+ def enable_service(self, service_name: str) -> bool:
+ return run_command_chain([
+ ['systemctl', '--user', 'enable', service_name],
+ ])
+
+ def disable_service(self, service_name: str) -> bool:
+ return run_command_chain([
+ ['systemctl', '--user', 'disable', service_name],
+ ])
+
+ def service_is_running(self, service_name: str) -> bool:
+ result = subprocess.run([
+ 'systemctl', '--user', 'is-active', '--quiet', service_name])
+ return result.returncode == 0
+
+ def service_exists(self, service_name: str) -> bool:
+ return self._systemd_unit_path(service_name).is_file()
+
+ def _systemd_unit_path(self, service_name: str) -> Path:
+ return self.unit_folder / service_name
diff --git a/tests/conftest.py b/tests/conftest.py
@@ -1,9 +1,72 @@
from pathlib import Path
+
import pytest
+from aetherscale.services import ServiceManager
import aetherscale.timing
@pytest.fixture
+def tmppath(tmpdir):
+ yield Path(tmpdir)
+
+
+@pytest.fixture
def timeout():
return aetherscale.timing.timeout
+
+
+@pytest.fixture
+def mock_service_manager():
+ class MockServiceManager(ServiceManager):
+ def __init__(self):
+ self.services = set()
+ self.started_services = set()
+ self.enabled_services = set()
+
+ def install_service(self, config_file: Path, service_name: str) -> bool:
+ self.services.add(service_name)
+ return True
+
+ def uninstall_service(self, service_name: str) -> bool:
+ try:
+ self.services.remove(service_name)
+ except KeyError:
+ # should not fail if was already uninstalled
+ pass
+
+ return True
+
+ def start_service(self, service_name: str) -> bool:
+ self.started_services.add(service_name)
+ return True
+
+ def stop_service(self, service_name: str) -> bool:
+ try:
+ self.started_services.remove(service_name)
+ except KeyError:
+ # should not fail if was already stopped
+ pass
+
+ return True
+
+ def enable_service(self, service_name: str) -> bool:
+ self.enabled_services.add(service_name)
+ return True
+
+ def disable_service(self, service_name: str) -> bool:
+ try:
+ self.enabled_services.remove(service_name)
+ except KeyError:
+ # should not fail if was already disabled
+ pass
+
+ return True
+
+ def service_is_running(self, service_name: str) -> bool:
+ return service_name in self.started_services
+
+ def service_exists(self, service_name: str) -> bool:
+ return service_name in self.services
+
+ return MockServiceManager()
diff --git a/tests/test_computing.py b/tests/test_computing.py
@@ -0,0 +1,76 @@
+from contextlib import contextmanager
+import os
+from pathlib import Path
+import pytest
+import subprocess
+from typing import Iterator
+from unittest import mock
+import uuid
+
+from aetherscale import computing
+from aetherscale.services import ServiceManager
+
+
+@contextmanager
+def base_image(directory: Path) -> Iterator[Path]:
+ random_name = str(uuid.uuid4())
+ try:
+ img_file = directory / f'{random_name}.qcow2'
+ subprocess.run([
+ 'qemu-img', 'create', '-f', 'qcow2', str(img_file), '1G'])
+ yield img_file
+ finally:
+ os.unlink(img_file)
+
+
+def test_create_user_image(tmppath):
+ with mock.patch('aetherscale.config.BASE_IMAGE_FOLDER', tmppath), \
+ mock.patch('aetherscale.config.USER_IMAGE_FOLDER', tmppath):
+
+ with base_image(tmppath) as img:
+ user_image = computing.create_user_image('my-vm-id', img.stem)
+ user_image.is_file()
+
+
+def test_vm_lifecycle(tmppath, mock_service_manager: ServiceManager):
+ with mock.patch('aetherscale.config.BASE_IMAGE_FOLDER', tmppath), \
+ mock.patch('aetherscale.config.USER_IMAGE_FOLDER', tmppath):
+
+ handler = computing.ComputingHandler(mock_service_manager)
+ with base_image(tmppath) as img:
+ result = handler.create_vm({'image': img.stem})
+ vm_id = result['vm-id']
+ service_name = computing.systemd_unit_name_for_vm(vm_id)
+ assert result['status'] == 'starting'
+ assert mock_service_manager.service_is_running(service_name)
+
+ # TODO: Test graceful stop, needs mock of QemuMonitor
+ result = handler.stop_vm({'vm-id': vm_id, 'kill': True})
+ assert result['status'] == 'killed'
+ assert mock_service_manager.service_exists(service_name)
+ assert not mock_service_manager.service_is_running(service_name)
+
+ result = handler.start_vm({'vm-id': vm_id})
+ assert result['status'] == 'starting'
+ assert mock_service_manager.service_exists(service_name)
+ assert mock_service_manager.service_is_running(service_name)
+
+ result = handler.delete_vm({'vm-id': vm_id})
+ assert result['status'] == 'deleted'
+ assert not mock_service_manager.service_exists(service_name)
+ assert not mock_service_manager.service_is_running(service_name)
+
+
+def test_run_missing_base_image(tmppath, mock_service_manager: ServiceManager):
+ with mock.patch('aetherscale.config.BASE_IMAGE_FOLDER', tmppath), \
+ mock.patch('aetherscale.config.USER_IMAGE_FOLDER', tmppath):
+
+ handler = computing.ComputingHandler(mock_service_manager)
+
+ # specify invalid base image
+ with pytest.raises(OSError):
+ handler.create_vm({'image': 'some-missing-image'})
+
+ # do not specify a base image
+ with pytest.raises(ValueError):
+ handler.create_vm({})
diff --git a/tests/test_qemu_image.py b/tests/test_qemu_image.py
@@ -1,18 +1,26 @@
import os
-from pathlib import Path
+import pytest
-from aetherscale.qemu.image import install_startup_script, STARTUP_FILENAME
+from aetherscale.qemu import image
+from aetherscale.qemu.exceptions import QemuException
-def test_copies_startup_script_to_vm_dir(tmpdir):
- tmpdir = Path(tmpdir)
-
+def test_copies_startup_script_to_vm_dir(tmppath):
# Create directories that normally exist in mounted OS
- (tmpdir / 'etc/systemd/system').mkdir(parents=True, exist_ok=True)
- (tmpdir / 'root').mkdir(parents=True, exist_ok=True)
+ (tmppath / 'etc/systemd/system').mkdir(parents=True, exist_ok=True)
+ (tmppath / 'root').mkdir(parents=True, exist_ok=True)
- install_startup_script('echo something', tmpdir)
+ image.install_startup_script('echo something', tmppath)
- assert os.path.isfile(tmpdir / f'root/{STARTUP_FILENAME}.sh')
+ assert os.path.isfile(tmppath / f'root/{image.STARTUP_FILENAME}.sh')
assert os.path.isfile(
- tmpdir / f'etc/systemd/system/{STARTUP_FILENAME}.service')
+ tmppath / f'etc/systemd/system/{image.STARTUP_FILENAME}.service')
+
+
+def test_mount_invalid_image(tmppath):
+ imagepath = tmppath / 'image.qcow2'
+ imagepath.touch()
+
+ with pytest.raises(QemuException):
+ with image.guestmount(imagepath):
+ pass
diff --git a/tests/test_services.py b/tests/test_services.py
@@ -0,0 +1,41 @@
+from pathlib import Path
+import tempfile
+from unittest import mock
+
+from aetherscale.services import SystemdServiceManager
+
+
+def test_systemd_creates_file(tmppath: Path):
+ systemd = SystemdServiceManager(tmppath)
+ with tempfile.NamedTemporaryFile('wt') as f:
+ f.write('[Unit]')
+ f.flush()
+
+ systemd.install_service(Path(f.name), 'test.service')
+ assert systemd.service_exists('test.service')
+ assert (tmppath / 'test.service').is_file()
+
+ systemd.uninstall_service('test.service')
+ assert not (tmppath / 'test.service').is_file()
+ assert not systemd.service_exists('test.service')
+
+
+@mock.patch('subprocess.run')
+def test_systemd_calls_system_binary(subprocess_run, tmppath):
+ systemd = SystemdServiceManager(tmppath)
+
+ keyword_pairs = [
+ (systemd.enable_service, 'enable'),
+ (systemd.disable_service, 'disable'),
+ (systemd.start_service, 'start'),
+ (systemd.stop_service, 'stop'),
+ (systemd.service_is_running, 'is-active'),
+ ]
+
+ # we don't want to check the exact call as this might be valid in some
+ # different forms; but we want to make sure that at least the right
+ # keywords are inside the command
+ for function, keyword in keyword_pairs:
+ function('test.service')
+ assert 'systemctl' in subprocess_run.call_args[0][0]
+ assert keyword in subprocess_run.call_args[0][0]