aetherscale

[unmaintained] code for a cloud provider tutorial
Log | Files | Refs | README | LICENSE

commit cea5d876331b128dd8df0c8cbec6fbbda89db366
parent 090cdee42031307a55306510872077afeae09df1
Author: Stefan Koch <programming@stefan-koch.name>
Date:   Sat, 12 Dec 2020 12:50:08 +0100

implement IP retrieval from guest agent

Diffstat:
Maetherscale/computing.py | 43++++++++++++++++++++++++++++++++++---------
Maetherscale/qemu.py | 84++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Atests/conftest.py | 22++++++++++++++++++++++
Mtests/test_qemu.py | 47++++++++++++++++++++++++++++++++++++-----------
4 files changed, 171 insertions(+), 25 deletions(-)

diff --git a/aetherscale/computing.py b/aetherscale/computing.py @@ -36,10 +36,6 @@ QUEUE_COMMANDS_MAP = { logging.basicConfig(level=LOG_LEVEL) -class QemuException(Exception): - pass - - def user_image_path(vm_id: str) -> Path: return USER_IMAGE_FOLDER / f'{vm_id}.qcow2' @@ -48,6 +44,10 @@ def qemu_socket_monitor(vm_id: str) -> Path: return Path(f'/tmp/aetherscale-qmp-{vm_id}.sock') +def qemu_socket_guest_agent(vm_id: str) -> Path: + return Path(f'/tmp/aetherscale-qga-{vm_id}.sock') + + def create_user_image(vm_id: str, image_name: str) -> Path: base_image = BASE_IMAGE_FOLDER / f'{image_name}.qcow2' if not base_image.is_file(): @@ -59,17 +59,35 @@ def create_user_image(vm_id: str, image_name: str) -> Path: 'qemu-img', 'create', '-f', 'qcow2', '-b', str(base_image.absolute()), '-F', 'qcow2', str(user_image)]) if create_img_result.returncode != 0: - raise QemuException(f'Could not create image for VM "{vm_id}"') + raise qemu.QemuException(f'Could not create image for VM "{vm_id}"') return user_image -def list_vms(_: Dict[str, Any]) -> List[str]: +def list_vms(_: Dict[str, Any]) -> List[Dict[str, Any]]: vms = [] for proc in psutil.process_iter(['pid', 'name']): if proc.name().startswith('vm-'): - vms.append(proc.name()) + vm_id = proc.name()[3:] + + socket_file = qemu_socket_guest_agent(vm_id) + hint = None + ip_addresses = [] + try: + fetcher = qemu.GuestAgentIpAddress(socket_file) + ip_addresses = fetcher.fetch_ip_addresses() + except qemu.QemuException: + hint = 'Could not retrieve IP address for guest' + + msg = { + 'vm-id': vm_id, + 'ip-addresses': ip_addresses, + } + if hint: + msg['hint'] = hint + + vms.append(msg) return vms @@ -86,7 +104,7 @@ def create_vm(options: Dict[str, Any]) -> Dict[str, str]: try: user_image = create_user_image(vm_id, image_name) - except (OSError, QemuException): + except (OSError, qemu.QemuException): raise mac_addr = interfaces.create_mac_address() @@ -228,11 +246,18 @@ def create_qemu_systemd_unit( qemu_monitor_path = qemu_socket_monitor(qemu_config.vm_id) socket_quoted = shlex.quote(f'unix:{qemu_monitor_path},server,nowait') + qga_monitor_path = qemu_socket_guest_agent(qemu_config.vm_id) + qga_chardev_quoted = shlex.quote( + f'socket,path={qga_monitor_path},server,nowait,id=qga0') + command = f'qemu-system-x86_64 -m 4096 -accel kvm -hda {hda_quoted} ' \ f'-device {device_quoted} -netdev {netdev_quoted} ' \ f'-name {name_quoted} ' \ '-nographic ' \ - f'-qmp {socket_quoted}' + f'-qmp {socket_quoted} ' \ + f'-chardev {qga_chardev_quoted} ' \ + '-device virtio-serial ' \ + '-device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0' with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f: f.write('[Unit]\n') diff --git a/aetherscale/qemu.py b/aetherscale/qemu.py @@ -1,28 +1,102 @@ +import enum +import logging import json from pathlib import Path +import random import socket -from typing import Any +from typing import Any, Dict, Optional + + +class QemuException(Exception): + pass + + +class QemuProtocol(enum.Enum): + QMP = enum.auto() + QGA = enum.auto() class QemuMonitor: # TODO: Improve QMP communication, spec is here: # https://github.com/qemu/qemu/blob/master/docs/interop/qmp-spec.txt - def __init__(self, socket_file: Path): + def __init__( + self, socket_file: Path, protocol: QemuProtocol, + timeout: Optional[float] = None): self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(str(socket_file)) self.f = self.sock.makefile('rw') + self.protocol = protocol + + if timeout: + self.sock.settimeout(timeout) # Initialize connection immediately self._initialize() - def execute(self, command: str) -> Any: - json_line = json.dumps({'execute': command}) + '\r\n' + def execute( + self, command: str, + arguments: Optional[Dict[str, Any]] = None) -> Any: + message = {'execute': command} + if arguments: + message['arguments'] = arguments + + json_line = json.dumps(message) + '\r\n' + logging.debug(f'Sending message to QEMU: {json_line}') self.sock.sendall(json_line.encode('utf-8')) - return json.loads(self.f.readline()) + return json.loads(self.readline()) def _initialize(self): + if self.protocol == QemuProtocol.QMP: + self._initialize_qmp() + elif self.protocol == QemuProtocol.QGA: + self._initialize_guest_agent() + else: + raise ValueError('Unknown QemuProtocol') + + def _initialize_qmp(self): # Read the capabilities self.f.readline() # Acknowledge the QMP capability negotiation self.execute('qmp_capabilities') + + def _initialize_guest_agent(self): + # make the server flush partial JSON from previous connections + prepend_byte = b'\xff' + self.sock.sendall(prepend_byte) + + rand_int = random.randint(100000, 1000000) + self.execute('guest-sync', {'id': rand_int}) + + return json.loads(self.readline()) + + def readline(self) -> Any: + try: + logging.debug('Waiting for message from QEMU') + data = self.f.readline() + logging.debug(f'Received message from QEMU: {data}') + return data + except socket.timeout: + raise QemuException( + 'Could not communicate with QEMU, is QMP server or GA running?') + + +class GuestAgentIpAddress: + def __init__(self, socket_file: Path, timeout: float = 1): + self.comm_channel = QemuMonitor(socket_file, QemuProtocol.QGA, timeout) + + def fetch_ip_addresses(self): + resp = self.comm_channel.execute('guest-network-get-interfaces') + return self._parse_ips_from_response(resp) + + def _parse_ips_from_response(self, response): + ips = [] + + try: + for interface in response['return']: + for address in interface['ip-addresses']: + ips.append(address['ip-address']) + + return ips + except KeyError: + return [] diff --git a/tests/conftest.py b/tests/conftest.py @@ -0,0 +1,22 @@ +from contextlib import contextmanager +import pytest +import signal + + +@pytest.fixture +def timeout(): + """Run a block of code with a specified timeout. If the block is not + finished after the defined time, raise an exception.""" + def raise_exception(signum, frame): + raise TimeoutError + + @contextmanager + def timeout_function(seconds: int): + try: + signal.signal(signal.SIGALRM, raise_exception) + signal.alarm(seconds) + yield None + finally: + signal.signal(signal.SIGALRM, signal.SIG_IGN) + + return timeout_function diff --git a/tests/test_qemu.py b/tests/test_qemu.py @@ -1,24 +1,27 @@ import contextlib import json from pathlib import Path +import pytest +import signal import socket import tempfile import threading import uuid -from aetherscale.qemu import QemuMonitor +from aetherscale.qemu import QemuMonitor, QemuProtocol class MockQemuServer: - init_msg = {"QMP": {"version": {"qemu": { + qmp_init_msg = {"QMP": {"version": {"qemu": { "micro": 0, "minor": 6, "major": 1 }, "package": ""}, "capabilities": []}} mock_ok_response = {'return': {}} - def __init__(self, socket_file: str): + def __init__(self, socket_file: str, protocol: QemuProtocol): self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._socket_file = socket_file self.received_executes = [] + self.protocol = protocol def __enter__(self): self._sock.bind(self._socket_file) @@ -33,7 +36,8 @@ class MockQemuServer: conn, addr = self._sock.accept() filelike = conn.makefile('rb') - self._send_message(self.init_msg, conn) + if self.protocol == QemuProtocol.QMP: + self._send_message(self.qmp_init_msg, conn) try: while True: @@ -41,10 +45,18 @@ class MockQemuServer: self.received_executes.append(msg['execute']) # for now always return with OK status - self._send_message(self.mock_ok_response, conn) - except json.JSONDecodeError as e: + response = self._build_response(msg) + self._send_message(response, conn) + except json.JSONDecodeError: conn.close() + def _build_response(self, message): + if self.protocol == QemuProtocol.QGA: + if message['execute'] == 'guest-sync': + return {'return': message['arguments']['id']} + + return self.mock_ok_response + def _send_message(self, message, conn): msg_with_newline = json.dumps(message) + '\r\n' conn.send(msg_with_newline.encode('ascii')) @@ -55,8 +67,9 @@ class MockQemuServer: @contextlib.contextmanager -def run_mock_qemu_server(socket_file: str) -> MockQemuServer: - with MockQemuServer(socket_file) as mock_server: +def run_mock_qemu_server( + socket_file: str, protocol: QemuProtocol) -> MockQemuServer: + with MockQemuServer(socket_file, protocol) as mock_server: t = threading.Thread(target=mock_server.listen) t.daemon = True t.start() @@ -64,8 +77,20 @@ def run_mock_qemu_server(socket_file: str) -> MockQemuServer: def test_initializes_with_capabilities_acceptance(): - socket_file = Path(tempfile.gettempdir()) / str(uuid.uuid4()) + sock_file = Path(tempfile.gettempdir()) / str(uuid.uuid4()) - with run_mock_qemu_server(str(socket_file)) as mock_server: - QemuMonitor(socket_file) + with run_mock_qemu_server(str(sock_file), QemuProtocol.QMP) as mock_server: + QemuMonitor(sock_file, QemuProtocol.QMP) assert 'qmp_capabilities' in mock_server.received_executes + + +def test_timeout(timeout): + sock_file = Path(tempfile.gettempdir()) / str(uuid.uuid4()) + # A QMP protocol client on a Guest Agent server will have to timeout, + # because it expects to receive a welcome capabilities message from the + # server + + with run_mock_qemu_server(str(sock_file), QemuProtocol.QGA) as mock_server: + with timeout(1): # if function does not finish after 1s, error-out + with pytest.raises(socket.timeout): + QemuMonitor(sock_file, QemuProtocol.QMP, timeout=0.1)