Commit 28c8e075 by Chif Gergő

Merge branch 'realtime-status' into 'DEV'

Realtime status

See merge request !23
parents eb627644 09803af9
Pipeline #1271 failed with stage
in 5 minutes 52 seconds
...@@ -20,6 +20,10 @@ python-novaclient = "*" ...@@ -20,6 +20,10 @@ python-novaclient = "*"
keystoneauth1 = "*" keystoneauth1 = "*"
django-guardian = "*" django-guardian = "*"
djoser = "*" djoser = "*"
channels = "*"
channels-redis = "*"
celery = {extras = ["redis"],version = "*"}
django-celery-beat = "*"
[requires] [requires]
python_version = "3.6" python_version = "3.6"
# Generated by Django 3.0.5 on 2020-04-27 09:53
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('instance', '0003_auto_20200408_1503'),
]
operations = [
migrations.AlterField(
model_name='basemachinedescriptor',
name='access_protocol',
field=models.CharField(choices=[('RDP', 'Remote Desktop Protocol'), ('SSH', 'Secure Shell')], help_text='The protocol which used to connect to the machinethat created from this template', max_length=50, verbose_name='remote_access_protocol'),
),
]
...@@ -7,6 +7,9 @@ from image.models import Disk ...@@ -7,6 +7,9 @@ from image.models import Disk
from interface_openstack.implementation.vm.instance import ( from interface_openstack.implementation.vm.instance import (
OSVirtualMachineManager OSVirtualMachineManager
) )
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -251,8 +254,15 @@ class Instance(BaseMachineDescriptor): ...@@ -251,8 +254,15 @@ class Instance(BaseMachineDescriptor):
def update_status(self): def update_status(self):
remote = self.get_remote_instance() remote = self.get_remote_instance()
self.status = remote.status if self.status != remote.status:
self.save() self.status = remote.status
channels = get_channel_layer()
async_to_sync(channels.group_send)(
str(self.id), {"type": "status_changed",
"status": self.status,
"vm": str(self.id)
})
self.save()
@classmethod @classmethod
def generate_password(self): def generate_password(self):
......
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'recircle.settings.base')
app = Celery('recircle')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
# app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.broker_url = 'redis://localhost:6379/0'
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
app.conf.beat_schedule = {
'polling-status-in-every-second': {
'task': 'status.tasks.poll_status',
'schedule': 1.0,
},
}
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import status.routing
application = ProtocolTypeRouter({
# Empty for now (http->django views is added by default)
'websocket': AuthMiddlewareStack(
URLRouter(
status.routing.websocket_urlpatterns
)
),
})
...@@ -43,6 +43,8 @@ INSTALLED_APPS = [ ...@@ -43,6 +43,8 @@ INSTALLED_APPS = [
"corsheaders", "corsheaders",
"guardian", "guardian",
"django_nose", "django_nose",
"channels",
"django_celery_beat"
] ]
LOCAL_APPS = [ LOCAL_APPS = [
...@@ -51,6 +53,7 @@ LOCAL_APPS = [ ...@@ -51,6 +53,7 @@ LOCAL_APPS = [
"storage", "storage",
"template", "template",
"authorization", "authorization",
"status",
] ]
INSTALLED_APPS += LOCAL_APPS INSTALLED_APPS += LOCAL_APPS
...@@ -240,3 +243,14 @@ AUTHENTICATION_BACKENDS = ( ...@@ -240,3 +243,14 @@ AUTHENTICATION_BACKENDS = (
'django.contrib.auth.backends.ModelBackend', # this is default 'django.contrib.auth.backends.ModelBackend', # this is default
'guardian.backends.ObjectPermissionBackend', 'guardian.backends.ObjectPermissionBackend',
) )
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}
ASGI_APPLICATION = "recircle.routing.application"
from django.apps import AppConfig
class StatusConfig(AppConfig):
name = 'status'
# import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from instance.models import Instance
from rest_framework.authtoken.models import Token
import logging
import json
logger = logging.getLogger(__name__)
class StatusConsumer(WebsocketConsumer):
# Accept connection and wait for login message
def connect(self):
self.accept()
# If login message arrives get the auth token from it
# Then subscribe to the channels of the vm's that belongs to the user
def login(self, auth_token):
try:
token = Token.objects.get(key=auth_token)
self.user = token.user
instances = Instance.objects.filter(created_by=self.user)
for inst in instances:
async_to_sync(self.channel_layer.group_add)(
str(inst.id),
self.channel_name
)
except Token.DoesNotExist:
self.disconnect(100)
# Unsubscribe from registered groups
def disconnect(self, close_code):
if hasattr(self, 'user'):
instances = Instance.objects.filter(created_by=self.user)
for inst in instances:
async_to_sync(self.channel_layer.group_discard)(
str(inst.id),
self.channel_name
)
def receive(self, text_data):
text_data_json = json.loads(text_data)
type = text_data_json['type']
if type == 'login':
self.login(text_data_json['auth_token'])
def status_changed(self, event):
self.send(text_data=json.dumps({"vm": event["vm"], "status": event["status"]}))
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/instance-statuses/$', consumers.StatusConsumer),
]
from celery import shared_task
@shared_task
def poll_status():
from instance.models import Instance
for instance in Instance.objects.filter(deleted=False):
instance.update_status()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment