aetherscale

[unmaintained] code for a cloud provider tutorial
Log | Files | Refs | README | LICENSE

runtime.py (3492B)


      1 from dataclasses import dataclass
      2 import enum
      3 import logging
      4 import json
      5 from pathlib import Path
      6 import random
      7 import socket
      8 from typing import Any, Dict, Optional, List
      9 
     10 from aetherscale.qemu.exceptions import QemuException
     11 
     12 
class QemuInterfaceType(enum.Enum):
    """Kind of network backend described by a QemuInterfaceConfig."""

    # Values are assigned by enum.auto(); compare members by identity or
    # equality rather than relying on the underlying integers.

    # presumably a host TAP device (cf. QemuInterfaceConfig.tap_device)
    TAP = enum.auto()
    # presumably a VDE switch (cf. QemuInterfaceConfig.vde_folder)
    VDE = enum.auto()
     16 
     17 
@dataclass
class QemuInterfaceConfig:
    """Settings for a single network interface of a VM.

    `vde_folder` and `tap_device` both default to None.
    NOTE(review): presumably only the one matching `type` is expected to be
    set -- nothing in this class enforces it; confirm against the caller.
    """

    # MAC address of the interface, as a string
    mac_address: str
    # Which kind of backend this interface uses
    type: QemuInterfaceType
    # Optional path used for VDE interfaces (assumed; verify at call site)
    vde_folder: Optional[Path] = None
    # Optional device name used for TAP interfaces (assumed; verify at call site)
    tap_device: Optional[str] = None
     24 
     25 
@dataclass
class QemuStartupConfig:
    """Parameters describing one VM to start: id, disk image and interfaces."""

    # Identifier of the VM
    vm_id: str
    # Path to the disk image
    # (presumably attached as the VM's first hard disk -- confirm at call site)
    hda_image: Path
    # Network interfaces of the VM, one config entry per interface
    interfaces: List[QemuInterfaceConfig]
     31 
     32 
class QemuProtocol(enum.Enum):
    """Protocol spoken on a QEMU control socket.

    QemuMonitor uses this to pick its connection handshake.
    """

    # Handshake: QemuMonitor._initialize_qmp (capability negotiation)
    QMP = enum.auto()
    # Handshake: QemuMonitor._initialize_guest_agent (guest-sync)
    QGA = enum.auto()
     36 
     37 
     38 class QemuMonitor:
     39     # TODO: Improve QMP communication, spec is here:
     40     # https://github.com/qemu/qemu/blob/master/docs/interop/qmp-spec.txt
     41     def __init__(
     42             self, socket_file: Path, protocol: QemuProtocol,
     43             timeout: Optional[float] = None):
     44         self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
     45         self.sock.connect(str(socket_file))
     46         self.f = self.sock.makefile('rw')
     47         self.protocol = protocol
     48 
     49         if timeout:
     50             self.sock.settimeout(timeout)
     51 
     52         # Initialize connection immediately
     53         self._initialize()
     54 
     55     def execute(
     56             self, command: str,
     57             arguments: Optional[Dict[str, Any]] = None) -> Any:
     58         message = {'execute': command}
     59         if arguments:
     60             message['arguments'] = arguments
     61 
     62         json_line = json.dumps(message) + '\r\n'
     63         logging.debug(f'Sending message to QEMU: {json_line}')
     64         self.sock.sendall(json_line.encode('utf-8'))
     65         return json.loads(self.readline())
     66 
     67     def _initialize(self):
     68         if self.protocol == QemuProtocol.QMP:
     69             self._initialize_qmp()
     70         elif self.protocol == QemuProtocol.QGA:
     71             self._initialize_guest_agent()
     72         else:
     73             raise ValueError('Unknown QemuProtocol')
     74 
     75     def _initialize_qmp(self):
     76         # Read the capabilities
     77         self.f.readline()
     78 
     79         # Acknowledge the QMP capability negotiation
     80         self.execute('qmp_capabilities')
     81 
     82     def _initialize_guest_agent(self):
     83         # make the server flush partial JSON from previous connections
     84         prepend_byte = b'\xff'
     85         self.sock.sendall(prepend_byte)
     86 
     87         rand_int = random.randint(100000, 1000000)
     88         self.execute('guest-sync', {'id': rand_int})
     89 
     90         return json.loads(self.readline())
     91 
     92     def readline(self) -> Any:
     93         try:
     94             logging.debug('Waiting for message from QEMU')
     95             data = self.f.readline()
     96             logging.debug(f'Received message from QEMU: {data}')
     97             return data
     98         except socket.timeout:
     99             raise QemuException(
    100                 'Could not communicate with QEMU, is QMP server or GA running?')
    101 
    102 
    103 class GuestAgentIpAddress:
    104     def __init__(self, socket_file: Path, timeout: float = 1):
    105         self.comm_channel = QemuMonitor(socket_file, QemuProtocol.QGA, timeout)
    106 
    107     def fetch_ip_addresses(self):
    108         resp = self.comm_channel.execute('guest-network-get-interfaces')
    109         return self._parse_ips_from_response(resp)
    110 
    111     def _parse_ips_from_response(self, response):
    112         ips = []
    113 
    114         try:
    115             for interface in response['return']:
    116                 for address in interface['ip-addresses']:
    117                     ips.append(address['ip-address'])
    118 
    119             return ips
    120         except KeyError:
    121             return []