Source code for openstack_dashboard.test.api_tests.network_tests

# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2013 NEC Corporation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import copy
import itertools
import uuid

from django import http
from mox import IsA  # noqa

from novaclient.v1_1 import floating_ip_pools

from openstack_dashboard import api
from openstack_dashboard.test import helpers as test


class NetworkClientTestCase(test.APITestCase):
    """Checks which backend managers ``api.network.NetworkClient`` picks.

    Every test stubs ``api.base.is_service_enabled`` for the 'network'
    service, builds a ``NetworkClient`` from ``self.request`` and then
    inspects the types of its ``floating_ips`` and ``secgroups`` attributes.
    """

    def _record_network_service(self, enabled):
        """Stub ``is_service_enabled`` and record one 'network' lookup.

        :param enabled: value the recorded lookup returns on replay.
        """
        self.mox.StubOutWithMock(api.base, 'is_service_enabled')
        lookup = api.base.is_service_enabled(IsA(http.HttpRequest),
                                             'network')
        lookup.AndReturn(enabled)

    def test_networkclient_no_neutron(self):
        # 'network' service reported as disabled: both managers must be
        # the nova implementations.
        self._record_network_service(False)
        self.mox.ReplayAll()

        client = api.network.NetworkClient(self.request)
        self.assertIsInstance(client.floating_ips,
                              api.nova.FloatingIpManager)
        self.assertIsInstance(client.secgroups,
                              api.nova.SecurityGroupManager)

    def test_networkclient_neutron(self):
        # 'network' service enabled and list_extensions() answers with the
        # whole api_extensions fixture: both managers must be neutron ones.
        self._record_network_service(True)
        self.neutronclient = self.stub_neutronclient()
        extensions = {'extensions': self.api_extensions.list()}
        self.neutronclient.list_extensions().AndReturn(extensions)
        self.mox.ReplayAll()

        client = api.network.NetworkClient(self.request)
        self.assertIsInstance(client.floating_ips,
                              api.neutron.FloatingIpManager)
        self.assertIsInstance(client.secgroups,
                              api.neutron.SecurityGroupManager)

    def test_networkclient_neutron_with_nova_security_group(self):
        # 'network' service enabled but list_extensions() answers with an
        # empty list: floating IPs come from neutron while security groups
        # are expected to be handled by the nova manager.
        self._record_network_service(True)
        self.neutronclient = self.stub_neutronclient()
        no_extensions = {'extensions': []}
        self.neutronclient.list_extensions().AndReturn(no_extensions)
        self.mox.ReplayAll()

        client = api.network.NetworkClient(self.request)
        self.assertIsInstance(client.floating_ips,
                              api.neutron.FloatingIpManager)
        self.assertIsInstance(client.secgroups,
                              api.nova.SecurityGroupManager)
class NetworkApiNovaTestBase(test.APITestCase):
    """Base class for tests run with the 'network' service disabled."""

    def setUp(self):
        super(NetworkApiNovaTestBase, self).setUp()
        # Record exactly one is_service_enabled() lookup for the 'network'
        # service and make it answer False.
        self.mox.StubOutWithMock(api.base, 'is_service_enabled')
        network_lookup = api.base.is_service_enabled(IsA(http.HttpRequest),
                                                     'network')
        network_lookup.AndReturn(False)
