以下内容如有不对,望指正

创建网络

整体流程图

源码分析(ocata)

API服务启动

/usr/bin/neutron-server从启动脚本开始

from neutron.cmd.eventlet.server import main
def _main_neutron_server():
    if cfg.CONF.web_framework == 'legacy':
        wsgi_eventlet.eventlet_wsgi_server()
    else:
        wsgi_pecan.pecan_wsgi_server()

neutron/conf/common.py可以看到web_framework默认配置是legacy

/etc/neutron/neutron.conf并没有配置web_framework,所有这里会走到wsgi_eventlet.eventlet_wsgi_server()

def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)
class NeutronApiService(WsgiService):
    """Class for neutron-api service."""
    def __init__(self, app_name):
        profiler.setup('neutron-server', cfg.CONF.host)
        super(NeutronApiService, self).__init__(app_name)
 
    @classmethod
    def create(cls, app_name='neutron'):
        # Setup logging early
        config.setup_logging()
        service = cls(app_name)
        return service

这个看到其实是WSGINeutronApiService是Neutron封装的API功能类,继承于WsgiService

def serve_wsgi(cls):
    try:
        service = cls.create()
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))
 
    registry.notify(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service

serve_wsgi会调用NeutronApiServicecreate()创建NeutronApiService对象,调用start()启动NeutronApiService。

class WsgiService(object):
    ...
    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)
def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    return run_wsgi_app(app)
 
 
def run_wsgi_app(app):
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server

_run_wsgi会根据app_name找到系统中对应的目录,加载api-paste.ini文件,然后调用run_wsgi_app()启动api服务;

到这里api启动就完成了。


创建网络

上面启动过程中加载了api-paste.ini

api-paste.ini中我们看到:

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory
 
[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

还有一个filter,其实是api调用过程中会进行一些权限过滤等操作,这里我们就不看了。

下面看看APIRouter:

class APIRouter(base_wsgi.Router):
 
    @classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)
 
    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        manager.init()
        plugin = directory.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
 
        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)
 
        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)
 
        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            resource_registry.register_resource_by_name(resource)
 
        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])
 
        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)

这里的东西比较多,也是API比较核心的组成部分,概括下就是初始化了urlcontroller的映射关系,也就是resource.

plugin = directory.get_plugin()

这里plugin根据代码,会发现是get的一个COREplugin,其实就是setup.cfg中的Ml2Plugin

ml2 = neutron.plugins.ml2.plugin:Ml2Plugin

注册的Controller都是neutron/api/v2/base.py中的Controller类(代码太长,就不列出来了)

class Controller(object):
    LIST = 'list'
    SHOW = 'show'
    CREATE = 'create'
    UPDATE = 'update'
    DELETE = 'delete'
    ...

当我们创建network时,其实调用的是Controllercreate()

    def create(self, request, body=None, **kwargs):
        self._notifier.info(request.context,
                            self._resource + '.create.start',
                            body)
        return self._create(request, body, **kwargs)

可以看到是发了一个notification,self._resource'network',notification是'network.create.start'.

然后调用_create()

@db_api.retry_db_errors
    def _create(self, request, body, **kwargs):
        """Creates a new instance of the requested entity."""
        parent_id = kwargs.get(self._parent_id_name)
        body = Controller.prepare_request_body(request.context,
                                               body, True,
                                               self._resource, self._attr_info,
                                               allow_bulk=self._allow_bulk)
        action = self._plugin_handlers[self.CREATE]
        # Check authz
        if self._collection in body:
            # Have to account for bulk create
            items = body[self._collection]
        else:
            items = [body]
        # Ensure policy engine is initialized
        policy.init()
        # Store requested resource amounts grouping them by tenant
        # This won't work with multiple resources. However because of the
        # current structure of this controller there will hardly be more than
        # one resource for which reservations are being made
        request_deltas = collections.defaultdict(int)
        for item in items:
            self._validate_network_tenant_ownership(request,
                                                    item[self._resource])
            policy.enforce(request.context,
                           action,
                           item[self._resource],
                           pluralized=self._collection)
            if 'tenant_id' not in item[self._resource]:
                # no tenant_id - no quota check
                continue
            tenant_id = item[self._resource]['tenant_id']
            request_deltas[tenant_id] += 1
        # Quota enforcement
        reservations = []
        try:
            for (tenant, delta) in request_deltas.items():
                reservation = quota.QUOTAS.make_reservation(
                    request.context,
                    tenant,
                    {self._resource: delta},
                    self._plugin)
                reservations.append(reservation)
        except n_exc.QuotaResourceUnknown as e:
            # We don't want to quota this resource
            LOG.debug(e)
 
        def notify(create_result):
            # Ensure usage trackers for all resources affected by this API
            # operation are marked as dirty
            with request.context.session.begin():
                # Commit the reservation(s)
                for reservation in reservations:
                    quota.QUOTAS.commit_reservation(
                        request.context, reservation.reservation_id)
                resource_registry.set_resources_dirty(request.context)
 
            notifier_method = self._resource + '.create.end'
            self._notifier.info(request.context,
                                notifier_method,
                                create_result)
            registry.notify(self._resource, events.BEFORE_RESPONSE, self,
                            context=request.context, data=create_result,
                            method_name=notifier_method,
                            collection=self._collection,
                            action=action, original={})
            return create_result
 
        def do_create(body, bulk=False, emulated=False):
            kwargs = {self._parent_id_name: parent_id} if parent_id else {}
            if bulk and not emulated:
                obj_creator = getattr(self._plugin, "%s_bulk" % action)
            else:
                obj_creator = getattr(self._plugin, action)
            try:
                if emulated:
                    return self._emulate_bulk_create(obj_creator, request,
                                                     body, parent_id)
                else:
                    if self._collection in body:
                        # This is weird but fixing it requires changes to the
                        # plugin interface
                        kwargs.update({self._collection: body})
                    else:
                        kwargs.update({self._resource: body})
                    return obj_creator(request.context, **kwargs)
            except Exception:
                # In case of failure the plugin will always raise an
                # exception. Cancel the reservation
                with excutils.save_and_reraise_exception():
                    for reservation in reservations:
                        quota.QUOTAS.cancel_reservation(
                            request.context, reservation.reservation_id)
 
        if self._collection in body and self._native_bulk:
            # plugin does atomic bulk create operations
            objs = do_create(body, bulk=True)
            # Use first element of list to discriminate attributes which
            # should be removed because of authZ policies
            fields_to_strip = self._exclude_attributes_by_policy(
                request.context, objs[0])
            return notify({self._collection: [self._filter_attributes(
                obj, fields_to_strip=fields_to_strip)
                for obj in objs]})
        else:
            if self._collection in body:
                # Emulate atomic bulk behavior
                objs = do_create(body, bulk=True, emulated=True)
                return notify({self._collection: objs})
            else:
                obj = do_create(body)
                return notify({self._resource: self._view(request.context,
                                                          obj)})

