commit fe3431155642dbd3d34851ac348e3de1e011cb57
parent 2527a963170db1df49ab1ea5db244b609ef7eac0
Author: Stefan Koch <programming@stefan-koch.name>
Date: Thu, 10 Dec 2020 15:34:44 +0100
change rabbitmq architecture to allow multi-host
Diffstat:
3 files changed, 46 insertions(+), 21 deletions(-)
diff --git a/aetherscale/client.py b/aetherscale/client.py
@@ -6,18 +6,18 @@ import pika
import pika.exceptions
import sys
+from .config import RABBITMQ_HOST
+
+
+EXCHANGE_NAME = 'computing'
-class ServerCommunication:
- def __init__(self):
- self.queue = 'vm-queue'
+class ServerCommunication:
def __enter__(self):
self.connection = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost'))
+ pika.ConnectionParameters(host=RABBITMQ_HOST))
self.channel = self.connection.channel()
- self.channel.queue_declare(queue=self.queue)
-
self.channel.basic_consume(
queue='amq.rabbitmq.reply-to',
on_message_callback=self.on_response,
@@ -43,8 +43,8 @@ class ServerCommunication:
reply_to = 'amq.rabbitmq.reply-to'
self.channel.basic_publish(
- exchange='',
- routing_key=self.queue,
+ exchange=EXCHANGE_NAME,
+ routing_key=data['command'],
properties=pika.BasicProperties(
reply_to=reply_to,
content_type='application/json',
diff --git a/aetherscale/computing.py b/aetherscale/computing.py
@@ -17,23 +17,21 @@ from typing import List, Optional, Dict, Any, Callable
from . import interfaces
from . import execution
from . import qemu
+from .config import RABBITMQ_HOST
-# TODO: Since this is not a command line interface file anymore, switch to
-# logging from print
-# Non-VDE networking is deprecated and should not be used anymore
VDE_FOLDER = '/tmp/vde.ctl'
VDE_TAP_INTERFACE = 'tap-vde'
-QUEUE_NAME = 'vm-queue'
BASE_IMAGE_FOLDER = Path('base_images')
USER_IMAGE_FOLDER = Path('user_images')
-connection = pika.BlockingConnection(
- pika.ConnectionParameters(host='localhost'))
-channel = connection.channel()
-
-channel.queue_declare(queue=QUEUE_NAME)
+EXCHANGE_NAME = 'computing'
+COMPETING_QUEUE = 'computing-competing'
+QUEUE_COMMANDS_MAP = {
+ '': ['list-vms', 'start-vm', 'stop-vm', 'delete-vm'],
+ COMPETING_QUEUE: ['create-vm'],
+}
class QemuException(Exception):
@@ -307,14 +305,36 @@ def callback(ch, method, properties, body):
def run():
- channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
+ connection = pika.BlockingConnection(
+ pika.ConnectionParameters(host=RABBITMQ_HOST))
+ channel = connection.channel()
+
+ channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct')
+
+ # let rabbitmq define a name for the exclusive queue
+ result = channel.queue_declare(queue='', exclusive=True)
+ exclusive_queue_name = result.method.queue
+ # setup one queue that is shared by all consumers
+ channel.queue_declare(queue=COMPETING_QUEUE)
+
+ for queue, commands in QUEUE_COMMANDS_MAP.items():
+ if queue == '':
+ queue = exclusive_queue_name
+
+ for command in commands:
+ channel.queue_bind(
+ exchange=EXCHANGE_NAME, queue=queue, routing_key=command)
+
+ channel.basic_consume(
+ queue=exclusive_queue_name, on_message_callback=callback)
+ channel.basic_consume(
+ queue=COMPETING_QUEUE, on_message_callback=callback)
# a TAP interface for VDE must already have been created
if not interfaces.check_device_existence(VDE_TAP_INTERFACE):
- print(
+ logging.error(
f'Interface {VDE_TAP_INTERFACE} does not exist. '
- 'Please create it manually and then start this service again',
- file=sys.stderr)
+ 'Please create it manually and then start this service again')
sys.exit(1)
logging.info('Bringing up VDE networking')
diff --git a/aetherscale/config.py b/aetherscale/config.py
@@ -0,0 +1,4 @@
+import os
+
+
+RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', default='localhost')+
\ No newline at end of file