class NetworkApiNovaSecurityGroupTests(NetworkApiNovaTestBase):
    """Security group tests for api.network using the nova client stub."""

    def test_server_update_security_groups(self):
        # The server currently has fixture groups 0 and 1 and is asked to
        # have groups 1 and 2, so group 2 must be added and group 0 removed.
        groups = self.security_groups.list()
        to_add = groups[2]
        to_remove = groups[0]
        current = {'security_groups': [
            {'id': group.id, 'name': group.name, 'rules': []}
            for group in groups[0:2]
        ]}
        wanted_ids = [group.id for group in groups[1:3]]
        instance_id = self.servers.first().id
        url = '/servers/%s/os-security-groups' % instance_id

        novaclient = self.stub_novaclient()
        novaclient.security_groups = self.mox.CreateMockAnything()
        novaclient.servers = self.mox.CreateMockAnything()
        novaclient.client = self.mox.CreateMockAnything()

        novaclient.security_groups.list().AndReturn(groups)
        novaclient.client.get(url).AndReturn((200, current))
        # Recording order on the servers mock: add first, then remove.
        novaclient.servers.add_security_group(instance_id, to_add.name)
        novaclient.servers.remove_security_group(instance_id,
                                                 to_remove.name)
        self.mox.ReplayAll()

        api.network.server_update_security_groups(self.request,
                                                  instance_id,
                                                  wanted_ids)
class NetworkApiNovaFloatingIpTests(NetworkApiNovaTestBase):
    """Floating IP tests for api.network using the nova client stub."""

    def _assert_same_fip(self, actual, expected):
        """Compare a returned floating IP with the nova fixture object.

        Checks the plain attributes one by one and then that ``port_id``
        of the returned object equals ``instance_id`` of the fixture.
        """
        for name in ['id', 'ip', 'pool', 'fixed_ip', 'instance_id']:
            self.assertEqual(getattr(actual, name), getattr(expected, name))
        self.assertEqual(actual.port_id, expected.instance_id)

    def test_floating_ip_pools_list(self):
        # Pools returned by the nova client must come back in the same
        # order with the same names.
        pool_names = ['pool1', 'pool2']
        pools = []
        for name in pool_names:
            pools.append(
                floating_ip_pools.FloatingIPPool(None, {'name': name}))

        novaclient = self.stub_novaclient()
        novaclient.floating_ip_pools = self.mox.CreateMockAnything()
        novaclient.floating_ip_pools.list().AndReturn(pools)
        self.mox.ReplayAll()

        result = api.network.floating_ip_pools_list(self.request)
        self.assertEqual([pool.name for pool in result], pool_names)

    def test_floating_ip_list(self):
        expected_fips = self.api_floating_ips.list()

        novaclient = self.stub_novaclient()
        novaclient.floating_ips = self.mox.CreateMockAnything()
        novaclient.floating_ips.list().AndReturn(expected_fips)
        self.mox.ReplayAll()

        result = api.network.tenant_floating_ip_list(self.request)
        for actual, expected in zip(result, expected_fips):
            self._assert_same_fip(actual, expected)

    def test_floating_ip_get(self):
        expected = self.api_floating_ips.first()

        novaclient = self.stub_novaclient()
        novaclient.floating_ips = self.mox.CreateMockAnything()
        novaclient.floating_ips.get(expected.id).AndReturn(expected)
        self.mox.ReplayAll()

        actual = api.network.tenant_floating_ip_get(self.request,
                                                    expected.id)
        self._assert_same_fip(actual, expected)

    def test_floating_ip_allocate(self):
        pool_name = 'fip_pool'
        expected = self.api_floating_ips.first()

        novaclient = self.stub_novaclient()
        novaclient.floating_ips = self.mox.CreateMockAnything()
        novaclient.floating_ips.create(pool=pool_name).AndReturn(expected)
        self.mox.ReplayAll()

        actual = api.network.tenant_floating_ip_allocate(self.request,
                                                         pool_name)
        self._assert_same_fip(actual, expected)

    def test_floating_ip_release(self):
        # Only the recorded delete() call is checked; nothing is returned.
        released = self.api_floating_ips.first()

        novaclient = self.stub_novaclient()
        novaclient.floating_ips = self.mox.CreateMockAnything()
        novaclient.floating_ips.delete(released.id)
        self.mox.ReplayAll()

        api.network.tenant_floating_ip_release(self.request, released.id)

    def test_floating_ip_associate(self):
        instance = api.nova.Server(self.servers.first(), self.request)
        fip = self.floating_ips.first()

        novaclient = self.stub_novaclient()
        novaclient.floating_ips = self.mox.CreateMockAnything()
        novaclient.servers = self.mox.CreateMockAnything()
        novaclient.servers.get(instance.id).AndReturn(instance)
        novaclient.floating_ips.get(fip.id).AndReturn(fip)
        association = novaclient.servers.add_floating_ip(instance.id,
                                                         fip.ip)
        association.AndReturn(instance)
        self.mox.ReplayAll()

        api.network.floating_ip_associate(self.request, fip.id, instance.id)

    def test_floating_ip_disassociate(self):
        instance = api.nova.Server(self.servers.first(), self.request)
        fip = self.api_floating_ips.first()

        novaclient = self.stub_novaclient()
        novaclient.servers = self.mox.CreateMockAnything()
        novaclient.floating_ips = self.mox.CreateMockAnything()
        novaclient.servers.get(instance.id).AndReturn(instance)
        novaclient.floating_ips.get(fip.id).AndReturn(fip)
        removal = novaclient.servers.remove_floating_ip(instance.id, fip.ip)
        removal.AndReturn(instance)
        self.mox.ReplayAll()

        api.network.floating_ip_disassociate(self.request, fip.id,
                                             instance.id)

    def test_floating_ip_target_list(self):
        # Each target carries the server id and a "<name> (<id>)" label.
        servers = self.servers.list()

        novaclient = self.stub_novaclient()
        novaclient.servers = self.mox.CreateMockAnything()
        novaclient.servers.list().AndReturn(servers)
        self.mox.ReplayAll()

        targets = api.network.floating_ip_target_list(self.request)
        for target, server in zip(targets, servers):
            expected_name = '%s (%s)' % (server.name, server.id)
            self.assertEqual(target.id, server.id)
            self.assertEqual(target.name, expected_name)

    def test_floating_ip_target_get_by_instance(self):
        # No client call is recorded here; the instance id itself is
        # expected back as the target.
        self.mox.ReplayAll()

        instance_id = self.servers.first().id
        target = api.network.floating_ip_target_get_by_instance(
            self.request, instance_id)
        self.assertEqual(instance_id, target)