这里action = self._plugin_handlers[self.CREATE]init里面可以看到对于网络创建,其实就是create_network。 自后在_create中进行了一些验证和参数处理,最终交给了do_create()

        def do_create(body, bulk=False, emulated=False):
            kwargs = {self._parent_id_name: parent_id} if parent_id else {}
            if bulk and not emulated:
                obj_creator = getattr(self._plugin, "%s_bulk" % action)
            else:
                obj_creator = getattr(self._plugin, action)
            try:
                if emulated:
                    return self._emulate_bulk_create(obj_creator, request,
                                                     body, parent_id)
                else:
                    if self._collection in body:
                        # This is weird but fixing it requires changes to the
                        # plugin interface
                        kwargs.update({self._collection: body})
                    else:
                        kwargs.update({self._resource: body})
                    return obj_creator(request.context, **kwargs)
            except Exception:
                # In case of failure the plugin will always raise an
                # exception. Cancel the reservation
                with excutils.save_and_reraise_exception():
                    for reservation in reservations:
                        quota.QUOTAS.cancel_reservation(
                            request.context, reservation.reservation_id)

do_create中,其实获取了plugin的方法,并调用,之前看到这个plugin其实就是Ml2Plugin,actionnetwork_create

    if bulk and not emulated:
        obj_creator = getattr(self._plugin, "%s_bulk" % action)
    else:
        obj_creator = getattr(self._plugin, action)
    return obj_creator(request.context, **kwargs)

所以,其实调用了Ml2Pluginnetwork_create

    @utils.transaction_guard
    @db_api.retry_if_session_inactive()
    def create_network(self, context, network):
        self._ensure_default_security_group(context,
                                            network['network']['tenant_id'])
        result, mech_context = self._create_network_db(context, network)
        kwargs = {'context': context, 'network': result}
        registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
        try:
            self.mechanism_manager.create_network_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("mechanism_manager.create_network_postcommit "
                              "failed, deleting network '%s'"), result['id'])
                self.delete_network(context, result['id'])
 
        return result

这里不是特别确定 这里调用_create_network_db完成实际创建网络的过程,create_network_postcommit则是创建完成之后一些mechanism_driver的一些处理。

    def _create_network_db(self, context, network):
        net_data = network[attributes.NETWORK]
        tenant_id = net_data['tenant_id']
        session = context.session
        with session.begin(subtransactions=True):
            net_db = self.create_network_db(context, network)
            result = self._make_network_dict(net_db, process_extensions=False,
                                             context=context)
            self.extension_manager.process_create_network(context, net_data,
                                                          result)
            self._process_l3_create(context, result, net_data)
            net_data['id'] = result['id']
            self.type_manager.create_network_segments(context, net_data,
                                                      tenant_id)
            self.type_manager.extend_network_dict_provider(context, result)
            # Update the transparent vlan if configured
            if utils.is_extension_supported(self, 'vlan-transparent'):
                vlt = vlantransparent.get_vlan_transparent(net_data)
                net_db['vlan_transparent'] = vlt
                result['vlan_transparent'] = vlt
 
            result[api.MTU] = self._get_network_mtu(result)
 
            if az_ext.AZ_HINTS in net_data:
                self.validate_availability_zones(context, 'network',
                                                 net_data[az_ext.AZ_HINTS])
                az_hints = az_ext.convert_az_list_to_string(
                                                net_data[az_ext.AZ_HINTS])
                net_db[az_ext.AZ_HINTS] = az_hints
                result[az_ext.AZ_HINTS] = az_hints
 
            mech_context = driver_context.NetworkContext(self, context,
                                                         result)
            self.mechanism_manager.create_network_precommit(mech_context)
 
        self._apply_dict_extend_functions('networks', result, net_db)
        return result, mech_context

这里首先通过create_network_db来创建数据库模型,使用_make_network_dict进行一些格式化,之后就是3个创建:

self.extension_manager.process_create_network(context, net_data, result)
self._process_l3_create(context, result, net_data)
net_data['id'] = result['id']
self.type_manager.create_network_segments(context, net_data, tenant_id)

process_create_network跟到最后发现是db的操作。

_process_l3_create会创建notify通知插件完成创建操作。

create_network_segments create network segments