aetherscale

[unmaintained] code for a cloud provider tutorial
Log | Files | Refs | README | LICENSE

test_qemu.py (3086B)


      1 import contextlib
      2 import json
      3 from pathlib import Path
      4 import pytest
      5 import socket
      6 import tempfile
      7 import threading
      8 import uuid
      9 
     10 from aetherscale.qemu.runtime import QemuMonitor, QemuProtocol
     11 
     12 
     13 class MockQemuServer:
     14     qmp_init_msg = {"QMP": {"version": {"qemu": {
     15         "micro": 0, "minor": 6, "major": 1
     16     }, "package": ""}, "capabilities": []}}
     17     mock_ok_response = {'return': {}}
     18 
     19     def __init__(self, socket_file: str, protocol: QemuProtocol):
     20         self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
     21         self._socket_file = socket_file
     22         self.received_executes = []
     23         self.protocol = protocol
     24 
     25     def __enter__(self):
     26         self._sock.bind(self._socket_file)
     27         return self
     28 
     29     def __exit__(self, exc_type, exc_val, exc_tb):
     30         self._sock.close()
     31 
     32     def listen(self):
     33         self._sock.listen()
     34 
     35         conn, addr = self._sock.accept()
     36         filelike = conn.makefile('rb')
     37 
     38         if self.protocol == QemuProtocol.QMP:
     39             self._send_message(self.qmp_init_msg, conn)
     40 
     41         try:
     42             while True:
     43                 msg = self._recv_message(filelike)
     44                 self.received_executes.append(msg['execute'])
     45 
     46                 # for now always return with OK status
     47                 response = self._build_response(msg)
     48                 self._send_message(response, conn)
     49         except json.JSONDecodeError:
     50             conn.close()
     51 
     52     def _build_response(self, message):
     53         if self.protocol == QemuProtocol.QGA:
     54             if message['execute'] == 'guest-sync':
     55                 return {'return': message['arguments']['id']}
     56 
     57         return self.mock_ok_response
     58 
     59     def _send_message(self, message, conn):
     60         msg_with_newline = json.dumps(message) + '\r\n'
     61         conn.send(msg_with_newline.encode('ascii'))
     62 
     63     def _recv_message(self, filelike):
     64         line = filelike.readline()
     65         return json.loads(line.decode('utf-8'))
     66 
     67 
     68 @contextlib.contextmanager
     69 def run_mock_qemu_server(
     70         socket_file: str, protocol: QemuProtocol) -> MockQemuServer:
     71     with MockQemuServer(socket_file, protocol) as mock_server:
     72         t = threading.Thread(target=mock_server.listen)
     73         t.daemon = True
     74         t.start()
     75         yield mock_server
     76 
     77 
     78 def test_initializes_with_capabilities_acceptance():
     79     sock_file = Path(tempfile.gettempdir()) / str(uuid.uuid4())
     80 
     81     with run_mock_qemu_server(str(sock_file), QemuProtocol.QMP) as mock_server:
     82         QemuMonitor(sock_file, QemuProtocol.QMP)
     83         assert 'qmp_capabilities' in mock_server.received_executes
     84 
     85 
     86 def test_timeout(timeout):
     87     sock_file = Path(tempfile.gettempdir()) / str(uuid.uuid4())
     88     # A QMP protocol client on a Guest Agent server will have to timeout,
     89     # because it expects to receive a welcome capabilities message from the
     90     # server
     91 
     92     with run_mock_qemu_server(str(sock_file), QemuProtocol.QGA) as mock_server:
     93         with timeout(1):  # if function does not finish after 1s, error-out
     94             with pytest.raises(socket.timeout):
     95                 QemuMonitor(sock_file, QemuProtocol.QMP, timeout=0.1)