class NetworkApiNeutronTestBase(test.APITestCase):
    """Base class for tests run with the 'network' service enabled.

    ``setUp`` leaves the stubbed neutron client in ``self.qclient`` with a
    ``list_extensions()`` call already recorded.
    """

    def setUp(self):
        super(NetworkApiNeutronTestBase, self).setUp()
        # Record one is_service_enabled() lookup for the 'network' service
        # and make it answer True.
        self.mox.StubOutWithMock(api.base, 'is_service_enabled')
        network_lookup = api.base.is_service_enabled(IsA(http.HttpRequest),
                                                     'network')
        network_lookup.AndReturn(True)

        # The extension listing answers with the api_extensions fixture.
        self.qclient = self.stub_neutronclient()
        extensions = {'extensions': self.api_extensions.list()}
        self.qclient.list_extensions().AndReturn(extensions)
class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
    """Security group tests for api.network using the neutron client stub.

    ``self.sg_dict`` maps security group id to name for every group in
    the ``api_q_secgroups`` fixture; it is used to check the group name
    resolved for rules that reference a remote group.
    """

    def setUp(self):
        super(NetworkApiNeutronSecurityGroupTests, self).setUp()
        self.sg_dict = dict((sg['id'], sg['name'])
                            for sg in self.api_q_secgroups.list())

    def _cmp_sg_rule(self, exprule, retrule):
        """Assert that a returned rule matches a neutron rule dict.

        :param exprule: rule dict from the neutron fixture data.
        :param retrule: rule object returned by api.network.
        """
        self.assertEqual(exprule['id'], retrule.id)
        self.assertEqual(exprule['security_group_id'],
                         retrule.parent_group_id)
        self.assertEqual(exprule['direction'], retrule.direction)
        self.assertEqual(exprule['ethertype'], retrule.ethertype)
        self.assertEqual(exprule['port_range_min'], retrule.from_port)
        self.assertEqual(exprule['port_range_max'], retrule.to_port)
        # A rule with neither a remote prefix nor a remote group is
        # expected to be reported with the any-address CIDR of its family.
        if (exprule['remote_ip_prefix'] is None and
                exprule['remote_group_id'] is None):
            expcidr = ('::/0' if exprule['ethertype'] == 'IPv6'
                       else '0.0.0.0/0')
        else:
            expcidr = exprule['remote_ip_prefix']
        self.assertEqual(expcidr, retrule.ip_range.get('cidr'))
        self.assertEqual(self.sg_dict.get(exprule['remote_group_id']),
                         retrule.group.get('name'))

    def _cmp_sg(self, exp_sg, ret_sg):
        """Assert that a returned group matches a neutron group dict.

        Compares id, name, the number of rules and then every rule pair
        through ``_cmp_sg_rule``.
        """
        self.assertEqual(exp_sg['id'], ret_sg.id)
        self.assertEqual(exp_sg['name'], ret_sg.name)
        exp_rules = exp_sg['security_group_rules']
        self.assertEqual(len(exp_rules), len(ret_sg.rules))
        # Built-in zip instead of itertools.izip: izip does not exist on
        # Python 3 and the lengths were just checked to be equal.
        for (exprule, retrule) in zip(exp_rules, ret_sg.rules):
            self._cmp_sg_rule(exprule, retrule)

    def test_security_group_list(self):
        sgs = self.api_q_secgroups.list()
        tenant_id = self.request.user.tenant_id
        # use deepcopy to ensure self.api_q_secgroups is not modified.
        self.qclient.list_security_groups(tenant_id=tenant_id) \
            .AndReturn({'security_groups': copy.deepcopy(sgs)})
        self.mox.ReplayAll()

        rets = api.network.security_group_list(self.request)
        self.assertEqual(len(sgs), len(rets))
        # Built-in zip keeps this loop working on Python 2 and Python 3.
        for (exp, ret) in zip(sgs, rets):
            self._cmp_sg(exp, ret)

    def test_security_group_get(self):
        secgroup = self.api_q_secgroups.first()
        # Ids of the group itself plus every remote group its rules
        # reference; the name lookup is expected to ask for exactly these.
        sg_ids = set([secgroup['id']] +
                     [rule['remote_group_id'] for rule
                      in secgroup['security_group_rules']
                      if rule['remote_group_id']])
        related_sgs = [sg for sg in self.api_q_secgroups.list()
                       if sg['id'] in sg_ids]
        # use deepcopy to ensure self.api_q_secgroups is not modified.
        self.qclient.show_security_group(secgroup['id']) \
            .AndReturn({'security_group': copy.deepcopy(secgroup)})
        self.qclient.list_security_groups(id=sg_ids, fields=['id', 'name']) \
            .AndReturn({'security_groups': related_sgs})
        self.mox.ReplayAll()

        ret = api.network.security_group_get(self.request, secgroup['id'])
        self._cmp_sg(secgroup, ret)

    def test_security_group_create(self):
        secgroup = self.api_q_secgroups.list()[1]
        body = {'security_group':
                {'name': secgroup['name'],
                 'description': secgroup['description']}}
        self.qclient.create_security_group(body) \
            .AndReturn({'security_group': copy.deepcopy(secgroup)})
        self.mox.ReplayAll()

        ret = api.network.security_group_create(self.request,
                                                secgroup['name'],
                                                secgroup['description'])
        self._cmp_sg(secgroup, ret)

    def test_security_group_update(self):
        # Work on a copy so the renamed group does not leak into the
        # shared fixture data.
        secgroup = self.api_q_secgroups.list()[1]
        secgroup = copy.deepcopy(secgroup)
        secgroup['name'] = 'newname'
        secgroup['description'] = 'new description'
        body = {'security_group':
                {'name': secgroup['name'],
                 'description': secgroup['description']}}
        self.qclient.update_security_group(secgroup['id'], body) \
            .AndReturn({'security_group': secgroup})
        self.mox.ReplayAll()

        ret = api.network.security_group_update(self.request,
                                                secgroup['id'],
                                                secgroup['name'],
                                                secgroup['description'])
        self._cmp_sg(secgroup, ret)

    def test_security_group_delete(self):
        # Only the recorded delete call is checked; nothing is returned.
        secgroup = self.api_q_secgroups.first()
        self.qclient.delete_security_group(secgroup['id'])
        self.mox.ReplayAll()

        api.network.security_group_delete(self.request, secgroup['id'])

    def test_security_group_rule_create(self):
        # Pick the first TCP rule of the fixture that has a remote prefix.
        sg_rule = [r for r in self.api_q_secgroup_rules.list()
                   if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
        sg_id = sg_rule['security_group_id']
        secgroup = [sg for sg in self.api_q_secgroups.list()
                    if sg['id'] == sg_id][0]

        # The request body is the rule without 'id' and 'tenant_id'.
        post_rule = copy.deepcopy(sg_rule)
        del post_rule['id']
        del post_rule['tenant_id']
        post_body = {'security_group_rule': post_rule}
        self.qclient.create_security_group_rule(post_body) \
            .AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
        self.qclient.list_security_groups(id=set([sg_id]),
                                          fields=['id', 'name']) \
            .AndReturn({'security_groups': [copy.deepcopy(secgroup)]})
        self.mox.ReplayAll()

        ret = api.network.security_group_rule_create(
            self.request, sg_rule['security_group_id'],
            sg_rule['direction'], sg_rule['ethertype'], sg_rule['protocol'],
            sg_rule['port_range_min'], sg_rule['port_range_max'],
            sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
        self._cmp_sg_rule(sg_rule, ret)

    def test_security_group_rule_delete(self):
        # Only the recorded delete call is checked; nothing is returned.
        sg_rule = self.api_q_secgroup_rules.first()
        self.qclient.delete_security_group_rule(sg_rule['id'])
        self.mox.ReplayAll()

        api.network.security_group_rule_delete(self.request, sg_rule['id'])

    def _get_instance(self, cur_sg_ids):
        """Build port data for one instance that has two ports.

        :param cur_sg_ids: security group ids assigned to both ports.
        :returns: tuple ``(instance_id, instance_ports)``.
        """
        instance_port = [p for p in self.api_ports.list()
                         if p['device_owner'].startswith('compute:')][0]
        instance_id = instance_port['device_id']
        # Emulate an instance with two ports
        instance_ports = []
        for _i in range(2):
            p = copy.deepcopy(instance_port)
            p['id'] = str(uuid.uuid4())
            p['security_groups'] = cur_sg_ids
            instance_ports.append(p)
        return (instance_id, instance_ports)

    def test_server_security_groups(self):
        # The return value is not inspected; the test only records the
        # expected neutron calls and invokes the API.
        cur_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
        instance_id, instance_ports = self._get_instance(cur_sg_ids)
        self.qclient.list_ports(device_id=instance_id) \
            .AndReturn({'ports': instance_ports})
        secgroups = copy.deepcopy(self.api_q_secgroups.list())
        self.qclient.list_security_groups(id=set(cur_sg_ids)) \
            .AndReturn({'security_groups': secgroups})
        self.mox.ReplayAll()

        api.network.server_security_groups(self.request, instance_id)

    def test_server_update_security_groups(self):
        cur_sg_ids = [self.api_q_secgroups.first()['id']]
        new_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
        instance_id, instance_ports = self._get_instance(cur_sg_ids)
        self.qclient.list_ports(device_id=instance_id) \
            .AndReturn({'ports': instance_ports})
        # Every port of the instance is expected to be updated with the
        # same body, so it is built once outside the loop.
        body = {'port': {'security_groups': new_sg_ids}}
        for p in instance_ports:
            self.qclient.update_port(p['id'], body=body) \
                .AndReturn({'port': p})
        self.mox.ReplayAll()

        api.network.server_update_security_groups(
            self.request, instance_id, new_sg_ids)

    def test_security_group_backend(self):
        self.mox.ReplayAll()
        self.assertEqual(api.network.security_group_backend(self.request),
                         'neutron')
class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
    """Floating IP tests for api.network using the neutron client stub."""

    def _cmp_fip(self, exp_fip, ret_fip):
        """Assert that a returned floating IP matches a neutron fip dict.

        :param exp_fip: floating IP dict from the neutron fixture data.
        :param ret_fip: floating IP object returned by api.network.
        """
        for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
            self.assertEqual(getattr(ret_fip, attr), exp_fip[attr])

    def test_floating_ip_pools_list(self):
        # Pools are the fixture networks flagged 'router:external'.
        search_opts = {'router:external': True}
        ext_nets = [n for n in self.api_networks.list()
                    if n['router:external']]
        self.qclient.list_networks(**search_opts) \
            .AndReturn({'networks': ext_nets})
        self.mox.ReplayAll()

        rets = api.network.floating_ip_pools_list(self.request)
        for attr in ['id', 'name']:
            self.assertEqual([getattr(p, attr) for p in rets],
                             [p[attr] for p in ext_nets])

    def test_floating_ip_list(self):
        fips = self.api_q_floating_ips.list()
        filters = {'tenant_id': self.request.user.tenant_id}
        self.qclient.list_floatingips(**filters) \
            .AndReturn({'floatingips': fips})
        self.qclient.list_ports(**filters) \
            .AndReturn({'ports': self.api_ports.list()})
        self.mox.ReplayAll()

        rets = api.network.tenant_floating_ip_list(self.request)
        assoc_port = self.api_ports.list()[1]
        self.assertEqual(len(fips), len(rets))
        for ret, exp in zip(rets, fips):
            self._cmp_fip(exp, ret)
            # instance_id is only checked for associated floating IPs;
            # the former "... if exp['port_id'] else None" inside this
            # branch could never take its else arm and was dropped.
            if exp['port_id']:
                self.assertEqual(ret.instance_id, assoc_port['device_id'])

    def test_floating_ip_get_associated(self):
        fip = self.api_q_floating_ips.list()[1]
        assoc_port = self.api_ports.list()[1]
        self.qclient.show_floatingip(fip['id']) \
            .AndReturn({'floatingip': fip})
        self.qclient.show_port(assoc_port['id']) \
            .AndReturn({'port': assoc_port})
        self.mox.ReplayAll()

        ret = api.network.tenant_floating_ip_get(self.request, fip['id'])
        self._cmp_fip(fip, ret)
        self.assertEqual(ret.instance_id, assoc_port['device_id'])

    def test_floating_ip_get_unassociated(self):
        # No show_port() call is recorded for this floating IP.
        fip = self.api_q_floating_ips.list()[0]
        self.qclient.show_floatingip(fip['id']) \
            .AndReturn({'floatingip': fip})
        self.mox.ReplayAll()

        ret = api.network.tenant_floating_ip_get(self.request, fip['id'])
        self._cmp_fip(fip, ret)
        self.assertIsNone(ret.instance_id)

    def test_floating_ip_allocate(self):
        ext_nets = [n for n in self.api_networks.list()
                    if n['router:external']]
        ext_net = ext_nets[0]
        fip = self.api_q_floating_ips.first()
        self.qclient.create_floatingip(
            {'floatingip': {'floating_network_id': ext_net['id']}}) \
            .AndReturn({'floatingip': fip})
        self.mox.ReplayAll()

        ret = api.network.tenant_floating_ip_allocate(self.request,
                                                      ext_net['id'])
        self._cmp_fip(fip, ret)
        self.assertIsNone(ret.instance_id)

    def test_floating_ip_release(self):
        # Only the recorded delete call is checked; nothing is returned.
        fip = self.api_q_floating_ips.first()
        self.qclient.delete_floatingip(fip['id'])
        self.mox.ReplayAll()

        api.network.tenant_floating_ip_release(self.request, fip['id'])

    def test_floating_ip_associate(self):
        fip = self.api_q_floating_ips.list()[1]
        assoc_port = self.api_ports.list()[1]
        ip_address = assoc_port['fixed_ips'][0]['ip_address']
        # Same "<port id>_<first fixed ip>" string as before, now built by
        # the class helper instead of being formatted by hand.
        target_id = self._get_target_id(assoc_port)
        params = {'port_id': assoc_port['id'],
                  'fixed_ip_address': ip_address}
        self.qclient.update_floatingip(fip['id'],
                                       {'floatingip': params})
        self.mox.ReplayAll()

        api.network.floating_ip_associate(self.request, fip['id'],
                                          target_id)

    def test_floating_ip_disassociate(self):
        fip = self.api_q_floating_ips.list()[1]
        assoc_port = self.api_ports.list()[1]
        target_id = self._get_target_id(assoc_port)
        # Disassociation is expected to clear port_id on the floating IP.
        self.qclient.update_floatingip(fip['id'],
                                       {'floatingip': {'port_id': None}})
        self.mox.ReplayAll()

        api.network.floating_ip_disassociate(self.request, fip['id'],
                                             target_id)

    def _get_target_id(self, port):
        """Return '<port id>_<first fixed ip address>' for a port dict."""
        param = {'id': port['id'],
                 'addr': port['fixed_ips'][0]['ip_address']}
        return '%(id)s_%(addr)s' % param

    def _get_target_name(self, port):
        """Return 'server_<device id>: <first fixed ip>' for a port dict."""
        param = {'svrid': port['device_id'],
                 'addr': port['fixed_ips'][0]['ip_address']}
        return 'server_%(svrid)s: %(addr)s' % param

    def test_floating_ip_target_list(self):
        ports = self.api_ports.list()
        # Ports owned by 'network:*' devices are not expected as targets.
        target_ports = [(self._get_target_id(p), self._get_target_name(p))
                        for p in ports
                        if not p['device_owner'].startswith('network:')]
        filters = {'tenant_id': self.request.user.tenant_id}
        self.qclient.list_ports(**filters).AndReturn({'ports': ports})
        servers = self.servers.list()
        novaclient = self.stub_novaclient()
        novaclient.servers = self.mox.CreateMockAnything()
        search_opts = {'project_id': self.request.user.tenant_id}
        novaclient.servers.list(True, search_opts).AndReturn(servers)
        self.mox.ReplayAll()

        rets = api.network.floating_ip_target_list(self.request)
        self.assertEqual(len(rets), len(target_ports))
        for ret, exp in zip(rets, target_ports):
            self.assertEqual(ret.id, exp[0])
            self.assertEqual(ret.name, exp[1])

    def test_floating_ip_target_get_by_instance(self):
        # The first port whose device_id is '1' is the expected target.
        ports = self.api_ports.list()
        candidates = [p for p in ports if p['device_id'] == '1']
        search_opts = {'device_id': '1'}
        self.qclient.list_ports(**search_opts) \
            .AndReturn({'ports': candidates})
        self.mox.ReplayAll()

        ret = api.network.floating_ip_target_get_by_instance(self.request,
                                                             '1')
        self.assertEqual(ret, self._get_target_id(candidates[0]))

    def test_target_floating_ip_port_by_instance(self):
        # One target per port whose device_id is '1', in fixture order.
        ports = self.api_ports.list()
        candidates = [p for p in ports if p['device_id'] == '1']
        search_opts = {'device_id': '1'}
        self.qclient.list_ports(**search_opts) \
            .AndReturn({'ports': candidates})
        self.mox.ReplayAll()

        ret = api.network.floating_ip_target_list_by_instance(self.request,
                                                              '1')
        self.assertEqual(ret[0], self._get_target_id(candidates[0]))
        self.assertEqual(len(ret), len(candidates))