Commit 8d01770a by Bach Dániel

Merge branch 'seleniumfix' into 'master'

Selenium fix 

Better logging, hierarchy, stability fixes
Corrected time handling

See merge request !313
parents 2937f3c4 2195d4c8
...@@ -15,4 +15,8 @@ ...@@ -15,4 +15,8 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from .test_acl import TestModel, Test2Model # noqa from django.conf import settings
# https://code.djangoproject.com/ticket/7835
if settings.SETTINGS_MODULE == 'circle.settings.test':
from .test_acl import TestModel, Test2Model # noqa
...@@ -40,7 +40,8 @@ INSTALLED_APPS += ( ...@@ -40,7 +40,8 @@ INSTALLED_APPS += (
) )
TEST_RUNNER = 'django_nose.NoseTestSuiteRunner' TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'
path_to_selenium_test = os.path.expanduser('~/circle/circle/dashboard/tests/selenium')
path_to_selenium_test = os.path.join(SITE_ROOT, "dashboard/tests/selenium")
NOSE_ARGS = ['--stop', '--with-doctest', '--with-selenium-driver', '--selenium-driver=firefox', '-w%s' % path_to_selenium_test] NOSE_ARGS = ['--stop', '--with-doctest', '--with-selenium-driver', '--selenium-driver=firefox', '-w%s' % path_to_selenium_test]
PASSWORD_HASHERS = ['django.contrib.auth.hashers.MD5PasswordHasher'] PASSWORD_HASHERS = ['django.contrib.auth.hashers.MD5PasswordHasher']
......
...@@ -54,7 +54,7 @@ ...@@ -54,7 +54,7 @@
<dt>{% trans "result" %}</dt> <dt>{% trans "result" %}</dt>
<dd><textarea class="form-control">{{object.result|get_text:user}}</textarea></dd> <dd><textarea class="form-control" id="activity_result_text">{{object.result|get_text:user}}</textarea></dd>
<dt>{% trans "resultant state" %}</dt> <dt>{% trans "resultant state" %}</dt>
<dd>{{object.resultant_state|default:'n/a'}}</dd> <dd>{{object.resultant_state|default:'n/a'}}</dd>
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
{% for a in activities %} {% for a in activities %}
<div class="activity{% if a.pk == active.pk %} activity-active{%endif%}" <div class="activity{% if a.pk == active.pk %} activity-active{%endif%}"
data-activity-id="{{ a.pk }}" data-activity-code="{{ a.activity_code }}"> data-activity-id="{{ a.pk }}" data-activity-code="{{ a.activity_code }}" data-timestamp="{{ a.started|date:"U" }}">
<span class="timeline-icon{% if a.has_failed %} timeline-icon-failed{% endif %}"> <span class="timeline-icon{% if a.has_failed %} timeline-icon-failed{% endif %}">
<i class="fa {% if not a.finished %}fa-refresh fa-spin {% else %}fa-{{a.icon}}{% endif %}"></i> <i class="fa {% if not a.finished %}fa-refresh fa-spin {% else %}fa-{{a.icon}}{% endif %}"></i>
</span> </span>
......
...@@ -16,546 +16,78 @@ ...@@ -16,546 +16,78 @@
# #
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from selenose.cases import SeleniumTestCase from datetime import datetime
from django.contrib.auth.models import User import logging
from sys import _getframe
import random import random
import urlparse
import re import re
import time import urlparse
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.by import By
from datetime import datetime
from selenium.common.exceptions import NoSuchElementException
random_pass = "".join([random.choice(
'0123456789abcdefghijklmnopqrstvwxyz') for n in xrange(10)])
random_accents = random_pass + "".join([random.choice(
u"áéíöóúűÁÉÍÖÓÜÚŰ") for n in xrange(5)])
wait_max_sec = 10
host = 'https:127.0.0.1'
client_name = 'test_%s' % random_accents
class UtilityMixin(object):
def login(self, username, password='password', location=None):
driver = self.driver
if location is None:
location = '/dashboard/'
driver.get('%s%s' % (host, location))
# Only if we aren't logged in already
if location not in urlparse.urlparse(self.driver.current_url).path:
try:
name_input = driver.find_element_by_id("id_username")
password_input = driver.find_element_by_id("id_password")
submit_input = driver.find_element_by_id("submit-id-submit")
except:
inputs = driver.find_elements_by_tag_name("input")
for current_input in inputs:
input_type = current_input.get_attribute("type")
if input_type == "text":
name_input = current_input
if input_type == "password":
password_input = current_input
if input_type == "submit":
submit_input = current_input
try:
name_input.clear()
name_input.send_keys(username)
password_input.clear()
password_input.send_keys(password)
submit_input.click()
try:
# If selenium runs only in a small (virtual) screen
driver.find_element_by_class_name('navbar-toggle').click()
WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href*='/dashboard/profile/']")))
except:
time.sleep(0.5)
except:
raise Exception('Selenium cannot find the form controls')
def list_options(self, select):
try:
option_dic = {}
select = Select(select)
for option in select.options:
key = option.get_attribute('value')
if key is not None and key:
option_dic[key] = [option.text]
return option_dic
except:
raise Exception(
'Selenium cannot list the select possibilities')
def select_option(self, select, what=None):
"""
From an HTML select imput type try to choose the specified one.
Select is a selenium web element type. What represent both the
text of the option and it's ID.
"""
try:
my_choice = None
options = self.list_options(select)
select = Select(select)
if what is not None:
for key, value in options.iteritems():
if what in key:
my_choice = key
else:
if isinstance(value, list):
for single_value in value:
if what in single_value:
my_choice = key
else:
if what in value:
my_choice = key
if my_choice is None:
my_choose_list = options.keys()
my_choice = my_choose_list[random.randint(
0, len(my_choose_list) - 1)]
select.select_by_value(my_choice)
except:
raise Exception(
'Selenium cannot select the chosen one')
def get_link_by_href(self, target_href, attributes=None):
try:
links = self.driver.find_elements_by_tag_name('a')
for link in links:
href = link.get_attribute('href')
if href is not None and href:
if target_href in href:
perfect_fit = True
if isinstance(attributes, dict):
for key, target_value in attributes.iteritems():
attr_check = link.get_attribute(key)
if attr_check is not None and attr_check:
if target_value not in attr_check:
perfect_fit = False
if perfect_fit:
return link
except:
raise Exception(
'Selenium cannot find the href=%s link' % target_href)
def click_on_link(self, link):
"""
There are situations when selenium built in click() function
doesn't work as intended, that's when this function is used.
Fires a click event via javascript injection.
"""
try:
# Javascript function to simulate a click on a link
javascript = (
"var link = arguments[0];"
"var cancelled = false;"
"if(document.createEvent) {"
" var event = document.createEvent(\"MouseEvents\");"
" event.initMouseEvent("
" \"click\", true, true, window, 0, 0, 0, 0, 0,"
" false,false,false,false,0,null);"
" cancelled = !link.dispatchEvent(event);"
"} else if(link.fireEvent) {"
" cancelled = !link.fireEvent(\"onclick\");"
"} if (!cancelled) {"
" window.location = link.href;"
"}")
self.driver.execute_script(javascript, link)
except:
raise Exception(
'Selenium cannot inject javascript to the page')
def wait_and_accept_operation(self, argument=None):
"""
Accepts the operation confirmation pop up window.
Fills out the text inputs before accepting if argument is given.
"""
try:
accept = WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CLASS_NAME, "modal-accept")))
if argument is not None:
possible = self.driver.find_elements_by_css_selector(
"div.controls > input[type='text']")
if isinstance(argument, list):
for x in range(0, len(possible)):
possible[x].clear()
possible[x].send_keys(argument[x % len(argument)])
else:
for form in possible:
form.clear()
form.send_keys(argument)
accept.click()
except:
raise Exception("Selenium cannot accept the"
" operation confirmation")
def save_template_from_vm(self, name):
try:
WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href$='/op/deploy/']")))
self.click_on_link(self.get_link_by_href("/op/deploy/"))
self.wait_and_accept_operation()
recent_deploy = self.recently(self.get_timeline_elements(
"vm.Instance.deploy"))
if not self.check_operation_result(recent_deploy):
print ("Selenium cannot deploy the "
"chosen template virtual machine")
raise Exception('Cannot deploy the virtual machine')
self.click_on_link(WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href$='/op/shut_off/']"))))
self.wait_and_accept_operation()
recent_shut_off = self.recently(self.get_timeline_elements(
"vm.Instance.shut_off"))
if not self.check_operation_result(recent_shut_off):
print ("Selenium cannot shut off the "
"chosen template virtual machine")
raise Exception('Cannot shut off the virtual machine')
self.click_on_link(WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href$='/op/save_as_template/']"))))
self.wait_and_accept_operation(name)
return name
except:
raise Exception(
'Selenium cannot save a vm as a template')
def create_base_template(self, name=None, architecture="x86-64",
method=None, op_system=None, lease=None,
network="vm"):
if name is None:
name = "template_new_%s" % client_name
if op_system is None:
op_system = "!os %s" % client_name
try:
self.driver.get('%s/dashboard/template/choose/' % host)
self.driver.find_element_by_css_selector(
"input[type='radio'][value='base_vm']").click()
self.driver.find_element_by_id(
"template-choose-next-button").click()
template_name = WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, 'id_name')))
template_name.clear()
template_name.send_keys(name)
self.select_option(self.driver.find_element_by_id(
"id_arch"), architecture)
self.select_option(self.driver.find_element_by_id(
"id_access_method"), method)
system_name = self.driver.find_element_by_id("id_system")
system_name.clear()
system_name.send_keys(op_system)
self.select_option(self.driver.find_element_by_id(
"id_lease"), lease)
self.select_option(self.driver.find_element_by_id(
"id_networks"), network)
self.driver.find_element_by_css_selector(
"input.btn[type='submit']").click()
return self.save_template_from_vm(name)
except:
raise Exception(
'Selenium cannot create a base template virtual machine')
def get_template_id(self, name=None, from_all=False):
"""
In default settings find all templates ID in the template list.
If name is specified searches that specific template's ID
from_all sets whether to use owned templates or all of them
Returns list of the templates ID
"""
try:
self.driver.get('%s/dashboard/template/list/' % host)
css_selector_of_a_template = ("a[data-original-title]"
"[href*='/dashboard/template/']")
if from_all:
self.select_option(self.driver.find_element_by_id(
'id_stype'), "all")
self.driver.find_element_by_css_selector(
"button[type='submit']").click()
try:
WebDriverWait(self.driver, wait_max_sec).until(
ec.presence_of_element_located((
By.CSS_SELECTOR, css_selector_of_a_template)))
except:
print "Selenium could not locate any templates"
template_table = self.driver.find_element_by_css_selector(
"table[class*='template-list-table']")
templates = template_table.find_elements_by_css_selector("td.name")
found_template_ids = []
for template in templates:
if name is None or name in template.text:
try:
template_link = template.find_element_by_css_selector(
css_selector_of_a_template)
template_id = re.search(
r'\d+',
template_link.get_attribute("outerHTML")).group()
found_template_ids.append(template_id)
print ("Found '%(name)s' template's ID as %(id)s" % {
'name': template.text,
'id': template_id})
except NoSuchElementException:
pass
except:
raise
if not found_template_ids and name is not None:
print ("Selenium could not find the specified "
"%(name)s template in the list" % {
'name': name})
return found_template_ids
except:
raise Exception(
'Selenium cannot found the template\'s id')
def check_operation_result(self, operation_id, restore=True):
"""
Returns wheter the operation_id result is success (returns: boolean)
"""
try:
if restore:
url_base = urlparse.urlparse(self.driver.current_url)
url_save = ("%(host)s%(url)s" % {
'host': host,
'url': urlparse.urljoin(url_base.path, url_base.query)})
if url_base.fragment:
url_save = ("%(url)s#%(fragment)s" % {
'url': url_save,
'fragment': url_base.fragment})
self.driver.get('%(host)s/dashboard/vm/activity/%(id)s/' % {
'host': host,
'id': operation_id})
result = WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, "activity_status")))
print ("%(id)s result text is '%(result)s'" % {
'id': operation_id,
'result': result.text})
if (result.text == "success"):
out = True
elif (result.text == "wait"):
time.sleep(2)
out = self.check_operation_result(operation_id, False)
else:
out = False
if restore:
print "Restoring to %s url" % url_save
self.driver.get(url_save)
return out
except:
raise Exception(
'Selenium cannot check the result of an operation')
def recently(self, timeline_dict, second=90):
try:
if isinstance(timeline_dict, dict):
for key, value in timeline_dict.iteritems():
time = datetime.strptime(key, '%Y-%m-%d %H:%M')
delta = datetime.now() - time
if delta.total_seconds() <= second:
return value
except:
raise Exception(
'Selenium cannot filter timeline activities to recent')
def get_timeline_elements(self, code=None):
try:
if code is None:
css_activity_selector = "div[data-activity-code]"
code = "all activity"
else:
css_activity_selector = ("div[data-activity-code="
"'%(code)s']" % {
'code': code})
WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR, "a[href*='#activity']"))).click()
activity_dict = {}
timeline = WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, "activity-timeline")))
searched_activity = timeline.find_elements_by_css_selector(
css_activity_selector)
print "Found activity list for %s:" % code
for activity in searched_activity:
activity_id = activity.get_attribute('data-activity-id')
activity_text = activity.text
key = re.search(
r'\d+-\d+-\d+ \d+:\d+,', activity_text).group()[:-1]
print ("%(id)s @ %(activity)s" % {
'id': activity_id,
'activity': key})
activity_dict[key] = activity_id
return activity_dict
except:
raise Exception('Selenium cannot find the searched activity')
def create_template_from_base(self, delete_disk=True, name=None):
try:
if name is None:
name = "template_from_base_%s" % client_name
self.driver.get('%s/dashboard/template/choose/' % host)
choice_list = []
choices = self.driver.find_elements_by_css_selector(
"input[type='radio']")
choice_list = [item for item in choices if (
'test' not in item.get_attribute('value')
and item.get_attribute('value') != 'base_vm')]
chosen = random.randint(0, len(choice_list) - 1)
choice_list[chosen].click()
self.driver.find_element_by_id(
"template-choose-next-button").click()
if delete_disk:
self.click_on_link(
self.get_link_by_href("#resources"))
disks = WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, 'vm-details-resources-disk')))
disk_list = disks.find_elements_by_css_selector(
"h4[class*='list-group-item-heading']")
if len(disk_list) > 0:
self.click_on_link(
self.get_link_by_href("/op/remove_disk/"))
self.wait_and_accept_operation()
WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, "_activity")))
recent_remove_disk = self.recently(
self.get_timeline_elements(
"vm.Instance.remove_disk"))
if not self.check_operation_result(recent_remove_disk):
print ("Selenium cannot delete disk "
"of the chosen template")
raise Exception('Cannot delete disk')
return self.save_template_from_vm(name)
except:
raise Exception('Selenium cannot start a template from a base one')
def delete_template(self, template_id): from django.contrib.auth.models import User
try: from django.db.models import Q
self.driver.get('%s/dashboard/template/%s/' % (host, template_id))
url = urlparse.urlparse(self.driver.current_url)
self.click_on_link(
self.get_link_by_href(
"/dashboard/template/delete/%s/" % template_id))
self.wait_and_accept_operation()
WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.CLASS_NAME, 'alert-success')))
url = urlparse.urlparse(self.driver.current_url)
if "/template/list/" not in url.path:
raise Exception()
except:
raise Exception('Selenium cannot delete the desired template')
def create_random_vm(self): from selenium.webdriver.common.by import By
try: from selenium.webdriver.support import expected_conditions as ec
self.driver.get('%s/dashboard/vm/create/' % host) from selenium.webdriver.support.ui import WebDriverWait
vm_list = [] from selenose.cases import SeleniumTestCase
pk = None
vm_list = self.driver.find_elements_by_class_name(
'vm-create-template-summary')
choice = random.randint(0, len(vm_list) - 1)
vm_list[choice].click()
WebDriverWait(self.driver, wait_max_sec).until(
ec.element_to_be_clickable((
By.CLASS_NAME, 'vm-create-start'))).click()
WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.CLASS_NAME, 'alert-success')))
url = urlparse.urlparse(self.driver.current_url)
pk = re.search(r'\d+', url.path).group()
return pk
except:
raise Exception('Selenium cannot start a VM')
def view_change(self, target_box): from vm.models import Instance
driver = self.driver from .config import SeleniumConfig
driver.get('%s/dashboard/' % host) from .util import CircleSeleniumMixin
list_view = driver.find_element_by_id('%s-list-view' % target_box)
graph_view = driver.find_element_by_id('%s-graph-view' % target_box)
js_script = 'return arguments[0].style.display;'
required_attributes = {'data-index-box': target_box}
graph_view_link = self.get_link_by_href(
'#index-graph-view',
required_attributes).find_element_by_tag_name('i')
list_view_link = self.get_link_by_href(
'#index-list-view',
required_attributes).find_element_by_tag_name('i')
self.click_on_link(list_view_link)
states = [driver.execute_script(js_script, list_view),
driver.execute_script(js_script, graph_view)]
self.click_on_link(graph_view_link)
states.extend([driver.execute_script(js_script, list_view),
driver.execute_script(js_script, graph_view)])
self.click_on_link(list_view_link)
states.extend([driver.execute_script(js_script, list_view),
driver.execute_script(js_script, graph_view)])
return states
def delete_vm(self, pk): conf = SeleniumConfig()
try: log_formatter = logging.Formatter(conf.log_format)
# For relability reasons instead of using the JS operatation logger = logging.getLogger(conf.logger_name)
self.driver.get("%(host)s/dashboard/vm/%(id)s/op/destroy/" % { fileHandler = logging.handlers.RotatingFileHandler(
'host': host, conf.log_file, maxBytes=conf.log_size, backupCount=conf.log_backup)
'id': pk}) fileHandler.setFormatter(log_formatter)
self.wait_and_accept_operation() fileHandler.setLevel(logging.WARNING)
try: logger.addHandler(fileHandler)
status_span = WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, 'vm-details-state')))
WebDriverWait(status_span, wait_max_sec).until(
ec.visibility_of_element_located((
By.CLASS_NAME, 'fa-trash-o')))
except:
# Selenium can time-out by not realising the JS refresh
recent_destroy_vm = self.recently(
self.get_timeline_elements("vm.Instance.destroy"))
if not self.check_operation_result(recent_destroy_vm):
print ("Selenium cannot destroy "
"the chosen %(id)s vm" % {
'id': pk})
raise Exception('Cannot destroy the specified vm')
self.driver.get('%s/dashboard/vm/%s/' % (host, pk))
try:
WebDriverWait(self.driver, wait_max_sec).until(
ec.visibility_of_element_located((
By.CSS_SELECTOR,
"span[data-status*='DESTROYED']")))
return True
except:
return False
except:
raise Exception("Selenium can not destroy a VM")
class VmDetailTest(UtilityMixin, SeleniumTestCase): class BasicSeleniumTests(SeleniumTestCase, CircleSeleniumMixin):
template_ids = [] template_ids = []
vm_ids = [] vm_ids = []
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__(*args, **kwargs)
self.conf = conf
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
cls._user = User.objects.create(username=client_name, logger.warning("Selenium test started @ %(time)s" % {
is_superuser=True) 'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
cls._user.set_password(random_accents) if conf.create_user:
cls._user.save() logger.warning(
"Creating selenium test user %(name)s:%(password)s" % {
'name': conf.client_name,
'password': conf.random_pass})
cls._user = User.objects.create(username=conf.client_name,
is_superuser=True)
cls._user.set_password(conf.random_pass)
cls._user.save()
@classmethod @classmethod
def teardown_class(cls): def teardown_class(cls):
cls._user.delete() if conf.create_user:
for instance in Instance.objects.all().filter(
~Q(status=u'DESTROYED'), owner=cls._user):
logger.warning(
"Destroying the test virtual machine: %(id)s" % {
'id': instance.pk})
instance.destroy(system=True)
logger.warning("Deleting test user %(name)s" % {
'name': conf.client_name})
cls._user.delete()
logger.warning("Selenium test finished @ %(time)s" % {
'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def test_01_login(self): def test_01_login(self):
logger.warning("Starting test %s" % _getframe().f_code.co_name)
title = 'Dashboard | CIRCLE' title = 'Dashboard | CIRCLE'
location = '/dashboard/' location = '/dashboard/'
self.login(client_name, random_accents) self.login()
self.driver.get('%s/dashboard/' % host) self.driver.get('%s/dashboard/' % conf.host)
url = urlparse.urlparse(self.driver.current_url) url = urlparse.urlparse(self.driver.current_url)
(self.assertIn('%s' % title, self.driver.title, (self.assertIn('%s' % title, self.driver.title,
'%s is not found in the title' % title) or '%s is not found in the title' % title) or
...@@ -563,62 +95,69 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase): ...@@ -563,62 +95,69 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase):
'URL path is not equal with %s' % location)) 'URL path is not equal with %s' % location))
def test_02_add_template_rights(self): def test_02_add_template_rights(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
template_pool = self.get_template_id(from_all=True) template_pool = self.get_template_id(from_all=True)
if len(template_pool) > 1: if len(template_pool) > 1:
chosen = template_pool[random.randint(0, len(template_pool) - 1)] chosen = template_pool[random.randint(0, len(template_pool) - 1)]
elif len(template_pool) == 1: elif len(template_pool) == 1:
chosen = template_pool[0] chosen = template_pool[0]
else: else:
print "Selenium did not found any templates" logger.exception("Selenium did not found any templates")
raise Exception( raise Exception(
"System did not meet required conditions to continue") "Selenium did not found any templates")
self.driver.get('%s/dashboard/template/%s/' % (host, chosen)) self.driver.get('%s/dashboard/template/%s/' % (conf.host, chosen))
acces_form = self.driver.find_element_by_css_selector( acces_form_css = (
"form[action*='/dashboard/template/%(template_id)s/acl/']" "form[action*='/dashboard/template/%(template_id)s/acl/']"
"[method='post']" % { "[method='post']" % {
'template_id': chosen}) 'template_id': chosen})
acces_form = self.driver.find_element_by_css_selector(acces_form_css)
user_name = acces_form.find_element_by_css_selector( user_name = acces_form.find_element_by_css_selector(
"input[type='text'][id='id_name']") "input[type='text'][id='id_name']")
user_status = acces_form.find_element_by_css_selector( user_status = acces_form.find_element_by_css_selector(
"select[name='level']") "select[name='level']")
user_name.clear() user_name.clear()
user_name.send_keys(client_name) user_name.send_keys(conf.client_name)
self.select_option(user_status) self.select_option(user_status, 'user')
# For strange reasons clicking on submit button doesn't work anymore # For strange reasons clicking on submit button doesn't work anymore
acces_form.submit() acces_form.submit()
found_users = [] found_users = []
acl_users = self.driver.find_elements_by_css_selector( acces_form = self.driver.find_element_by_css_selector(acces_form_css)
acl_users = acces_form.find_elements_by_css_selector(
"a[href*='/dashboard/profile/']") "a[href*='/dashboard/profile/']")
for user in acl_users: for user in acl_users:
user_text = re.split(r':[ ]?', user.text) user_text = re.split(r':[ ]?', user.text)
if len(user_text) == 2: if len(user_text) == 2:
found_name = re.search(r'[\w\W]+(?=\))', user_text[1]).group() found_name = re.search(r'[\w\W]+(?=\))', user_text[1]).group()
print ("'%(user)s' found in ACL list for template %(id)s" % { logger.warning("'%(user)s' found in ACL "
'user': found_name, "list for template %(id)s" % {
'id': chosen}) 'user': found_name,
'id': chosen})
found_users.append(found_name) found_users.append(found_name)
self.assertIn(client_name, found_users, self.assertIn(conf.client_name, found_users,
"Could not add user to template's ACL") "Could not add user to template's ACL")
def test_03_able_to_create_template(self): def test_03_able_to_create_template(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
template_list = None template_list = None
create_template = self.get_link_by_href('/dashboard/template/choose/') create_template = self.get_link_by_href('/dashboard/template/choose/')
self.click_on_link(create_template) self.click_on_link(create_template)
WebDriverWait(self.driver, wait_max_sec).until( WebDriverWait(self.driver, conf.wait_max_sec).until(
ec.visibility_of_element_located(( ec.visibility_of_element_located((
By.ID, 'confirmation-modal'))) By.ID, 'confirmation-modal')))
template_list = self.driver.find_elements_by_class_name( template_list = self.driver.find_elements_by_class_name(
'template-choose-list-element') 'template-choose-list-element')
print 'Selenium found %s template possibilities' % len(template_list) logger.warning('Selenium found %(count)s template possibilities' % {
'count': len(template_list)})
(self.assertIsNotNone( (self.assertIsNotNone(
template_list, "Selenium can not find the create template list") or template_list, "Selenium can not find the create template list") or
self.assertGreater(len(template_list), 0, self.assertGreater(len(template_list), 0,
"The create template list is empty")) "The create template list is empty"))
def test_04_create_base_template(self): def test_04_create_base_template(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
created_template_id = self.get_template_id( created_template_id = self.get_template_id(
self.create_base_template()) self.create_base_template())
found = created_template_id is not None found = created_template_id is not None
...@@ -629,7 +168,8 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase): ...@@ -629,7 +168,8 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase):
"Could not found the created template in the template list") "Could not found the created template in the template list")
def test_05_create_template_from_base(self): def test_05_create_template_from_base(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
created_template_id = self.get_template_id( created_template_id = self.get_template_id(
self.create_template_from_base()) self.create_template_from_base())
found = created_template_id is not None found = created_template_id is not None
...@@ -640,10 +180,11 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase): ...@@ -640,10 +180,11 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase):
"Could not found the created template in the template list") "Could not found the created template in the template list")
def test_06_delete_templates(self): def test_06_delete_templates(self):
logger.warning("Starting test %s" % _getframe().f_code.co_name)
success = False success = False
self.login(client_name, random_accents) self.login()
for template_id in self.template_ids: for template_id in self.template_ids:
print "Deleting template %s" % template_id logger.warning("Deleting template %s" % template_id)
self.delete_template(template_id) self.delete_template(template_id)
existing_templates = self.get_template_id() existing_templates = self.get_template_id()
if len(existing_templates) == 0: if len(existing_templates) == 0:
...@@ -658,52 +199,57 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase): ...@@ -658,52 +199,57 @@ class VmDetailTest(UtilityMixin, SeleniumTestCase):
success, "Could not delete (all) the test template(s)") success, "Could not delete (all) the test template(s)")
def test_07_able_to_create_vm(self): def test_07_able_to_create_vm(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
vm_list = None vm_list = None
create_vm_link = self.get_link_by_href('/dashboard/vm/create/') create_vm_link = self.get_link_by_href('/dashboard/vm/create/')
create_vm_link.click() create_vm_link.click()
WebDriverWait(self.driver, wait_max_sec).until( WebDriverWait(self.driver, conf.wait_max_sec).until(
ec.visibility_of_element_located(( ec.visibility_of_element_located((
By.ID, 'confirmation-modal'))) By.ID, 'confirmation-modal')))
vm_list = self.driver.find_elements_by_class_name( vm_list = self.driver.find_elements_by_class_name(
'vm-create-template-summary') 'vm-create-template-summary')
print ("Selenium found %(vm_number)s virtual machine template " logger.warning("Selenium found %(vm_number)s virtual machine"
" possibilities" % { " template possibilities" % {
'vm_number': len(vm_list)}) 'vm_number': len(vm_list)})
(self.assertIsNotNone( (self.assertIsNotNone(
vm_list, "Selenium can not find the VM list") or vm_list, "Selenium can not find the VM list") or
self.assertGreater(len(vm_list), 0, "The create VM list is empty")) self.assertGreater(len(vm_list), 0, "The create VM list is empty"))
def test_08_create_vm(self): def test_08_create_vm(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
pk = self.create_random_vm() pk = self.create_random_vm()
self.vm_ids.append(pk) self.vm_ids.append(pk)
self.assertIsNotNone(pk, "Can not create a VM") self.assertIsNotNone(pk, "Can not create a VM")
def test_09_vm_view_change(self): def test_09_vm_view_change(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
expected_states = ["", "none", expected_states = ["", "none",
"none", "", "none", "",
"block", "none"] "block", "none"]
states = self.view_change("vm") states = self.view_change("vm")
print 'states: [%s]' % ', '.join(map(str, states)) logger.warning('states: [%s]' % ', '.join(map(str, states)))
print 'expected: [%s]' % ', '.join(map(str, expected_states)) logger.warning('expected: [%s]' % ', '.join(map(str, expected_states)))
self.assertListEqual(states, expected_states, self.assertListEqual(states, expected_states,
"The view mode does not change for VM listing") "The view mode does not change for VM listing")
def test_10_node_view_change(self): def test_10_node_view_change(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
expected_states = ["", "none", expected_states = ["", "none",
"none", "", "none", "",
"block", "none"] "block", "none"]
states = self.view_change("node") states = self.view_change("node")
print 'states: [%s]' % ', '.join(map(str, states)) logger.warning('states: [%s]' % ', '.join(map(str, states)))
print 'expected: [%s]' % ', '.join(map(str, expected_states)) logger.warning('expected: [%s]' % ', '.join(map(str, expected_states)))
self.assertListEqual(states, expected_states, self.assertListEqual(states, expected_states,
"The view mode does not change for NODE listing") "The view mode does not change for NODE listing")
def test_11_delete_vm(self): def test_11_delete_vm(self):
self.login(client_name, random_accents) logger.warning("Starting test %s" % _getframe().f_code.co_name)
self.login()
succes = True succes = True
for vm in self.vm_ids: for vm in self.vm_ids:
if not self.delete_vm(vm): if not self.delete_vm(vm):
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
import random
class SeleniumConfig(object):
# How many sec can selenium wait till certain parts of a page appears
wait_max_sec = 10
# How much sec can pass before the activity is no longer happened recently
recently_sec = 90
# Name of the logger (necessary to override test logger)
logger_name = "selenium"
# File where the log should be stored
log_file = "selenium.log"
# Log file max size in Bytes
log_size = 1024 * 1024 * 10
# Format of the log file
log_format = "%(asctime)s: %(name)s: %(levelname)s: %(message)s"
# Backup count of the logfiles
log_backup = 5
# Accented letters from which selenium can choose to name stuff
accents = u"áéíöóúűÁÉÍÖÓÜÚŰ"
# Non accented letters from which selenium can choose to name stuff
valid_chars = "0123456789abcdefghijklmnopqrstvwxyz"
# First we choose 10 random normal letters
random_pass = "".join([random.choice(
valid_chars) for n in xrange(10)])
# Then we append it with 5 random accented one
random_pass += "".join([random.choice(
accents) for n in xrange(5)])
# Then we name our client as test_%(password)s
client_name = 'test_%s' % random_pass
# Which webpage should selenium use (localhost is recommended)
host = 'https://127.0.0.1'
# In default the tests create a new user then delete it afteword
# Disable this if selenium cannot acces the database
create_user = True
"""
Note: It's possible to setup that selenium uses a distant web server
for testing. If you choose this method you must provide a distant superuser
account info for that server by overriding random_pass and client_name by
uncommenting the lines below.
"""
# client_name = "user name here"
# random_pass = "password here"
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
import inspect
import logging
import random
import re
import time
import urlparse
from selenium.common.exceptions import (
NoSuchElementException, StaleElementReferenceException,
TimeoutException)
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.select import Select
from selenium.webdriver.support.ui import WebDriverWait
from .config import SeleniumConfig
logger = logging.getLogger(SeleniumConfig.logger_name)
class SeleniumMixin(object):
def create_screenshot(self):
name = 'ss_from_%(caller_name)s.png' % {
'caller_name': inspect.stack()[1][3]}
logger.warning('Creating screenshot "%s"' % name)
self.driver.save_screenshot(name)
def get_url(self, fragment_needed=False, fragment=None):
url_base = urlparse.urlparse(self.driver.current_url)
url_save = ("%(host)s%(url)s" % {
'host': self.conf.host,
'url': urlparse.urljoin(url_base.path, url_base.query)})
if fragment is None:
fragment = url_base.fragment
else:
fragment_needed = True
if fragment_needed and fragment:
url_save = ("%(url)s#%(fragment)s" % {
'url': url_save,
'fragment': fragment})
return url_save
def list_options(self, select):
try:
option_dic = {}
select = Select(select)
for option in select.options:
key = option.get_attribute('value')
if key is not None and key:
option_dic[key] = [option.text]
return option_dic
except:
logger.exception("Selenium cannot list the"
" select possibilities")
self.create_screenshot()
raise Exception(
'Cannot list the select possibilities')
def select_option(self, select, what=None):
"""
From an HTML select imput type try to choose the specified one.
Select is a selenium web element type. What represent both the
text of the option and it's ID.
"""
try:
my_choice = None
options = self.list_options(select)
select = Select(select)
if what is not None:
for key, value in options.iteritems():
if what in key:
my_choice = key
else:
if isinstance(value, list):
for single_value in value:
if what in single_value:
my_choice = key
else:
if what in value:
my_choice = key
if my_choice is None:
my_choose_list = options.keys()
my_choice = my_choose_list[random.randint(
0, len(my_choose_list) - 1)]
select.select_by_value(my_choice)
except:
logger.exception("Selenium cannot select the chosen one")
self.create_screenshot()
raise Exception(
'Cannot select the chosen one')
def get_link_by_href(self, target_href, attributes=None):
try:
links = self.driver.find_elements_by_tag_name('a')
for link in links:
href = link.get_attribute('href')
if href is not None and href:
if target_href in href:
perfect_fit = True
if isinstance(attributes, dict):
for key, target_value in attributes.iteritems():
attr_check = link.get_attribute(key)
if attr_check is not None and attr_check:
if target_value not in attr_check:
perfect_fit = False
if perfect_fit:
return link
except:
logger.exception(
"Selenium cannot find the href=%s link" % target_href)
self.create_screenshot()
raise Exception('Cannot find the requested href')
def click_on_link(self, link):
"""
There are situations when selenium built in click() function
doesn't work as intended, that's when this function is used.
Fires a click event via javascript injection.
"""
try:
# Javascript function to simulate a click on a link
javascript = """
var link = arguments[0];
var cancelled = false;
if(document.createEvent) {
var event = document.createEvent("MouseEvents");
event.initMouseEvent(
"click", true, true, window, 0, 0, 0, 0, 0,
false,false,false,false,0,null);
cancelled = !link.dispatchEvent(event);
} else if(link.fireEvent) {
cancelled = !link.fireEvent("onclick");
} if (!cancelled) {
window.location = link.href;
}"""
self.driver.execute_script(javascript, link)
except:
logger.exception("Selenium cannot inject javascript to the page")
self.create_screenshot()
raise Exception(
'Cannot inject javascript to the page')
def get_text(self, node, tag):
"""
There are some cases where selenium default WebElement text()
method returns less then it actually could contain. Solving that
here is a simple regular expression. Give the closest html element
then specify the html tag of the enclosed text.
"""
text = ""
try:
text_whole = re.search(
r'<%(tag)s[^>]*>([^<]+)</%(tag)s>' % {
'tag': tag},
node.get_attribute("outerHTML")).group()
text_parts = text_whole.splitlines()
for part in text_parts:
if '<' not in part and '>' not in part:
text += part
text = text.replace(" ", "")
except:
return node.text
if len(node.text) >= len(text):
text = node.text
else:
logger.warning("Better text found which is '%s'" % text)
return text.strip()
class CircleSeleniumMixin(SeleniumMixin):
def login(self, location=None):
driver = self.driver
if location is None:
location = '/dashboard/'
driver.get('%s%s' % (self.conf.host, location))
# Only if we aren't logged in already
if location not in urlparse.urlparse(self.driver.current_url).path:
try:
name_input = driver.find_element_by_id("id_username")
password_input = driver.find_element_by_id("id_password")
submit_input = driver.find_element_by_id("submit-id-submit")
except:
inputs = driver.find_elements_by_tag_name("input")
for current_input in inputs:
input_type = current_input.get_attribute("type")
if input_type == "text":
name_input = current_input
if input_type == "password":
password_input = current_input
if input_type == "submit":
submit_input = current_input
try:
name_input.clear()
name_input.send_keys(self.conf.client_name)
password_input.clear()
password_input.send_keys(self.conf.random_pass)
submit_input.click()
try:
# If selenium runs only in a small (virtual) screen
driver.find_element_by_class_name('navbar-toggle').click()
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href*='/dashboard/profile/']")))
except:
time.sleep(0.5)
except:
logger.exception("Selenium cannot find the form controls")
self.create_screenshot()
raise Exception('Cannot find the form controls')
def fallback(self, fallback_url, fallback_function):
logger.warning(
"However error was anticipated falling back to %(url)s" % {
'url': fallback_url})
self.driver.get(fallback_url)
return fallback_function()
def wait_and_accept_operation(self, argument=None, try_wait=None,
fallback_url=None):
"""
Accepts the operation confirmation pop up window.
Fills out the text inputs before accepting if argument is given.
"""
try:
accept = WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CLASS_NAME, "modal-accept")))
if argument is not None:
possible = self.driver.find_elements_by_css_selector(
"div.controls > input[type='text']")
if isinstance(argument, list):
for x in range(0, len(possible)):
possible[x].clear()
possible[x].send_keys(argument[x % len(argument)])
else:
for form in possible:
form.clear()
form.send_keys(argument)
accept.click()
if try_wait is not None:
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.CSS_SELECTOR, try_wait)))
except TimeoutException:
logger.exception("Selenium cannot accept the"
" operation confirmation")
if fallback_url is not None:
self.fallback(
fallback_url,
lambda: self.wait_and_accept_operation(argument))
else:
self.create_screenshot()
raise Exception(
'Cannot accept the operation confirmation')
except:
logger.exception("Selenium cannot accept the"
" operation confirmation")
if fallback_url is not None:
self.fallback(
fallback_url,
lambda: self.wait_and_accept_operation(argument, try_wait))
else:
self.create_screenshot()
raise Exception(
'Cannot accept the operation confirmation')
def save_template_from_vm(self, name):
try:
url_save = self.get_url()
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href$='/op/deploy/']")))
self.click_on_link(self.get_link_by_href("/op/deploy/"))
fallback_url = "%sop/deploy/" % url_save
self.wait_and_accept_operation(
try_wait="a[href$='/op/shut_off/']", fallback_url=fallback_url)
recent_deploy = self.recently(self.get_timeline_elements(
"vm.Instance.deploy", url_save))
if not self.check_operation_result(
recent_deploy, "a[href*='#activity']"):
logger.warning("Selenium cannot deploy the "
"chosen template virtual machine")
raise Exception('Cannot deploy the virtual machine')
self.click_on_link(WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href$='/op/shut_off/']"))))
fallback_url = "%sop/shut_off/" % url_save
self.wait_and_accept_operation(
try_wait="a[href$='/op/deploy/']", fallback_url=fallback_url)
recent_shut_off = self.recently(self.get_timeline_elements(
"vm.Instance.shut_off", url_save))
if not self.check_operation_result(
recent_shut_off, "a[href*='#activity']"):
logger.warning("Selenium cannot shut off the "
"chosen template virtual machine")
raise Exception('Cannot shut off the virtual machine')
self.click_on_link(WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR,
"a[href$='/op/save_as_template/']"))))
fallback_url = "%sop/save_as_template/" % url_save
self.wait_and_accept_operation(
argument=name, fallback_url=fallback_url)
recent_save_template = self.recently(self.get_timeline_elements(
"vm.Instance.save_as_template", url_save))
if not self.check_operation_result(
recent_save_template, "a[href*='#activity']"):
logger.warning("Selenium cannot save the "
"chosen virtual machine as a template")
raise Exception(
'Cannot save the virtual machine as a template')
logger.warning("Selenium created %(name)s template" % {
'name': name})
return name
except:
logger.exception("Selenium cannot save a vm as a template")
self.create_screenshot()
raise Exception(
'Cannot save a vm as a template')
def create_base_template(self, name=None, architecture="x86-64",
method=None, op_system=None, lease=None,
network="vm"):
if name is None:
name = "new_%s" % self.conf.client_name
if op_system is None:
op_system = "!os %s" % self.conf.client_name
try:
self.driver.get('%s/dashboard/template/choose/' % self.conf.host)
self.driver.find_element_by_css_selector(
"input[type='radio'][value='base_vm']").click()
self.driver.find_element_by_id(
"template-choose-next-button").click()
template_name = WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, 'id_name')))
template_name.clear()
template_name.send_keys(name)
self.select_option(self.driver.find_element_by_id(
"id_arch"), architecture)
self.select_option(self.driver.find_element_by_id(
"id_access_method"), method)
system_name = self.driver.find_element_by_id("id_system")
system_name.clear()
system_name.send_keys(op_system)
self.select_option(self.driver.find_element_by_id(
"id_lease"), lease)
self.select_option(self.driver.find_element_by_id(
"id_networks"), network)
self.driver.find_element_by_css_selector(
"input.btn[type='submit']").click()
return self.save_template_from_vm(name)
except:
logger.exception("Selenium cannot create a base"
" template virtual machine")
self.create_screenshot()
raise Exception(
'Cannot create a base template virtual machine')
def get_template_id(self, name=None, from_all=False):
"""
In default settings find all templates ID in the template list.
If name is specified searches that specific template's ID
from_all sets whether to use owned templates or all of them
Returns list of the templates ID
"""
try:
self.driver.get('%s/dashboard/template/list/' % self.conf.host)
css_selector_of_a_template = ("a[data-original-title]"
"[href*='/dashboard/template/']")
if from_all:
self.select_option(self.driver.find_element_by_id(
'id_stype'), "all")
self.driver.find_element_by_css_selector(
"button[type='submit']").click()
try:
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.presence_of_element_located((
By.CSS_SELECTOR, css_selector_of_a_template)))
except:
logger.warning("Selenium could not locate any templates")
raise Exception("Could not locate any templates")
template_table = self.driver.find_element_by_css_selector(
"table[class*='template-list-table']")
templates = template_table.find_elements_by_css_selector("td.name")
found_template_ids = []
for template in templates:
# Little magic to outsmart accented naming errors
template_name = self.get_text(template, "a")
if name is None or name in template_name:
try:
template_link = template.find_element_by_css_selector(
css_selector_of_a_template)
template_id = re.search(
r'\d+',
template_link.get_attribute("outerHTML")).group()
found_template_ids.append(template_id)
logger.warning("Found '%(name)s' "
"template's ID as %(id)s" % {
'name': template_name,
'id': template_id})
except NoSuchElementException:
pass
except:
raise
else:
logger.warning(
"Searching for %(searched)s so"
" %(name)s is dismissed" % {
'searched': name,
'name': template_name})
logger.warning(
"Dismissed template html code: %(code)s" % {
'code': template.get_attribute("outerHTML")})
if not found_template_ids and name is not None:
logger.warning("Selenium could not find the specified "
"%(name)s template in the list" % {
'name': name})
raise Exception("Could not find the specified template")
return found_template_ids
except:
logger.exception('Selenium cannot find the template\'s id')
self.create_screenshot()
raise Exception(
'Cannot find the template\'s id')
def check_operation_result(self, operation_id, restore_selector=None,
restore=True):
"""
Returns wheter the operation_id result is success (returns: boolean)
"""
try:
if restore:
url_save = self.get_url(True)
self.driver.get('%(host)s/dashboard/vm/activity/%(id)s/' % {
'host': self.conf.host,
'id': operation_id})
result = WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, "activity_status")))
logger.warning("%(id)s's result is '%(result)s'" % {
'id': operation_id,
'result': result.text})
if (result.text == "success"):
out = True
elif (result.text == "wait"):
time.sleep(2)
out = self.check_operation_result(
operation_id=operation_id, restore=False)
else:
try:
result_text = WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, "activity_result_text")))
logger.warning(
"%(id)s's result text is: '%(result_text)s'" % {
'id': operation_id,
'result_text': result_text.text})
except:
logger.warning("Cannot read %(id)s's result text" % {
'id': operation_id})
out = False
if restore:
logger.warning("Restoring to %s url" % url_save)
self.driver.get(url_save)
if restore_selector is not None and restore_selector:
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.CSS_SELECTOR, restore_selector)))
return out
except:
logger.exception("Selenium cannot check the"
" result of an operation")
self.create_screenshot()
raise Exception(
'Cannot check the result of an operation')
def recently(self, timeline_dict, second=None):
if second is None:
second = self.conf.recently_sec
try:
if isinstance(timeline_dict, dict):
recent = None
for key, value in timeline_dict.iteritems():
if recent is None or int(key) > int(recent):
recent = key
if len(timeline_dict) > 1:
logger.warning(
"Searching for most recent activity"
" from the received %(count)s pieces" % {
'count': len(timeline_dict)})
logger.warning("Found at %(id)s @ %(time)s" % {
'id': timeline_dict[recent],
'time': datetime.fromtimestamp(
int(recent)).strftime('%Y-%m-%d %H:%M:%S')})
logger.warning(
"Checking wheter %(id)s started in the"
" recent %(second)s seconds" % {
'id': timeline_dict[recent],
'second': second})
delta = datetime.now() - datetime.fromtimestamp(int(recent))
if delta.total_seconds() <= second:
return timeline_dict[recent]
except:
logger.exception("Selenium cannot filter timeline "
"activities to find most recent")
self.create_screenshot()
raise Exception(
'Cannot filter timeline activities to find most recent')
def get_timeline_elements(self, code=None, fallback_url=None):
try:
if code is None:
css_activity_selector = "div[data-activity-code]"
code_text = "all activity"
else:
code_text = code
css_activity_selector = ("div[data-activity-code="
"'%(code)s']" % {
'code': code})
try:
self.click_on_link(WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CSS_SELECTOR, "a[href*='#activity']"))))
activity_dict = {}
timeline = WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, "activity-timeline")))
searched_activity = timeline.find_elements_by_css_selector(
css_activity_selector)
logger.warning("Found activity list for %s:" % code_text)
for activity in searched_activity:
activity_id = activity.get_attribute('data-activity-id')
key = activity.get_attribute('data-timestamp')
logger.warning("%(id)s @ %(activity)s" % {
'id': activity_id,
'activity': datetime.fromtimestamp(
int(key)).strftime('%Y-%m-%d %H:%M:%S')})
activity_dict[key] = activity_id
except StaleElementReferenceException:
logger.warning('Timeline changed while processing it')
return self.get_timeline_elements(code, fallback_url)
except TimeoutException:
logger.warning('Can not found timeline in the page')
if fallback_url is not None:
return self.fallback(
fallback_url,
lambda: self.get_timeline_elements(code))
else:
self.create_screenshot()
raise Exception('Selenium could not locate the timeline')
except:
logger.exception('Selenium cannot get timeline elemets')
self.create_screenshot()
raise Exception('Cannot get timeline elements')
if len(activity_dict) == 0:
logger.warning('Found activity list is empty')
self.create_screenshot()
raise Exception('Selenium did not found any activity')
return activity_dict
except:
logger.exception('Selenium cannot find the searched activity')
self.create_screenshot()
raise Exception('Cannot find the searched activity')
def create_template_from_base(self, delete_disk=True, name=None):
try:
if name is None:
name = "from_%s" % self.conf.client_name
self.driver.get('%s/dashboard/template/choose/' % self.conf.host)
choice_list = []
choices = self.driver.find_elements_by_css_selector(
"input[type='radio']")
choice_list = [item for item in choices if (
'test' not in item.get_attribute('value')
and item.get_attribute('value') != 'base_vm')]
chosen = random.randint(0, len(choice_list) - 1)
choice_list[chosen].click()
self.driver.find_element_by_id(
"template-choose-next-button").click()
if delete_disk:
url_save = self.get_url(fragment='activity')
self.click_on_link(
self.get_link_by_href("#resources"))
disks = WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, 'vm-details-resources-disk')))
disk_list = disks.find_elements_by_css_selector(
"h4[class*='list-group-item-heading']")
if len(disk_list) > 0:
self.click_on_link(
self.get_link_by_href("/op/remove_disk/"))
self.wait_and_accept_operation(
try_wait="a[href*='#activity']")
recent_remove_disk = self.recently(
self.get_timeline_elements(
"vm.Instance.remove_disk", url_save))
if not self.check_operation_result(recent_remove_disk):
logger.warning("Selenium cannot delete disk "
"of the chosen template")
raise Exception('Cannot delete disk')
return self.save_template_from_vm(name)
except:
logger.exception("Selenium cannot start a"
" template from a base one")
self.create_screenshot()
raise Exception(
'Cannot start a template from a base one')
def delete_template(self, template_id):
try:
self.driver.get(
'%s/dashboard/template/%s/' % (self.conf.host, template_id))
url_save = "%(host)s/dashboard/template/delete/%(pk)s/" % {
'host': self.conf.host,
'pk': template_id}
self.click_on_link(
self.get_link_by_href(
"/dashboard/template/delete/%s/" % template_id))
self.wait_and_accept_operation(fallback_url=url_save)
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.CLASS_NAME, 'alert-success')))
url = urlparse.urlparse(self.driver.current_url)
if "/template/list/" not in url.path:
logger.warning('CIRCLE does not redirect to /template/list/')
raise Exception(
'System does not redirect to template listing')
logger.warning('Successfully deleted template: id - %(pk)s' % {
'pk': template_id})
except:
logger.exception("Selenium cannot delete the desired template")
self.create_screenshot()
raise Exception('Cannot delete the desired template')
def create_random_vm(self):
try:
self.driver.get('%s/dashboard/vm/create/' % self.conf.host)
vm_list = []
pk = None
vm_list = self.driver.find_elements_by_class_name(
'vm-create-template-summary')
choice = random.randint(0, len(vm_list) - 1)
vm_list[choice].click()
try:
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.element_to_be_clickable((
By.CLASS_NAME, "vm-create-start"))).click()
except TimeoutException:
# Selenium can time out not findig it even though it is present
self.driver.find_element_by_tag_name('form').submit()
except:
logger.exception("Selenium could not submit create vm form")
raise Exception('Could not submit a form')
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.CLASS_NAME, 'alert-success')))
url = urlparse.urlparse(self.driver.current_url)
pk = re.search(r'\d+', url.path).group()
return pk
except:
logger.exception("Selenium cannot start a VM")
self.create_screenshot()
raise Exception('Cannot start a VM')
def view_change(self, target_box):
driver = self.driver
driver.get('%s/dashboard/' % self.conf.host)
list_view = driver.find_element_by_id('%s-list-view' % target_box)
graph_view = driver.find_element_by_id('%s-graph-view' % target_box)
js_script = 'return arguments[0].style.display;'
required_attributes = {'data-index-box': target_box}
graph_view_link = self.get_link_by_href(
'#index-graph-view',
required_attributes).find_element_by_tag_name('i')
list_view_link = self.get_link_by_href(
'#index-list-view',
required_attributes).find_element_by_tag_name('i')
self.click_on_link(list_view_link)
states = [driver.execute_script(js_script, list_view),
driver.execute_script(js_script, graph_view)]
self.click_on_link(graph_view_link)
states.extend([driver.execute_script(js_script, list_view),
driver.execute_script(js_script, graph_view)])
self.click_on_link(list_view_link)
states.extend([driver.execute_script(js_script, list_view),
driver.execute_script(js_script, graph_view)])
return states
def delete_vm(self, pk):
try:
# For relability reasons instead of using the JS operatation
self.driver.get("%(host)s/dashboard/vm/%(id)s/op/destroy/" % {
'host': self.conf.host,
'id': pk})
self.wait_and_accept_operation(try_wait="a[href*='/op/recover/']")
try:
status_span = WebDriverWait(
self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.ID, 'vm-details-state')))
WebDriverWait(status_span, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.CLASS_NAME, 'fa-trash-o')))
except:
# Selenium can time-out by not realising the JS refresh
url_save = self.get_url(fragment='activity')
recent_destroy_vm = self.recently(
self.get_timeline_elements(
"vm.Instance.destroy", url_save))
if not self.check_operation_result(recent_destroy_vm):
logger.warning("Selenium cannot destroy "
"the chosen %(id)s vm" % {
'id': pk})
raise Exception('Cannot destroy the specified vm')
self.driver.get('%s/dashboard/vm/%s/' % (self.conf.host, pk))
try:
WebDriverWait(self.driver, self.conf.wait_max_sec).until(
ec.visibility_of_element_located((
By.CSS_SELECTOR,
"span[data-status*='DESTROYED']")))
logger.warning(
'Successfully deleted virtual machine: id - %(pk)s' % {
'pk': pk})
return True
except:
return False
except:
logger.exception("Selenium can not destroy a VM")
self.create_screenshot()
raise Exception("Cannot destroy a VM")
...@@ -143,8 +143,8 @@ def selenium(test=""): ...@@ -143,8 +143,8 @@ def selenium(test=""):
test = "--failed" test = "--failed"
else: else:
test += " --with-id" test += " --with-id"
run("xvfb-run ./manage.py test " run('xvfb-run --server-args="-screen 0, 1920x1080x24" ./manage.py'
"--settings=circle.settings.selenium_test %s" % test) ' test --settings=circle.settings.selenium_test %s' % test)
def pull(dir="~/circle/circle"): def pull(dir="~/circle/circle"):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment