commit 8736a5ea5d048752c5c46dc24004e1d4310fb586
Author: Stefan Koch <programming@stefan-koch.name>
Date: Sat, 28 Nov 2020 11:19:54 +0100
create a Python package from blog results
Diffstat:
12 files changed, 495 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -0,0 +1,8 @@
+base_images
+user_images
+
+build
+dist
+env
+__pycache__
+*.egg-info
diff --git a/README.md b/README.md
@@ -0,0 +1,83 @@
+# aetherscale
+
+aetherscale is a small hobby project to create a hosting environment that can
+be controlled via an HTTP API. I just want to have some fun and
+dive deeper into Linux tooling (networking, virtualization and so on) and
+into distributed applications. I do not think that this will become
+production-ready at any point.
+
+This is developed along with
+[a blog tutorial series about scalable computing](https://blog.stefan-koch.name/2020/11/22/programming-cloud-hosting-python-rabbitmq-qemu)
+which I am currently writing.
+
+## Installation
+
+You can install the package with:
+
+```bash
+git clone https://github.com/aufziehvogel/aetherscale
+cd aetherscale
+virtualenv venv && source venv/bin/activate
+pip install -e .
+```
+
+## Usage
+
+The server can be started with:
+
+```bash
+aetherscale
+```
+
+For example, to list all running VMs, run the following client command:
+
+```bash
+aetherscale-cli list-vms
+```
+
+
+## Overview
+
+Components which I think would be interesting to develop are:
+
+- Firewall (probably nftables, so that I can learn nftables)
+- Virtual Private Networks (probably tinc)
+- Virtual Servers (probably qemu)
+ - IPv6-only intranet to learn IPv6
+
+## Architecture
+
+My idea is that all requests to the system go through a central message
+broker. Handlers will then pick up these tasks and perform the work.
+
+Each request can have the name of a unique channel for responses. The sender
+of a message can open a channel with this name on the broker and will receive
+responses. This is useful if you have to wait until another component has
+finished its work.
+
+### Messages
+
+Create a new machine:
+
+```json
+{
+    "component": "computing",
+    "task": "create-vm",
+    "response-channel": "unique-channel-123456789",
+    "options": {
+        "image": "my-image",
+        "virtual-network": "my-virtual-subnet",
+        "public-ip": true
+    }
+}
+```
+
+### Computing
+
+Stuff I use for computing (and thus have learnt something about so far):
+
+- Qemu
+- software bridging with `ip` (for public and private IPs)
+ - VDE could also be relevant, but currently out of scope
+- layer-2 VPN with tinc
+- `libguestfs` for analyzing and changing images
diff --git a/aetherscale-client-runner.py b/aetherscale-client-runner.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+
+from aetherscale import client
+
+
+if __name__ == '__main__':
+ client.main()
diff --git a/aetherscale-runner.py b/aetherscale-runner.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+
+from aetherscale import server
+
+
+if __name__ == '__main__':
+ server.main()
diff --git a/aetherscale/__init__.py b/aetherscale/__init__.py
@@ -0,0 +1 @@
+# Package version. NOTE: setup.py extracts this value with a regular
+# expression that expects the single-quoted form used on the line below.
+__version__ = '0.0.1'
diff --git a/aetherscale/__main__.py b/aetherscale/__main__.py
@@ -0,0 +1,4 @@
+from .minikloud import main
+
+if __name__ == '__main__':
+ main()
diff --git a/aetherscale/client.py b/aetherscale/client.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+
+import argparse
+import json
+import pika
+import sys
+
+
+class ServerCommunication:
+ def __init__(self):
+ self.queue = 'vm-queue'
+
+ def __enter__(self):
+ self.connection = pika.BlockingConnection(
+ pika.ConnectionParameters(host='localhost'))
+ self.channel = self.connection.channel()
+
+ self.channel.queue_declare(queue=self.queue)
+
+ self.channel.basic_consume(
+ queue='amq.rabbitmq.reply-to',
+ on_message_callback=self.on_response,
+ auto_ack=True)
+
+ return self
+
+ def on_response(self, ch, method, properties, body):
+ self.responses.append(json.loads(body))
+
+ # TODO: Stopping consuming on the first message only works
+ # as long as we only expect one message
+ self.channel.stop_consuming()
+
+ def on_timeout(self):
+ self.channel.stop_consuming()
+
+ def send_msg(self, data, response_expected=False):
+ self.responses = []
+
+ reply_to = None
+ if response_expected:
+ reply_to = 'amq.rabbitmq.reply-to'
+
+ self.channel.basic_publish(
+ exchange='',
+ routing_key=self.queue,
+ properties=pika.BasicProperties(
+ reply_to=reply_to,
+ content_type='application/json',
+ ),
+ body=json.dumps(data).encode('utf-8'))
+
+ if response_expected:
+ self.connection.call_later(5, self.on_timeout)
+ self.channel.start_consuming()
+
+ return self.responses
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.connection.close()
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description='Manage aetherscale instances')
+ subparsers = parser.add_subparsers(dest='subparser_name')
+ create_vm_parser = subparsers.add_parser('start-vm')
+ create_vm_parser.add_argument(
+ '--image', help='Name of the image to start', required=True)
+ create_vm_parser = subparsers.add_parser('stop-vm')
+ create_vm_parser.add_argument(
+ '--vm-id', dest='vm_id', help='ID of the VM to stop', required=True)
+ subparsers.add_parser('list-vms')
+ args = parser.parse_args()
+
+ if args.subparser_name == 'list-vms':
+ response_expected = True
+ data = {
+ 'command': 'list-vms',
+ }
+ elif args.subparser_name == 'start-vm':
+ response_expected = False
+ data = {
+ 'command': 'start-vm',
+ 'options': {
+ 'image': args.image,
+ }
+ }
+ elif args.subparser_name == 'stop-vm':
+ response_expected = True
+ data = {
+ 'command': 'stop-vm',
+ 'options': {
+ 'kill': True,
+ 'vm-id': args.vm_id,
+ }
+ }
+ else:
+ print('Command does not exist', file=sys.stderr)
+ sys.exit(1)
+
+ with ServerCommunication() as c:
+ result = c.send_msg(data, response_expected)
+ print(result)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/aetherscale/computing.py b/aetherscale/computing.py
@@ -0,0 +1,150 @@
+import json
+import os
+from pathlib import Path
+import pika
+import psutil
+import random
+import string
+import subprocess
+import sys
+from typing import List, Optional
+
+from . import interfaces
+
+
+# Name of the RabbitMQ queue on which requests arrive (the client publishes
+# to a queue of the same name).
+QUEUE_NAME = 'vm-queue'
+# Image folders are relative paths, so they are resolved against the
+# process's current working directory.
+BASE_IMAGE_FOLDER = Path('base_images')
+USER_IMAGE_FOLDER = Path('user_images')
+
+# Passed as the owner when creating tap devices; None if RUN_QEMU_AS is unset.
+run_qemu_username = os.getenv('RUN_QEMU_AS')
+
+# NOTE: the broker connection is opened as a side effect of importing
+# this module.
+connection = pika.BlockingConnection(
+ pika.ConnectionParameters(host='localhost'))
+channel = connection.channel()
+
+channel.queue_declare(queue=QUEUE_NAME)
+
+
+class QemuException(Exception):
+ """Raised when creating a VM image with qemu-img fails."""
+ pass
+
+
+def create_user_image(vm_id: str, image_name: str) -> Path:
+ base_image = BASE_IMAGE_FOLDER / f'{image_name}.qcow2'
+ if not base_image.is_file():
+ raise IOError(f'Image "{image_name}" does not exist')
+
+ user_image = USER_IMAGE_FOLDER / f'{vm_id}.qcow2'
+
+ create_img_result = subprocess.run([
+ 'qemu-img', 'create', '-f', 'qcow2',
+ '-b', str(base_image.absolute()), '-F', 'qcow2', str(user_image)])
+ if create_img_result.returncode != 0:
+ raise QemuException(f'Could not create image for VM "{vm_id}"')
+
+ return user_image
+
+
+def list_vms() -> List[str]:
+ vms = []
+
+ for proc in psutil.process_iter(['pid', 'name']):
+ if proc.name().startswith('vm-'):
+ vms.append(proc.name())
+
+ return vms
+
+
+def get_process_for_vm(vm_id: str) -> Optional[psutil.Process]:
+ for proc in psutil.process_iter(['name']):
+ if proc.name() == vm_id:
+ return proc
+
+ return None
+
+
+def callback(ch, method, properties, body):
+ message = body.decode('utf-8')
+ print('Received message: ' + message)
+
+ data = json.loads(message)
+ response = None
+
+ if 'command' not in data:
+ return
+ elif data['command'] == 'list-vms':
+ response = list_vms()
+ elif data['command'] == 'stop-vm':
+ try:
+ vm_id = data['options']['vm-id']
+ except KeyError:
+ print('VM ID not specified', file=sys.stderr)
+ return
+
+ process = get_process_for_vm(vm_id)
+ if process:
+ process.kill()
+ response = {
+ 'status': 'killed',
+ 'vm-id': vm_id,
+ }
+ else:
+ response = {
+ 'status': 'error',
+ 'reason': f'VM "{vm_id}" does not exist',
+ }
+ elif data['command'] == 'start-vm':
+ vm_id = ''.join(
+ random.choice(string.ascii_lowercase) for i in range(8))
+ print(f'Starting VM "{vm_id}"')
+
+ try:
+ image_name = os.path.basename(data['options']['image'])
+ except KeyError:
+ print('Image not specified', file=sys.stderr)
+ return
+
+ try:
+ user_image = create_user_image(vm_id, image_name)
+ except (OSError, QemuException) as e:
+ print(str(e), file=sys.stderr)
+ return
+
+ tap_device = f'vm-{vm_id}'
+ if not interfaces.create_tap_device(
+ tap_device, 'br0', run_qemu_username):
+ print(f'Could not create tap device for VM "{vm_id}"',
+ file=sys.stderr)
+ return
+
+ mac_addr = interfaces.create_mac_address()
+ print(f'Assigning MAC address "{mac_addr}" to VM "{vm_id}"')
+
+ p = subprocess.Popen([
+ 'qemu-system-x86_64', '-m', '4096', '-hda', str(user_image),
+ '-device', f'virtio-net-pci,netdev=pubnet,mac={mac_addr}',
+ '-netdev', f'tap,id=pubnet,ifname={tap_device},script=no,downscript=no',
+ '-name', f'qemu-vm-{vm_id},process=vm-{vm_id}',
+ ])
+ print(f'Started VM "{vm_id}" as process ID {p.pid}')
+
+ ch.basic_ack(delivery_tag=method.delivery_tag)
+
+ if response is not None and properties.reply_to:
+ ch.basic_publish(
+ exchange='',
+ routing_key=properties.reply_to,
+ properties=pika.BasicProperties(
+ correlation_id=properties.correlation_id
+ ),
+ body=json.dumps(response))
+
+
+def run():
+ channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
+
+ if not interfaces.check_device_existence('br0'):
+ interfaces.init_bridge(
+ 'br0', 'enp0s25', '192.168.2.10/24', '192.168.2.1')
+
+ channel.start_consuming()
diff --git a/aetherscale/execution.py b/aetherscale/execution.py
@@ -0,0 +1,14 @@
+import logging
+import subprocess
+from typing import List
+
+
+def run_command_chain(commands: List[List[str]]) -> bool:
+ for command in commands:
+ logging.debug(f'Running command: {" ".join(command)}')
+ result = subprocess.run(command)
+
+ if result.returncode != 0:
+ return False
+
+ return True
diff --git a/aetherscale/interfaces.py b/aetherscale/interfaces.py
@@ -0,0 +1,74 @@
+import logging
+import random
+import subprocess
+from typing import Optional
+
+from . import execution
+
+
+class NetworkException(Exception):
+ """Exception type for networking failures.
+
+ NOTE(review): nothing in the visible code raises it yet.
+ """
+ pass
+
+
+def check_device_existence(device: str) -> bool:
+ # if ip link show dev [devicename] does not find [devicename], it will
+ # write a message to stderr, but none to stdout
+ result = subprocess.run(
+ ['ip', 'link', 'show', 'dev', device], stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL)
+
+ if result.stdout:
+ return True
+ else:
+ return False
+
+
+def init_bridge(
+ bridge_device: str, phys_device: str, ip: Optional[str],
+ gateway: Optional[str]) -> bool:
+ if check_device_existence(bridge_device):
+ logging.debug(
+ f'Device {bridge_device} already exists, will not re-create')
+ return True
+ else:
+ logging.debug(f'Creating bridge device {bridge_device}')
+
+ commands = [
+ ['ip', 'link', 'add', bridge_device, 'type', 'bridge'],
+ ['ip', 'link', 'set', bridge_device, 'up'],
+ ['ip', 'link', 'set', phys_device, 'up'],
+ ['ip', 'link', 'set', phys_device, 'master', bridge_device],
+ ['ip', 'addr', 'flush', 'dev', phys_device],
+ ]
+ if ip:
+ commands.append(
+ ['ip', 'addr', 'add', ip, 'dev', bridge_device])
+ if gateway:
+ commands.append(
+ ['ip', 'route', 'add', 'default',
+ 'via', gateway, 'dev', bridge_device])
+
+ return execution.run_command_chain(commands)
+
+
+def create_tap_device(
+ tap_device_name, bridge_device_name, user) -> bool:
+ creation_ok = execution.run_command_chain([
+ ['ip', 'tuntap', 'add', 'dev', tap_device_name,
+ 'mode', 'tap', 'user', user],
+ ['ip', 'link', 'set', 'dev', tap_device_name, 'up'],
+ ['ip', 'link', 'set', tap_device_name, 'master', bridge_device_name],
+ ])
+
+ return creation_ok
+
+
+def create_mac_address() -> str:
+ # Set second least significant bit of leftmost pair to 1 (local)
+ # Set least significant bit of leftmost pair to 0 (unicast)
+ mac_bits = (random.getrandbits(48) | 0x020000000000) & 0xfeffffffffff
+ mac_str = '{:012x}'.format(mac_bits)
+ return ':'.join([
+ mac_str[:2], mac_str[2:4], mac_str[4:6],
+ mac_str[6:8], mac_str[8:10], mac_str[10:],
+ ])
diff --git a/aetherscale/server.py b/aetherscale/server.py
@@ -0,0 +1,8 @@
+from . import __version__
+from .computing import run
+
+
+def main():
+ print(f'Executing aetherscale version {__version__}.')
+
+ run()
diff --git a/setup.py b/setup.py
@@ -0,0 +1,31 @@
+import re
+from setuptools import setup
+
+version = re.search(
+ r'^__version__\s*=\s*\'(.+)\'',
+ open('aetherscale/__init__.py').read(),
+ re.M).group(1)
+
+with open('README.md', 'rb') as f:
+ long_descr = f.read().decode('utf-8')
+
+install_requires = [
+ 'pika',
+ 'psutil',
+]
+
+setup(
+ name='aetherscale',
+ packages=['aetherscale'],
+ entry_points={
+ 'console_scripts': [
+ 'aetherscale=aetherscale.server:main',
+ 'aetherscale-cli=aetherscale.client:main',
+ ],
+ },
+ install_requires=install_requires,
+ version=version,
+ description='Proof-of-concept for a small cloud computing platform',
+ long_description=long_descr,
+ author='Stefan Koch',
+)