This continues the analysis from "nova boot code flow analysis (3): the interaction between nova and neutron (1)".
#/nova/virt/libvirt/driver.py:LibvirtDriver
# NOTE(ilyaalekseyev): Implementation like in multinics
# for xenapi(tr3buchet)
def spawn(self, context, instance, image_meta, injected_files,
admin_password, network_info=None, block_device_info=None):
disk_info = blockinfo.get_disk_info(CONF.libvirt.virt_type,
instance,
image_meta,
block_device_info)
self._create_image(context, instance,
disk_info['mapping'],
network_info=network_info,
block_device_info=block_device_info,
files=injected_files,
admin_pass=admin_password)
xml = self._get_guest_xml(context, instance, network_info,
disk_info, image_meta,
block_device_info=block_device_info,
write_to_disk=True)
self._create_domain_and_network(context, xml, instance, network_info,
disk_info,
block_device_info=block_device_info)
LOG.debug("Instance is running", instance=instance)
def _wait_for_boot():
"""Called at an interval until the VM is running."""
state = self.get_info(instance).state
if state == power_state.RUNNING:
LOG.info(_LI("Instance spawned successfully."),
instance=instance)
raise loopingcall.LoopingCallDone()
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_boot)
timer.start(interval=0.5).wait()
Here we focus on how the tap device is written into the XML during its generation, and on the network-related operations in the _create_domain_and_network function.
#/nova/virt/libvirt/driver.py:LibvirtDriver
def _get_guest_xml(self, context, instance, network_info, disk_info,
image_meta, rescue=None,
block_device_info=None, write_to_disk=False):
# NOTE(danms): Stringifying a NetworkInfo will take a lock. Do
# this ahead of time so that we don't acquire it while also
# holding the logging lock.
network_info_str = str(network_info)
msg = ('Start _get_guest_xml '
'network_info=%(network_info)s '
'disk_info=%(disk_info)s '
'image_meta=%(image_meta)s rescue=%(rescue)s '
'block_device_info=%(block_device_info)s' %
{'network_info': network_info_str, 'disk_info': disk_info,
'image_meta': image_meta, 'rescue': rescue,
'block_device_info': block_device_info})
# NOTE(mriedem): block_device_info can contain auth_password so we
# need to sanitize the password in the message.
LOG.debug(strutils.mask_password(msg), instance=instance)
conf = self._get_guest_config(instance, network_info, image_meta,
disk_info, rescue, block_device_info,
context)
xml = conf.to_xml()
if write_to_disk:
instance_dir = libvirt_utils.get_instance_path(instance)
xml_path = os.path.join(instance_dir, 'libvirt.xml')
libvirt_utils.write_to_file(xml_path, xml)
LOG.debug('End _get_guest_xml xml=%(xml)s',
{'xml': xml}, instance=instance)
return xml
#/nova/virt/libvirt/driver.py:LibvirtDriver
def _get_guest_config(self, instance, network_info, image_meta,
disk_info, rescue=None, block_device_info=None,
context=None):
"""Get config data for parameters.
:param rescue: optional dictionary that should contain the key
'ramdisk_id' if a ramdisk is needed for the rescue image and
'kernel_id' if a kernel is needed for the rescue image.
"""
flavor = instance.flavor
inst_path = libvirt_utils.get_instance_path(instance)
disk_mapping = disk_info['mapping']
img_meta_prop = image_meta.get('properties', {}) if image_meta else {}
virt_type = CONF.libvirt.virt_type
... ... ...
for config in storage_configs:
guest.add_device(config)
for vif in network_info:
config = self.vif_driver.get_config(
instance, vif, image_meta,
flavor, virt_type)
guest.add_device(config)
... ... ...
# Memory balloon device only support 'qemu/kvm' and 'xen' hypervisor
if (virt_type in ('xen', 'qemu', 'kvm') and
CONF.libvirt.mem_stats_period_seconds > 0):
balloon = vconfig.LibvirtConfigMemoryBalloon()
if virt_type in ('qemu', 'kvm'):
balloon.model = 'virtio'
else:
balloon.model = 'xen'
balloon.period = CONF.libvirt.mem_stats_period_seconds
guest.add_device(balloon)
return guest
The code that writes the tap device settings into the XML is the self.vif_driver.get_config function.
#/nova/virt/libvirt/vif.py:LibvirtGenericVIFDriver
def get_config(self, instance, vif, image_meta,
inst_type, virt_type):
vif_type = vif['type']
LOG.debug('vif_type=%(vif_type)s instance=%(instance)s '
'vif=%(vif)s virt_type%(virt_type)s',
{'vif_type': vif_type, 'instance': instance,
'vif': vif, 'virt_type': virt_type})
if vif_type is None:
raise exception.NovaException(
_("vif_type parameter must be present "
"for this vif_driver implementation"))
vif_slug = self._normalize_vif_type(vif_type)
func = getattr(self, 'get_config_%s' % vif_slug, None)
if not func:
raise exception.NovaException(
_("Unexpected vif_type=%s") % vif_type)
return func(instance, vif, image_meta,
inst_type, virt_type)
Here vif_type is bridge, because neutron uses the linuxbridge mechanism driver, so the func obtained in get_config is the get_config_bridge function.
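As a side note, the dispatch above is plain getattr-based method lookup. A minimal, hypothetical sketch of the pattern (not the real nova code; for 'bridge' the normalized slug is simply 'bridge'):
class DemoVIFDriver(object):
    def get_config_bridge(self, vif):
        # Would build the <interface type='bridge'> config for this vif.
        return "bridge config for %s" % vif['id']

    def get_config(self, vif):
        vif_slug = vif['type']  # assume no normalization is needed for 'bridge'
        func = getattr(self, 'get_config_%s' % vif_slug, None)
        if func is None:
            raise ValueError('Unexpected vif_type=%s' % vif['type'])
        return func(vif)

print(DemoVIFDriver().get_config({'type': 'bridge', 'id': 'demo-port'}))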
#/nova/virt/libvirt/vif.py:LibvirtGenericVIFDriver
def get_config_bridge(self, instance, vif, image_meta,
inst_type, virt_type):
"""Get VIF configurations for bridge type."""
conf = self.get_base_config(instance, vif, image_meta,
inst_type, virt_type)
designer.set_vif_host_backend_bridge_config(
conf, self.get_bridge_name(vif),
self.get_vif_devname(vif))
mac_id = vif['address'].replace(':', '')
name = "nova-instance-" + instance.name + "-" + mac_id
if self.get_firewall_required(vif):
conf.filtername = name
designer.set_vif_bandwidth_config(conf, inst_type)
return conf
#/nova/virt/libvirt/vif.py:LibvirtGenericVIFDriver
def get_vif_devname(self, vif):
if 'devname' in vif:
return vif['devname']
return ("nic" + vif['id'])[:network_model.NIC_NAME_LEN]
#/nova/virt/libvirt/vif.py:LibvirtGenericVIFDriver
def get_bridge_name(self, vif):
return vif['network']['bridge']
Here the get_config_bridge function uses the port information returned by neutron to build the data needed by the vconfig.LibvirtConfigGuestInterface() object. The resulting network-related XML is:
<interface type='bridge'>
<mac address='fa:16:3e:14:51:ac'/>
<source bridge='brq5eea5aca-a1'/>
<target dev='tap80610538-99'/>
<model type='virtio'/>
<driver name='qemu'/>
<alias name='net0'/>
<address type='pci' domain='0x0000' bus='0x00' slot='0x03' function='0x0'/>
</interface>
At this point the XML for the tap device has been generated (but the device itself has not been created yet), so the actual creation of the tap device must happen in the _create_domain_and_network function.
#/nova/virt/libvirt/driver.py:LibvirtDriver
def _create_domain_and_network(self, context, xml, instance, network_info,
disk_info, block_device_info=None,
power_on=True, reboot=False,
vifs_already_plugged=False):
"""Do required network setup and create domain."""
block_device_mapping = driver.block_device_info_get_mapping(
block_device_info)
image_meta = utils.get_image_from_system_metadata(
instance.system_metadata)
for vol in block_device_mapping:
connection_info = vol['connection_info']
if (not reboot and 'data' in connection_info and
'volume_id' in connection_info['data']):
volume_id = connection_info['data']['volume_id']
encryption = encryptors.get_encryption_metadata(
context, self._volume_api, volume_id, connection_info)
if encryption:
encryptor = self._get_volume_encryptor(connection_info,
encryption)
encryptor.attach_volume(context, **encryption)
timeout = CONF.vif_plugging_timeout
if (self._conn_supports_start_paused and
utils.is_neutron() and not
vifs_already_plugged and power_on and timeout):
events = self._get_neutron_events(network_info)
else:
events = []
launch_flags = events and libvirt.VIR_DOMAIN_START_PAUSED or 0
domain = None
try:
with self.virtapi.wait_for_instance_event(
instance, events, deadline=timeout,
error_callback=self._neutron_failed_callback):
self.plug_vifs(instance, network_info)
self.firewall_driver.setup_basic_filtering(instance,
network_info)
self.firewall_driver.prepare_instance_filter(instance,
network_info)
with self._lxc_disk_handler(instance, image_meta,
block_device_info, disk_info):
domain = self._create_domain(
xml, instance=instance,
launch_flags=launch_flags,
power_on=power_on)
self.firewall_driver.apply_instance_filter(instance,
network_info)
except exception.VirtualInterfaceCreateException:
# Neutron reported failure and we didn't swallow it, so
# bail here
with excutils.save_and_reraise_exception():
if domain:
domain.destroy()
self.cleanup(context, instance, network_info=network_info,
block_device_info=block_device_info)
except eventlet.timeout.Timeout:
# We never heard from Neutron
LOG.warn(_LW('Timeout waiting for vif plugging callback for '
'instance %(uuid)s'), {'uuid': instance.uuid})
if CONF.vif_plugging_is_fatal:
if domain:
domain.destroy()
self.cleanup(context, instance, network_info=network_info,
block_device_info=block_device_info)
raise exception.VirtualInterfaceCreateException()
# Resume only if domain has been paused
if launch_flags & libvirt.VIR_DOMAIN_START_PAUSED:
domain.resume()
return domain
For the _create_domain_and_network function, we mainly analyze how nova waits for events related to the neutron service, i.e. the nova-event-callback mechanism (blueprint: https://blueprints.launchpad.net/neutron/+spec/nova-event-callback).
The code flow is: the self.virtapi.wait_for_instance_event call in _create_domain_and_network sets up a timer and waits for neutron to deliver the event. So what triggers neutron to send an event to nova?
When nova calls the underlying libvirt API to create the VM from the XML, a real tap device is created (as opposed to merely writing port information into the neutron database). The agent corresponding to the mechanism driver on the host running nova-compute periodically checks device state (update, delete, etc.); for the linuxbridge mechanism driver the default polling interval is 2s. When the neutron-linuxbridge-agent service (this environment uses the linuxbridge mechanism driver) detects that a new tap device has been added, it applies the appropriate security group rules to that port (using the Linux iptables facility) and then updates the corresponding tables in the neutron database.
The ports table in the neutron database is monitored via SQLAlchemy's built-in event mechanism, registered when the Ml2Plugin object is created at neutron-server startup. When the status column of the ports table is set, neutron prepares an event for nova (without sending it yet); once the after_update or after_insert hook fires on the ports table, neutron sends the prepared event to nova.
neutron sends the event to nova as an HTTP request to a nova-api extension resource, and the event ultimately reaches nova-compute via an RPC call. Once nova-compute receives the event, the port is ready and nova stops waiting. When the VM then reaches the running state, VM creation is complete.
First, the nova side collects the events it needs to wait for.
#/nova/virt/libvirt/driver.py:LibvirtDriver
def _get_neutron_events(self, network_info):
# NOTE(danms): We need to collect any VIFs that are currently
# down that we expect a down->up event for. Anything that is
# already up will not undergo that transition, and for
# anything that might be stale (cache-wise) assume it's
# already up so we don't block on it.
return [('network-vif-plugged', vif['id'])
for vif in network_info if vif.get('active', True) is False]
Here we iterate over the vifs in the network_info obtained from neutron and check the value of the 'active' key; for every vif whose 'active' value is False, we need to wait for a network-vif-plugged event.
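As a concrete illustration (with hypothetical vif data, not taken from a real environment), the comprehension above behaves like this:
# Hypothetical network_info: one vif still down, one already active.
network_info = [
    {'id': 'vif-id-1', 'active': False},
    {'id': 'vif-id-2', 'active': True},
]
events = [('network-vif-plugged', vif['id'])
          for vif in network_info if vif.get('active', True) is False]
print(events)   # [('network-vif-plugged', 'vif-id-1')]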
Next, let's look at the nova-side code that waits for the event: self.virtapi.wait_for_instance_event.
#/nova/compute/manager.py:ComputeVirtAPI
@contextlib.contextmanager
def wait_for_instance_event(self, instance, event_names, deadline=300,
error_callback=None):
"""Plan to wait for some events, run some code, then wait.
This context manager will first create plans to wait for the
provided event_names, yield, and then wait for all the scheduled
events to complete.
Note that this uses an eventlet.timeout.Timeout to bound the
operation, so callers should be prepared to catch that
failure and handle that situation appropriately.
If the event is not received by the specified timeout deadline,
eventlet.timeout.Timeout is raised.
If the event is received but did not have a 'completed'
status, a NovaException is raised. If an error_callback is
provided, instead of raising an exception as detailed above
for the failure case, the callback will be called with the
event_name and instance, and can return True to continue
waiting for the rest of the events, False to stop processing,
or raise an exception which will bubble up to the waiter.
:param instance: The instance for which an event is expected
:param event_names: A list of event names. Each element can be a
string event name or tuple of strings to
indicate (name, tag).
:param deadline: Maximum number of seconds we should wait for all
of the specified events to arrive.
:param error_callback: A function to be called if an event arrives
"""
if error_callback is None:
error_callback = self._default_error_callback
events = {}
for event_name in event_names:
if isinstance(event_name, tuple):
name, tag = event_name
event_name = objects.InstanceExternalEvent.make_key(
name, tag)
try:
events[event_name] = (
self._compute.instance_events.prepare_for_instance_event(
instance, event_name))
except exception.NovaException:
error_callback(event_name, instance)
# NOTE(danms): Don't wait for any of the events. They
# should all be canceled and fired immediately below,
# but don't stick around if not.
deadline = 0
yield
with eventlet.timeout.Timeout(deadline):
for event_name, event in events.items():
actual_event = event.wait()
if actual_event.status == 'completed':
continue
decision = error_callback(event_name, instance)
if decision is False:
break
When the with statement that uses wait_for_instance_event is entered, the code before the yield in wait_for_instance_event runs first (see Python's @contextlib.contextmanager behavior); that code builds the events to be waited for. When the body of the with block has finished, the code after the yield runs, i.e. nova waits for neutron to send the events it needs. The wait time is the vif_plugging_timeout parameter in /etc/nova/nova.conf, which defaults to 300s.
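For readers unfamiliar with this pattern, here is a minimal, generic sketch (not the nova code itself) showing that the code before the yield runs when the with block is entered and the code after the yield runs when the block exits:
import contextlib

@contextlib.contextmanager
def wait_demo():
    print('before yield: prepare the events to wait for')
    yield
    print('after yield: wait for the events to arrive')

with wait_demo():
    print('with body: plug vifs and create the libvirt domain')
# Output order: before yield -> with body -> after yield.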
With that, VM creation proceeds by calling the underlying libvirt API, while the nova side waits for neutron to send the network-vif-plugged event.
Now let's switch to the neutron side and analyze how the neutron-linuxbridge-agent service detects tap devices and how neutron-server sends the event to nova-api.
Note: before reading on, it helps to skim my article "The startup flow of neutron-server".
First, we need to find the entry point of neutron-linuxbridge-agent, i.e. look at the setup.cfg file.
[entry_points]
console_scripts =
neutron-db-manage = neutron.db.migration.cli:main
neutron-debug = neutron.debug.shell:main
neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
neutron-hyperv-agent = neutron.cmd.eventlet.plugins.hyperv_neutron_agent:main
neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
neutron-ibm-agent = neutron.plugins.ibm.agent.sdnve_neutron_agent:main
neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
neutron-metadata-agent = neutron.cmd.eventlet.agents.metadata:main
neutron-mlnx-agent = neutron.cmd.eventlet.plugins.mlnx_neutron_agent:main
neutron-nec-agent = neutron.cmd.eventlet.plugins.nec_neutron_agent:main
neutron-netns-cleanup = neutron.cmd.netns_cleanup:main
neutron-ns-metadata-proxy = neutron.cmd.eventlet.agents.metadata_proxy:main
neutron-ovsvapp-agent = neutron.cmd.eventlet.plugins.ovsvapp_neutron_agent:main
neutron-nvsd-agent = neutron.plugins.oneconvergence.agent.nvsd_neutron_agent:main
neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
neutron-ovs-cleanup = neutron.cmd.ovs_cleanup:main
neutron-restproxy-agent = neutron.plugins.bigswitch.agent.restproxy_agent:main
neutron-server = neutron.cmd.eventlet.server:main
neutron-rootwrap = oslo_rootwrap.cmd:main
neutron-rootwrap-daemon = oslo_rootwrap.cmd:daemon
neutron-usage-audit = neutron.cmd.usage_audit:main
neutron-metering-agent = neutron.cmd.eventlet.services.metering_agent:main
neutron-sriov-nic-agent = neutron.plugins.sriovnicagent.sriov_nic_agent:main
neutron-sanity-check = neutron.cmd.sanity_check:main
neutron-cisco-apic-service-agent = neutron.plugins.ml2.drivers.cisco.apic.apic_topology:service_main
neutron-cisco-apic-host-agent = neutron.plugins.ml2.drivers.cisco.apic.apic_topology:agent_main
neutron.openstack.common.notifier.rpc_notifier2 = oslo_messaging.notify._impl_messaging:MessagingV2Driver
neutron.openstack.common.notifier.rpc_notifier = oslo_messaging.notify._impl_messaging:MessagingDriver
neutron.openstack.common.notifier.test_notifier = oslo_messaging.notify._impl_test:TestDriver
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
def main():
common_config.init(sys.argv[1:])
common_config.setup_logging()
try:
interface_mappings = q_utils.parse_mappings(
cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
except ValueError as e:
LOG.error(_LE("Parsing physical_interface_mappings failed: %s. "
"Agent terminated!"), e)
sys.exit(1)
LOG.info(_LI("Interface mappings: %s"), interface_mappings)
polling_interval = cfg.CONF.AGENT.polling_interval
agent = LinuxBridgeNeutronAgentRPC(interface_mappings,
polling_interval)
LOG.info(_LI("Agent initialized successfully, now running... "))
agent.daemon_loop()
sys.exit(0)
if __name__ == "__main__":
main()
Here the physical_interface_mappings parameter from the /etc/neutron/plugins/linuxbridge/linuxbridge_conf.ini configuration file is parsed first. In my OpenStack environment this parameter is set as follows.
physical_interface_mappings=physnet1:eth1,physnet2:eth2
Here physnet1 is used for the data network and physnet2 for the external network.
#/neutron/common/utils.py
def parse_mappings(mapping_list, unique_values=True):
"""Parse a list of mapping strings into a dictionary.
:param mapping_list: a list of strings of the form '<key>:<value>'
:param unique_values: values must be unique if True
:returns: a dict mapping keys to values
"""
mappings = {}
for mapping in mapping_list:
mapping = mapping.strip()
if not mapping:
continue
split_result = mapping.split(':')
if len(split_result) != 2:
raise ValueError(_("Invalid mapping: '%s'") % mapping)
key = split_result[0].strip()
if not key:
raise ValueError(_("Missing key in mapping: '%s'") % mapping)
value = split_result[1].strip()
if not value:
raise ValueError(_("Missing value in mapping: '%s'") % mapping)
if key in mappings:
raise ValueError(_("Key %(key)s in mapping: '%(mapping)s' not "
"unique") % {'key': key, 'mapping': mapping})
if unique_values and value in mappings.itervalues():
raise ValueError(_("Value %(value)s in mapping: '%(mapping)s' "
"not unique") % {'value': value,
'mapping': mapping})
mappings[key] = value
return mappings
The returned interface_mappings is a dictionary whose keys are physnet1 and physnet2 and whose corresponding values are eth1 and eth2.
interface_mappings = {'physnet1': 'eth1', 'physnet2': 'eth2'}
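A quick sanity check (assuming the parse_mappings function listed above is in scope; the split(',') mirrors how the option value is turned into a list of strings):
value = 'physnet1:eth1,physnet2:eth2'
print(parse_mappings(value.split(',')))
# {'physnet1': 'eth1', 'physnet2': 'eth2'}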
Back in the main function of neutron-linuxbridge-agent, the polling_interval parameter is then read from the /etc/neutron/plugins/linuxbridge/linuxbridge_conf.ini configuration file and used to create the LinuxBridgeNeutronAgentRPC object.
polling_interval = 2
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
class LinuxBridgeNeutronAgentRPC(object):
def __init__(self, interface_mappings, polling_interval):
self.polling_interval = polling_interval
self.setup_linux_bridge(interface_mappings)
configurations = {'interface_mappings': interface_mappings}
if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.br_mgr.local_ip
configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
self.agent_state = {
'binary': 'neutron-linuxbridge-agent',
'host': cfg.CONF.host,
'topic': constants.L2_AGENT_TOPIC,
'configurations': configurations,
'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
'start_flag': True}
# stores received port_updates for processing by the main loop
self.updated_devices = set()
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc)
self.setup_rpc(interface_mappings.values())
The self.setup_linux_bridge(interface_mappings) statement creates a LinuxBridgeManager object.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def setup_linux_bridge(self, interface_mappings):
self.br_mgr = LinuxBridgeManager(interface_mappings)
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeManager
class LinuxBridgeManager(object):
def __init__(self, interface_mappings):
self.interface_mappings = interface_mappings
self.ip = ip_lib.IPWrapper()
# VXLAN related parameters:
self.local_ip = cfg.CONF.VXLAN.local_ip
self.vxlan_mode = lconst.VXLAN_NONE
if cfg.CONF.VXLAN.enable_vxlan:
self.local_int = self.get_interface_by_ip(self.local_ip)
if self.local_int:
self.check_vxlan_support()
else:
LOG.warning(_LW('VXLAN is enabled, a valid local_ip '
'must be provided'))
# Store network mapping to segments
self.network_map = {}
Here the enable_vxlan parameter in the /etc/neutron/plugins/linuxbridge/linuxbridge_conf.ini configuration file is False, so the LinuxBridgeManager object's vxlan_mode is lconst.VXLAN_NONE (i.e. 'not_supported').
Back in the LinuxBridgeNeutronAgentRPC constructor, the remaining work creates the RPC clients and RPC servers. For the RPC client (server) on the other end of each RPC server (client), see "The startup flow of neutron-server (2)"; here we briefly analyze the execution of the self.setup_rpc(interface_mappings.values()) statement.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def setup_rpc(self, physical_interfaces):
if physical_interfaces:
mac = utils.get_interface_mac(physical_interfaces[0])
else:
devices = ip_lib.IPWrapper().get_devices(True)
if devices:
mac = utils.get_interface_mac(devices[0].name)
else:
LOG.error(_LE("Unable to obtain MAC address for unique ID. "
"Agent terminated!"))
exit(1)
self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
# Handle updates from service
self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
self.sg_agent)]
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE]]
if cfg.CONF.VXLAN.l2_population:
consumers.append([topics.L2POPULATION,
topics.UPDATE, cfg.CONF.host])
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
This involves creating an RPC server for the neutron-linuxbridge-agent service and an RPC client that reports the agent's state (_report_state).
The RPC server of the neutron-linuxbridge-agent service is created as follows.
#/neutron/agent/rpc.py
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
"""Create agent RPC consumers.
:param endpoints: The list of endpoints to process the incoming messages.
:param prefix: Common prefix for the plugin/agent message queues.
:param topic_details: A list of topics. Each topic has a name, an
operation, and an optional host param keying the
subscription to topic.host for plugin calls.
:param start_listening: if True, it starts the processing loop
:returns: A common Connection.
"""
connection = n_rpc.create_connection(new=True)
for details in topic_details:
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)
topic_name = topics.get_topic_name(prefix, topic, operation)
connection.create_consumer(topic_name, endpoints, fanout=True)
if node_name:
node_topic_name = '%s.%s' % (topic_name, node_name)
connection.create_consumer(node_topic_name,
endpoints,
fanout=False)
if start_listening:
connection.consume_in_threads()
return connection
#/neutron/common/rpc.py:Connection
class Connection(object):
def __init__(self):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = oslo_messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
def close(self):
for server in self.servers:
server.stop()
for server in self.servers:
server.wait()
Note a small bit of Python syntax here.
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)
We know that itertools.islice returns an iterator, which normally needs next() to be called (explicitly, or implicitly as in a for loop) before its items can be obtained. Here the values are obtained directly in an assignment; is that a mistake?
No. In Python, unpacking (which is what happens here) works not only on tuples and lists but on any iterable object, including strings, files, iterators, and generators.
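A small generic example of this unpacking behaviour, independent of the neutron code:
import itertools

details = ['port', 'update']               # a topic description with only 2 items
topic, operation, node_name = itertools.islice(
    itertools.chain(details, [None]), 3)   # pad to exactly 3 values
print('%s %s %s' % (topic, operation, node_name))   # port update None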
The create_consumers function in /neutron/agent/rpc.py creates a /neutron/common/rpc.py:Connection object, calls the Connection class's create_consumer function to create the RPC servers, and starts them with consume_in_threads so they wait for requests from RPC clients.
The RPC client that reports the state of the neutron-linuxbridge-agent service (_report_state) is created as follows.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def _report_state(self):
try:
devices = len(self.br_mgr.get_tap_devices())
self.agent_state.get('configurations')['devices'] = devices
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception(_LE("Failed reporting state!"))
The state is reported every 30s (the report_interval parameter in the /etc/neutron/neutron.conf configuration file). The function that runs _report_state periodically is loopingcall.FixedIntervalLoopingCall; for details on how it works, see "The nova-compute Periodic tasks mechanism".
That completes the analysis of creating the LinuxBridgeNeutronAgentRPC object. Next, the daemon_loop function of LinuxBridgeNeutronAgentRPC runs in a loop (with a polling_interval of 2s) to detect added or removed tap devices.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def daemon_loop(self):
LOG.info(_LI("LinuxBridge Agent RPC Daemon Started!"))
device_info = None
sync = True
while True:
start = time.time()
device_info = self.scan_devices(previous=device_info, sync=sync)
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
sync = False
if self._device_info_has_changes(device_info):
LOG.debug("Agent loop found changes! %s", device_info)
try:
sync = self.process_network_devices(device_info)
except Exception:
LOG.exception(_LE("Error in agent loop. Devices info: %s"),
device_info)
sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!",
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
The first step is calling the scan_devices function to scan the tap devices on the host.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def scan_devices(self, previous, sync):
device_info = {}
# Save and reinitialise the set variable that the port_update RPC uses.
# This should be thread-safe as the greenthread should not yield
# between these two statements.
updated_devices = self.updated_devices
self.updated_devices = set()
current_devices = self.br_mgr.get_tap_devices()
device_info['current'] = current_devices
if previous is None:
# This is the first iteration of daemon_loop().
previous = {'added': set(),
'current': set(),
'updated': set(),
'removed': set()}
if sync:
# This is the first iteration, or the previous one had a problem.
# Re-add all existing devices.
device_info['added'] = current_devices
# Retry cleaning devices that may not have been cleaned properly.
# And clean any that disappeared since the previous iteration.
device_info['removed'] = (previous['removed'] | previous['current']
- current_devices)
# Retry updating devices that may not have been updated properly.
# And any that were updated since the previous iteration.
# Only update devices that currently exist.
device_info['updated'] = (previous['updated'] | updated_devices
& current_devices)
else:
device_info['added'] = current_devices - previous['current']
device_info['removed'] = previous['current'] - current_devices
device_info['updated'] = updated_devices & current_devices
return device_info
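The added/removed/updated sets returned above are plain Python set arithmetic; a minimal sketch of the non-sync branch with hypothetical tap names:
previous_current = {'tapaaaa', 'tapbbbb'}   # devices seen in the last iteration
current_devices = {'tapbbbb', 'tapcccc'}    # tapaaaa removed, tapcccc added
updated_devices = {'tapbbbb'}               # reported via the port_update RPC

added = current_devices - previous_current      # {'tapcccc'}
removed = previous_current - current_devices    # {'tapaaaa'}
updated = updated_devices & current_devices     # {'tapbbbb'}
print(added, removed, updated)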
scan_devices compares the current scan against the result of the previous scan and records the differences in the returned dict. So how does the scan itself find the devices?
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeManager
def get_tap_devices(self):
devices = set()
for device in os.listdir(BRIDGE_FS):
if device.startswith(constants.TAP_DEVICE_PREFIX):
devices.add(device)
return devices
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py
BRIDGE_FS = "/sys/class/net/"
#/neutron/common/constants.py
# Device names start with "tap"
TAP_DEVICE_PREFIX = 'tap'
This looks in the /sys/class/net/ directory on the host running the nova-compute service for devices whose names start with 'tap'. For example:
[root@jun2 net]# ls
brq8165bc3d-40 eth0 eth1 eth1.120 eth2 lo tap712a2c63-e6 tap83e7c095-f0 tap8f4fcfbb-2b
In this case, get_tap_devices returns the last three devices, whose names start with tap.
We are interested in the case where a tap device is added during VM creation, so the neutron-linuxbridge-agent service detects that the devices have changed and goes on to process them.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
self.sg_agent.prepare_devices_filter(device_info.get('added'))
if device_info.get('updated'):
self.sg_agent.refresh_firewall()
# Updated devices are processed the same as new ones, as their
# admin_state_up may have changed. The set union prevents duplicating
# work when a device is new and updated in the same polling iteration.
devices_added_updated = (set(device_info.get('added'))
| set(device_info.get('updated')))
if devices_added_updated:
resync_a = self.treat_devices_added_updated(devices_added_updated)
if device_info.get('removed'):
resync_b = self.treat_devices_removed(device_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
In process_network_devices, security group rules are first added for the newly added devices (how these rules are built will be analyzed in a later article). Since the case analyzed here is devices being added, the following function is executed.
#/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py:LinuxBridgeNeutronAgentRPC
def treat_devices_added_updated(self, devices):
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
self.context, devices, self.agent_id)
except Exception as e:
LOG.debug("Unable to get port details for "
"%(devices)s: %(e)s",
{'devices': devices, 'e': e})
# resync is needed
return True
for device_details in devices_details_list:
device = device_details['device']
LOG.debug("Port %s added", device)
if 'port_id' in device_details:
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': device_details})
if device_details['admin_state_up']:
# create the networking for the port
network_type = device_details.get('network_type')
if network_type:
segmentation_id = device_details.get('segmentation_id')
else:
# compatibility with pre-Havana RPC vlan_id encoding
vlan_id = device_details.get('vlan_id')
(network_type,
segmentation_id) = lconst.interpret_vlan_id(vlan_id)
if self.br_mgr.add_interface(
device_details['network_id'],
network_type,
device_details['physical_network'],
segmentation_id,
device_details['port_id']):
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id,
cfg.CONF.host)
else:
self.remove_port_binding(device_details['network_id'],
device_details['port_id'])
else:
LOG.info(_LI("Device %s not defined on plugin"), device)
return False
Testing shows that when a VM is created, the status column of the ports table in the neutron database goes DOWN -> BUILD -> ACTIVE. When nova and neutron interact to create the port entry, its status is DOWN; the call to get_devices_details_list in treat_devices_added_updated moves it from DOWN to BUILD; finally, the call to update_device_up in treat_devices_added_updated moves it from BUILD to ACTIVE.
All updates to the status of the ports table in the neutron database go through the update_port_status function.
#/neutron/plugins/ml2/plugin.py:Ml2Plugin
def update_port_status(self, context, port_id, status, host=None):
"""
Returns port_id (non-truncated uuid) if the port exists.
Otherwise returns None.
"""
updated = False
session = context.session
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
port = db.get_port(session, port_id)
if not port:
LOG.warning(_LW("Port %(port)s updated up by agent not found"),
{'port': port_id})
return None
if (port.status != status and
port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE):
original_port = self._make_port_dict(port)
port.status = status
updated_port = self._make_port_dict(port)
network = self.get_network(context,
original_port['network_id'])
levels = db.get_binding_levels(session, port.id,
port.port_binding.host)
mech_context = driver_context.PortContext(
self, context, updated_port, network, port.port_binding,
levels, original_port=original_port)
self.mechanism_manager.update_port_precommit(mech_context)
updated = True
elif port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
binding = db.get_dvr_port_binding_by_host(
session, port['id'], host)
if not binding:
return
binding['status'] = status
binding.update(binding)
updated = True
if (updated and
port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
port = db.get_port(session, port_id)
if not port:
LOG.warning(_LW("Port %s not found during update"),
port_id)
return
original_port = self._make_port_dict(port)
network = self.get_network(context,
original_port['network_id'])
port.status = db.generate_dvr_port_status(session, port['id'])
updated_port = self._make_port_dict(port)
levels = db.get_binding_levels(session, port_id, host)
mech_context = (driver_context.PortContext(
self, context, updated_port, network,
binding, levels, original_port=original_port))
self.mechanism_manager.update_port_precommit(mech_context)
if updated:
self.mechanism_manager.update_port_postcommit(mech_context)
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
db.delete_dvr_port_binding_if_stale(session, binding)
return port['id']
Here is the information for the tap device that was added when I created the VM.
Port tap93121330-58 updated. Details: {u'profile': {}, u'allowed_address_pairs': [], u'admin_state_up': True, u'network_id': u'8165bc3d-400a-48a0-9186-bf59f7f94b05', u'segmentation_id': 120,u'device_owner': u'compute:nova',
u'physical_network': u'physnet1', u'mac_address': u'fa:16:3e:9f:6f:c5', u'device': u'tap93121330-58', u'port_security_enabled': True, u'port_id': u'93121330-58', u'fixed_ips': [{u'subnet_id': u'ec1028b2-7cb0-4feb-b974-6b8ea7e7f08f', u'ip_address': u'172.16.0.7'}],
u'network_type': u'vlan'}
I searched for a long time but could not find the code path that explicitly writes the new status to the ports table. I initially expected update_port_precommit to do it, but linuxbridge does not override that function; it inherits the no-op update_port_precommit from its parent class. Yet testing shows that the status is indeed updated within update_port_status. (Most likely the assignment port.status = status on the ORM object inside the transaction is what persists the new value, since SQLAlchemy flushes dirty attributes when the session commits.) If anyone knows the details for certain, please let me know, thanks.
Now let's look at where the listener on the ports table of the neutron database is registered when the Ml2Plugin object is created (note that the functions above are executed on neutron-server, invoked by neutron-linuxbridge-agent via RPC).
#/neutron/db/db_base_plugin_v2.py:NeutronDbPluginV2
class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
common_db_mixin.CommonDbMixin):
"""V2 Neutron plugin interface implementation using SQLAlchemy models.
Whenever a non-read call happens the plugin will call an event handler
class method (e.g., network_created()). The result is that this class
can be sub-classed by other classes that add custom behaviors on certain
events.
"""
# This attribute specifies whether the plugin supports or not
# bulk/pagination/sorting operations. Name mangling is used in
# order to ensure it is qualified by class
__native_bulk_support = True
__native_pagination_support = True
__native_sorting_support = True
def __init__(self):
if cfg.CONF.notify_nova_on_port_status_changes:
from neutron.notifiers import nova
# NOTE(arosen) These event listeners are here to hook into when
# port status changes and notify nova about their change.
self.nova_notifier = nova.Notifier()
event.listen(models_v2.Port, 'after_insert',
self.nova_notifier.send_port_status)
event.listen(models_v2.Port, 'after_update',
self.nova_notifier.send_port_status)
event.listen(models_v2.Port.status, 'set',
self.nova_notifier.record_port_status_changed)
NeutronDbPluginV2 is a parent class of Ml2Plugin, so it is initialized when the Ml2Plugin object is created. During initialization, NeutronDbPluginV2 decides whether to notify nova based on the notify_nova_on_port_status_changes parameter in the /etc/neutron/neutron.conf configuration file.
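The hooks used here are ordinary SQLAlchemy ORM events. A minimal standalone sketch (a toy model, not neutron's Port model) of listening for the attribute 'set' event and the mapper 'after_update' event looks roughly like this:
import sqlalchemy as sa
from sqlalchemy import event, orm
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class ToyPort(Base):
    __tablename__ = 'toy_ports'
    id = sa.Column(sa.Integer, primary_key=True)
    status = sa.Column(sa.String(16))

def on_status_set(target, value, oldvalue, initiator):
    # Fires whenever ToyPort.status is assigned; this is the hook that
    # corresponds to record_port_status_changed below.
    print('status change planned: %s -> %s' % (oldvalue, value))

def on_after_update(mapper, connection, target):
    # Fires after the row has been UPDATEd; corresponds to send_port_status.
    print('row updated, status is now %s' % target.status)

event.listen(ToyPort.status, 'set', on_status_set)
event.listen(ToyPort, 'after_update', on_after_update)

engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)
session = orm.sessionmaker(bind=engine)()
port = ToyPort(status='DOWN')
session.add(port)
session.commit()
port.status = 'ACTIVE'   # triggers the 'set' listener
session.commit()         # triggers the 'after_update' listener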
First, when the status column of the ports table in the neutron database is set, e.g. BUILD -> ACTIVE, the self.nova_notifier.record_port_status_changed function is triggered.
#/neutron/notifiers/nova.py:Notifier
def record_port_status_changed(self, port, current_port_status,
previous_port_status, initiator):
"""Determine if nova needs to be notified due to port status change.
"""
# clear out previous _notify_event
port._notify_event = None
# If there is no device_id set there is nothing we can do here.
if not port.device_id:
LOG.debug("device_id is not set on port yet.")
return
if not port.id:
LOG.warning(_LW("Port ID not set! Nova will not be notified of "
"port status change."))
return
# We only want to notify about nova ports.
if not self._is_compute_port(port):
return
# We notify nova when a vif is unplugged which only occurs when
# the status goes from ACTIVE to DOWN.
if (previous_port_status == constants.PORT_STATUS_ACTIVE and
current_port_status == constants.PORT_STATUS_DOWN):
event_name = VIF_UNPLUGGED
# We only notify nova when a vif is plugged which only occurs
# when the status goes from:
# NO_VALUE/DOWN/BUILD -> ACTIVE/ERROR.
elif (previous_port_status in [sql_attr.NO_VALUE,
constants.PORT_STATUS_DOWN,
constants.PORT_STATUS_BUILD]
and current_port_status in [constants.PORT_STATUS_ACTIVE,
constants.PORT_STATUS_ERROR]):
event_name = VIF_PLUGGED
# All the remaining state transitions are of no interest to nova
else:
LOG.debug("Ignoring state change previous_port_status: "
"%(pre_status)s current_port_status: %(cur_status)s"
" port_id %(id)s",
{'pre_status': previous_port_status,
'cur_status': current_port_status,
'id': port.id})
return
port._notify_event = (
{'server_uuid': port.device_id,
'name': event_name,
'status': NEUTRON_NOVA_EVENT_STATUS_MAP.get(current_port_status),
'tag': port.id})
record_port_status_changed records the event that will be sent to nova; at this point the event is only recorded, not actually sent.
However, record_port_status_changed only notifies nova of two kinds of transitions:
1. The ports table status goes from ACTIVE to DOWN.
2. The ports table status goes from NO_VALUE/DOWN/BUILD to ACTIVE/ERROR.
So although neutron-linuxbridge-agent moves the ports table status DOWN -> BUILD -> ACTIVE during VM creation, the event that is finally sent to nova is triggered by the BUILD -> ACTIVE transition. That event is:
VIF_PLUGGED = 'network-vif-plugged'
After the ports table update completes (after_update), the event is actually sent to nova by the self.nova_notifier.send_port_status function.
#/neutron/notifiers/nova.py:Notifier
def send_port_status(self, mapper, connection, port):
event = getattr(port, "_notify_event", None)
self.batch_notifier.queue_event(event)
port._notify_event = None
In send_port_status, the event obtained by event = getattr(port, "_notify_event", None) is:
port._notify_event = (
{'server_uuid': port.device_id,
'name': event_name,
'status': NEUTRON_NOVA_EVENT_STATUS_MAP.get(current_port_status),
'tag': port.id})
NEUTRON_NOVA_EVENT_STATUS_MAP = {constants.PORT_STATUS_ACTIVE: 'completed',
constants.PORT_STATUS_ERROR: 'failed',
constants.PORT_STATUS_DOWN: 'completed'}
Since the ports table status went from BUILD to ACTIVE, the status in port._notify_event is 'completed'.
The self.batch_notifier used in send_port_status is a BatchNotifier object created when the Notifier class in /neutron/notifiers/nova.py is initialized, as shown below.
#/neutron/notifiers/nova.py:Notifier
class Notifier(object):
def __init__(self):
# FIXME(jamielennox): A notifier is being created for each Controller
# and each Notifier is handling it's own auth. That means that we are
# authenticating the exact same thing len(controllers) times. This
# should be an easy thing to optimize.
auth = ks_auth.load_from_conf_options(cfg.CONF, 'nova')
endpoint_override = None
if not auth:
LOG.warning(_LW('Authenticating to nova using nova_admin_* options'
' is deprecated. This should be done using'
' an auth plugin, like password'))
if cfg.CONF.nova_admin_tenant_id:
endpoint_override = "%s/%s" % (cfg.CONF.nova_url,
cfg.CONF.nova_admin_tenant_id)
auth = DefaultAuthPlugin(
auth_url=cfg.CONF.nova_admin_auth_url,
username=cfg.CONF.nova_admin_username,
password=cfg.CONF.nova_admin_password,
tenant_id=cfg.CONF.nova_admin_tenant_id,
tenant_name=cfg.CONF.nova_admin_tenant_name,
endpoint_override=endpoint_override)
session = ks_session.Session.load_from_conf_options(cfg.CONF,
'nova',
auth=auth)
# NOTE(andreykurilin): novaclient.v1_1 was renamed to v2 and there is
# no way to import the contrib module directly without referencing v2,
# which would only work for novaclient >= 2.21.0.
novaclient_cls = nova_client.get_client_class(NOVA_API_VERSION)
server_external_events = importutils.import_module(
novaclient_cls.__module__.replace(
".client", ".contrib.server_external_events"))
self.nclient = novaclient_cls(
session=session,
region_name=cfg.CONF.nova.region_name,
extensions=[server_external_events])
self.batch_notifier = batch_notifier.BatchNotifier(
cfg.CONF.send_events_interval, self.send_events)
Here send_events_interval is the value of the send_events_interval parameter in the /etc/neutron/neutron.conf configuration file, as follows.
send_events_interval = 2
#/neutron/notifiers/batch_notifier.py:BatchNotifier
class BatchNotifier(object):
def __init__(self, batch_interval, callback):
self.pending_events = []
self._waiting_to_send = False
self.callback = callback
self.batch_interval = batch_interval
def queue_event(self, event):
"""Called to queue sending an event with the next batch of events.
Sending events individually, as they occur, has been problematic as it
can result in a flood of sends. Previously, there was a loopingcall
thread that would send batched events on a periodic interval. However,
maintaining a persistent thread in the loopingcall was also
problematic.
This replaces the loopingcall with a mechanism that creates a
short-lived thread on demand when the first event is queued. That
thread will sleep once for the same batch_duration to allow other
events to queue up in pending_events and then will send them when it
wakes.
If a thread is already alive and waiting, this call will simply queue
the event and return leaving it up to the thread to send it.
:param event: the event that occurred.
"""
if not event:
return
self.pending_events.append(event)
if self._waiting_to_send:
return
self._waiting_to_send = True
def last_out_sends():
eventlet.sleep(self.batch_interval)
self._waiting_to_send = False
self._notify()
eventlet.spawn_n(last_out_sends)
def _notify(self):
if not self.pending_events:
return
batched_events = self.pending_events
self.pending_events = []
self.callback(batched_events)
So when send_port_status executes self.batch_notifier.queue_event(event), the BatchNotifier's queue_event function is called; queue_event spawns a new thread to run last_out_sends, which eventually calls the callback specified when the BatchNotifier was created. Here the callback is the send_events function of the Notifier class in /neutron/notifiers/nova.py.
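To see the batching behaviour in isolation, here is a small usage sketch of the BatchNotifier class listed above, with a stand-in callback (hypothetical event data; it assumes the class definition above is in scope):
import eventlet

def fake_send_events(batched_events):
    # Stand-in for Notifier.send_events; just shows what would be sent.
    print('sending %d event(s): %s' % (len(batched_events), batched_events))

notifier = BatchNotifier(2, fake_send_events)
notifier.queue_event({'name': 'network-vif-plugged', 'tag': 'port-1'})
notifier.queue_event({'name': 'network-vif-plugged', 'tag': 'port-2'})
# Both events are delivered together roughly 2 seconds later, by the
# short-lived greenthread spawned by the first queue_event call.
eventlet.sleep(3)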
#/neutron/notifiers/nova.py:Notifier
def send_events(self, batched_events):
LOG.debug("Sending events: %s", batched_events)
try:
response = self.nclient.server_external_events.create(
batched_events)
except nova_exceptions.NotFound:
LOG.warning(_LW("Nova returned NotFound for event: %s"),
batched_events)
except Exception:
LOG.exception(_LE("Failed to notify nova on events: %s"),
batched_events)
else:
if not isinstance(response, list):
LOG.error(_LE("Error response returned from nova: %s"),
response)
return
response_error = False
for event in response:
try:
code = event['code']
except KeyError:
response_error = True
continue
if code != 200:
LOG.warning(_LW("Nova event: %s returned with failed "
"status"), event)
else:
LOG.info(_LI("Nova event response: %s"), event)
if response_error:
LOG.error(_LE("Error response returned from nova: %s"),
response)
The send_events function sends the events to nova via an HTTP request.
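For reference, the novaclient call above issues a POST to nova's os-server-external-events extension; the JSON body corresponds to a Python structure roughly like the following (all values here are hypothetical):
# Approximate body POSTed to /v2/<tenant_id>/os-server-external-events.
body = {
    'events': [
        {
            'server_uuid': '<instance uuid>',   # port.device_id
            'name': 'network-vif-plugged',
            'status': 'completed',
            'tag': '<port uuid>',               # port.id
        },
    ],
}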
Now let's look at how the nova side handles the events sent by neutron.
#/nova/api/openstack/compute/contrib/server_external_events.py:ServerExternalEventsController
class ServerExternalEventsController(wsgi.Controller):
def __init__(self):
self.compute_api = compute.API()
super(ServerExternalEventsController, self).__init__()
def create(self, req, body):
"""Creates a new instance event."""
context = req.environ['nova.context']
authorize(context, action='create')
response_events = []
accepted_events = []
accepted_instances = set()
instances = {}
result = 200
body_events = body.get('events', [])
if not isinstance(body_events, list) or not len(body_events):
raise webob.exc.HTTPBadRequest()
for _event in body_events:
client_event = dict(_event)
event = objects.InstanceExternalEvent(context)
try:
event.instance_uuid = client_event.pop('server_uuid')
event.name = client_event.pop('name')
event.status = client_event.pop('status', 'completed')
event.tag = client_event.pop('tag', None)
except KeyError as missing_key:
msg = _('event entity requires key %(key)s') % missing_key
raise webob.exc.HTTPBadRequest(explanation=msg)
if client_event:
msg = (_('event entity contains unsupported items: %s') %
', '.join(client_event.keys()))
raise webob.exc.HTTPBadRequest(explanation=msg)
if event.status not in external_event_obj.EVENT_STATUSES:
raise webob.exc.HTTPBadRequest(
_('Invalid event status `%s\'') % event.status)
instance = instances.get(event.instance_uuid)
if not instance:
try:
instance = objects.Instance.get_by_uuid(
context, event.instance_uuid)
instances[event.instance_uuid] = instance
except exception.InstanceNotFound:
LOG.debug('Dropping event %(name)s:%(tag)s for unknown '
'instance %(instance_uuid)s',
dict(event.iteritems()))
_event['status'] = 'failed'
_event['code'] = 404
result = 207
# NOTE: before accepting the event, make sure the instance
# for which the event is sent is assigned to a host; otherwise
# it will not be possible to dispatch the event
if instance:
if instance.host:
accepted_events.append(event)
accepted_instances.add(instance)
LOG.info(_LI('Creating event %(name)s:%(tag)s for '
'instance %(instance_uuid)s'),
dict(event.iteritems()))
# NOTE: as the event is processed asynchronously verify
# whether 202 is a more suitable response code than 200
_event['status'] = 'completed'
_event['code'] = 200
else:
LOG.debug("Unable to find a host for instance "
"%(instance)s. Dropping event %(event)s",
{'instance': event.instance_uuid,
'event': event.name})
_event['status'] = 'failed'
_event['code'] = 422
result = 207
response_events.append(_event)
if accepted_events:
self.compute_api.external_instance_event(
context, accepted_instances, accepted_events)
else:
msg = _('No instances found for any event')
raise webob.exc.HTTPNotFound(explanation=msg)
# FIXME(cyeoh): This needs some infrastructure support so that
# we have a general way to do this
robj = wsgi.ResponseObject({'events': response_events})
robj._code = result
return robj
nova-api's WSGI framework routes neutron's HTTP request to the create function of the ServerExternalEventsController class.
Eventually, the external_instance_event function on nova-compute is executed via an RPC call.
#/nova/compute/api.py:API
def external_instance_event(self, context, instances, events):
# NOTE(danms): The external API consumer just provides events,
# but doesn't know where they go. We need to collate lists
# by the host the affected instance is on and dispatch them
# according to host
instances_by_host = {}
events_by_host = {}
hosts_by_instance = {}
for instance in instances:
instances_on_host = instances_by_host.get(instance.host, [])
instances_on_host.append(instance)
instances_by_host[instance.host] = instances_on_host
hosts_by_instance[instance.uuid] = instance.host
for event in events:
host = hosts_by_instance[event.instance_uuid]
events_on_host = events_by_host.get(host, [])
events_on_host.append(event)
events_by_host[host] = events_on_host
for host in instances_by_host:
# TODO(salv-orlando): Handle exceptions raised by the rpc api layer
# in order to ensure that a failure in processing events on a host
# will not prevent processing events on other hosts
self.compute_rpcapi.external_instance_event(
context, instances_by_host[host], events_by_host[host])
#/nova/compute/rpcapi.py:ComputeAPI
def external_instance_event(self, ctxt, instances, events):
cctxt = self.client.prepare(
server=_compute_host(None, instances[0]),
version=self._compat_ver('4.0', '3.23'))
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
events=events)
#/nova/compute/manager.py:ComputeManager
@wrap_exception()
def external_instance_event(self, context, instances, events):
# NOTE(danms): Some event types are handled by the manager, such
# as when we're asked to update the instance's info_cache. If it's
# not one of those, look for some thread(s) waiting for the event and
# unblock them if so.
for event in events:
instance = [inst for inst in instances
if inst.uuid == event.instance_uuid][0]
LOG.debug('Received event %(event)s',
{'event': event.key},
instance=instance)
if event.name == 'network-changed':
self.network_api.get_instance_nw_info(context, instance)
else:
self._process_instance_event(instance, event)
The name of the event sent from the neutron side is:
VIF_PLUGGED = 'network-vif-plugged'
so the _process_instance_event function is executed.
#/nova/compute/manager.py:ComputeManager
def _process_instance_event(self, instance, event):
_event = self.instance_events.pop_instance_event(instance, event)
if _event:
LOG.debug('Processing event %(event)s',
{'event': event.key}, instance=instance)
_event.send(event)
Executing _event.send(event) here is what makes wait_for_instance_event stop waiting. Let's look in detail at how wait_for_instance_event waits for the event.
#/nova/compute/manager.py:ComputeVirtAPI
@contextlib.contextmanager
def wait_for_instance_event(self, instance, event_names, deadline=300,
error_callback=None):
"""Plan to wait for some events, run some code, then wait.
This context manager will first create plans to wait for the
provided event_names, yield, and then wait for all the scheduled
events to complete.
Note that this uses an eventlet.timeout.Timeout to bound the
operation, so callers should be prepared to catch that
failure and handle that situation appropriately.
If the event is not received by the specified timeout deadline,
eventlet.timeout.Timeout is raised.
If the event is received but did not have a 'completed'
status, a NovaException is raised. If an error_callback is
provided, instead of raising an exception as detailed above
for the failure case, the callback will be called with the
event_name and instance, and can return True to continue
waiting for the rest of the events, False to stop processing,
or raise an exception which will bubble up to the waiter.
:param instance: The instance for which an event is expected
:param event_names: A list of event names. Each element can be a
string event name or tuple of strings to
indicate (name, tag).
:param deadline: Maximum number of seconds we should wait for all
of the specified events to arrive.
:param error_callback: A function to be called if an event arrives
"""
if error_callback is None:
error_callback = self._default_error_callback
events = {}
for event_name in event_names:
if isinstance(event_name, tuple):
name, tag = event_name
event_name = objects.InstanceExternalEvent.make_key(
name, tag)
try:
events[event_name] = (
self._compute.instance_events.prepare_for_instance_event(
instance, event_name))
except exception.NovaException:
error_callback(event_name, instance)
# NOTE(danms): Don't wait for any of the events. They
# should all be canceled and fired immediately below,
# but don't stick around if not.
deadline = 0
yield
with eventlet.timeout.Timeout(deadline):
for event_name, event in events.items():
actual_event = event.wait()
if actual_event.status == 'completed':
continue
decision = error_callback(event_name, instance)
if decision is False:
break
When creating the VM, the event_names passed into wait_for_instance_event is a list containing the tuple ('network-vif-plugged', vif['id']). The objects.InstanceExternalEvent.make_key call then builds an event_name of the form network-vif-plugged-<vif['id']>.
#/nova/objects/external_events.py:InstanceExternalEvent
# TODO(berrange): Remove NovaObjectDictCompat
class InstanceExternalEvent(obj_base.NovaObject,
obj_base.NovaObjectDictCompat):
# Version 1.0: Initial version
# Supports network-changed and vif-plugged
VERSION = '1.0'
fields = {
'instance_uuid': fields.UUIDField(),
'name': fields.StringField(),
'status': fields.StringField(),
'tag': fields.StringField(nullable=True),
'data': fields.DictOfStringsField(),
}
@staticmethod
def make_key(name, tag=None):
if tag is not None:
return '%s-%s' % (name, tag)
else:
return name
Then the events dictionary is built via the prepare_for_instance_event function.
#/nova/compute/manager.py:InstanceEvents
def prepare_for_instance_event(self, instance, event_name):
"""Prepare to receive an event for an instance.
This will register an event for the given instance that we will
wait on later. This should be called before initiating whatever
action will trigger the event. The resulting eventlet.event.Event
object should be wait()'d on to ensure completion.
:param instance: the instance for which the event will be generated
:param event_name: the name of the event we're expecting
:returns: an event object that should be wait()'d on
"""
if self._events is None:
# NOTE(danms): We really should have a more specific error
# here, but this is what we use for our default error case
raise exception.NovaException('In shutdown, no new events '
'can be scheduled')
@utils.synchronized(self._lock_name(instance))
def _create_or_get_event():
if instance.uuid not in self._events:
self._events.setdefault(instance.uuid, {})
return self._events[instance.uuid].setdefault(
event_name, eventlet.event.Event())
LOG.debug('Preparing to wait for external event %(event)s',
{'event': event_name}, instance=instance)
return _create_or_get_event()
The self._events built by prepare_for_instance_event ends up in a form similar to:
{
'uuid2': {'event_name1': 'event1'},
'uuid1': {'event_name1': 'event1', 'event_name2': 'event2'}
}
And prepare_for_instance_event returns an eventlet.event.Event() object.
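eventlet.event.Event is a simple one-shot synchronization primitive: one greenthread blocks in wait() until another calls send(). A minimal standalone sketch:
import eventlet
from eventlet import event

ev = event.Event()

def sender():
    eventlet.sleep(1)
    # Corresponds to _event.send(event) in _process_instance_event.
    ev.send('network-vif-plugged arrived')

eventlet.spawn_n(sender)
# Corresponds to actual_event = event.wait() after the yield.
print(ev.wait())   # blocks about 1 second, then prints the sent value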
Once the events have been built, the statements inside the with block that uses wait_for_instance_event run to completion, and then the code after the yield in wait_for_instance_event runs and waits for the events.
So when the neutron side sends the event the nova side is waiting for, the nova side is triggered to execute the pop_instance_event function of the InstanceEvents class.
#/nova/compute/manager.py:InstanceEvents
def pop_instance_event(self, instance, event):
"""Remove a pending event from the wait list.
This will remove a pending event from the wait list so that it
can be used to signal the waiters to wake up.
:param instance: the instance for which the event was generated
:param event: the nova.objects.external_event.InstanceExternalEvent
that describes the event
:returns: the eventlet.event.Event object on which the waiters
are blocked
"""
no_events_sentinel = object()
no_matching_event_sentinel = object()
@utils.synchronized(self._lock_name(instance))
def _pop_event():
if not self._events:
LOG.debug('Unexpected attempt to pop events during shutdown',
instance=instance)
return no_events_sentinel
events = self._events.get(instance.uuid)
if not events:
return no_events_sentinel
_event = events.pop(event.key, None)
if not events:
del self._events[instance.uuid]
if _event is None:
return no_matching_event_sentinel
return _event
result = _pop_event()
if result is no_events_sentinel:
LOG.debug('No waiting events found dispatching %(event)s',
{'event': event.key},
instance=instance)
return None
elif result is no_matching_event_sentinel:
LOG.debug('No event matching %(event)s in %(events)s',
{'event': event.key,
'events': self._events.get(instance.uuid, {}).keys()},
instance=instance)
return None
else:
return result
Once pop_instance_event has obtained the event, it returns it to _process_instance_event, which then executes _event.send(event); this makes the actual_event = event.wait() statement after the yield in wait_for_instance_event stop waiting, and the event sent by the neutron side is received.
With that, the mechanism by which nova waits for neutron's events (the nova-event-callback mechanism) has been analyzed. To summarize once more:
1. When creating a VM, the nova-compute service calls wait_for_instance_event and waits for neutron to send an event.
2. neutron's neutron-linuxbridge-agent periodically checks for added or removed tap devices. When a VM is created, a new tap device appears and the agent updates the ports table in the neutron database. When neutron-server creates the core_plugin, it uses SQLAlchemy's built-in event mechanism to watch the ports table; when the ports table changes, neutron-server sends an event to nova via an HTTP request.
3. When the nova side receives the event sent by neutron, it stops waiting and continues with the remaining steps of VM creation.