runtime.py (3492B)
1 from dataclasses import dataclass 2 import enum 3 import logging 4 import json 5 from pathlib import Path 6 import random 7 import socket 8 from typing import Any, Dict, Optional, List 9 10 from aetherscale.qemu.exceptions import QemuException 11 12 13 class QemuInterfaceType(enum.Enum): 14 TAP = enum.auto() 15 VDE = enum.auto() 16 17 18 @dataclass 19 class QemuInterfaceConfig: 20 mac_address: str 21 type: QemuInterfaceType 22 vde_folder: Optional[Path] = None 23 tap_device: Optional[str] = None 24 25 26 @dataclass 27 class QemuStartupConfig: 28 vm_id: str 29 hda_image: Path 30 interfaces: List[QemuInterfaceConfig] 31 32 33 class QemuProtocol(enum.Enum): 34 QMP = enum.auto() 35 QGA = enum.auto() 36 37 38 class QemuMonitor: 39 # TODO: Improve QMP communication, spec is here: 40 # https://github.com/qemu/qemu/blob/master/docs/interop/qmp-spec.txt 41 def __init__( 42 self, socket_file: Path, protocol: QemuProtocol, 43 timeout: Optional[float] = None): 44 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 45 self.sock.connect(str(socket_file)) 46 self.f = self.sock.makefile('rw') 47 self.protocol = protocol 48 49 if timeout: 50 self.sock.settimeout(timeout) 51 52 # Initialize connection immediately 53 self._initialize() 54 55 def execute( 56 self, command: str, 57 arguments: Optional[Dict[str, Any]] = None) -> Any: 58 message = {'execute': command} 59 if arguments: 60 message['arguments'] = arguments 61 62 json_line = json.dumps(message) + '\r\n' 63 logging.debug(f'Sending message to QEMU: {json_line}') 64 self.sock.sendall(json_line.encode('utf-8')) 65 return json.loads(self.readline()) 66 67 def _initialize(self): 68 if self.protocol == QemuProtocol.QMP: 69 self._initialize_qmp() 70 elif self.protocol == QemuProtocol.QGA: 71 self._initialize_guest_agent() 72 else: 73 raise ValueError('Unknown QemuProtocol') 74 75 def _initialize_qmp(self): 76 # Read the capabilities 77 self.f.readline() 78 79 # Acknowledge the QMP capability negotiation 80 self.execute('qmp_capabilities') 81 82 def _initialize_guest_agent(self): 83 # make the server flush partial JSON from previous connections 84 prepend_byte = b'\xff' 85 self.sock.sendall(prepend_byte) 86 87 rand_int = random.randint(100000, 1000000) 88 self.execute('guest-sync', {'id': rand_int}) 89 90 return json.loads(self.readline()) 91 92 def readline(self) -> Any: 93 try: 94 logging.debug('Waiting for message from QEMU') 95 data = self.f.readline() 96 logging.debug(f'Received message from QEMU: {data}') 97 return data 98 except socket.timeout: 99 raise QemuException( 100 'Could not communicate with QEMU, is QMP server or GA running?') 101 102 103 class GuestAgentIpAddress: 104 def __init__(self, socket_file: Path, timeout: float = 1): 105 self.comm_channel = QemuMonitor(socket_file, QemuProtocol.QGA, timeout) 106 107 def fetch_ip_addresses(self): 108 resp = self.comm_channel.execute('guest-network-get-interfaces') 109 return self._parse_ips_from_response(resp) 110 111 def _parse_ips_from_response(self, response): 112 ips = [] 113 114 try: 115 for interface in response['return']: 116 for address in interface['ip-addresses']: 117 ips.append(address['ip-address']) 118 119 return ips 120 except KeyError: 121 return []