openstack-neutron/0001-add-distributed-traffic-feature-support.patch
2023-10-27 10:50:02 +08:00

2584 lines
110 KiB
Diff

Author: wangkuntian <wangkuntian@uniontech.com>
Date: Fri Oct 13 16:25:17 2023 +0800
feat: add distributed traffic feature
---
agent/l3/dvr_edge_ha_router.py | 4 +-
agent/l3/extensions/rg_port_forwarding.py | 398 ++++++++++++++++++++++++++++
agent/l3/ha.py | 9 -
agent/l3/ha_router.py | 74 +++---
agent/l3/keepalived_state_change.py | 12 +-
agent/l3/router_info.py | 19 +-
agent/linux/dhcp.py | 137 +++++++++-
agent/linux/interface.py | 43 +--
agent/linux/keepalived.py | 1 +
api/rpc/agentnotifiers/dhcp_rpc_agent_api.py | 9 +
api/rpc/callbacks/resources.py | 3 +
conf/agent/l3/keepalived.py | 2 +
conf/common.py | 16 +-
conf/policies/__init__.py | 2 +
conf/policies/rg_port_forwarding.py | 76 ++++++
db/l3_attrs_db.py | 7 +-
db/l3_db.py | 27 +-
db/l3_hamode_db.py | 97 +++++++
db/migration/alembic_migrations/versions/EXPAND_HEAD | 2 +-
.../train/expand/1c19a98b5eef_add_router_configurations.py | 36 +++
.../expand/cab12b72ed90_add_router_gateway_port_forwarding.py | 55 ++++
db/models/l3_attrs.py | 2 +
db/models/rg_port_forwarding.py | 59 +++++
extensions/rg_port_forwarding.py | 119 +++++++++
objects/rg_port_forwarding.py | 87 ++++++
objects/router.py | 15 +-
scheduler/l3_agent_scheduler.py | 107 ++++++--
services/l3_router/l3_router_plugin.py | 12 +
services/rg_portforwarding/__init__.py | 0
services/rg_portforwarding/common/__init__.py | 0
services/rg_portforwarding/common/exceptions.py | 77 ++++++
services/rg_portforwarding/pf_plugin.py | 369 ++++++++++++++++++++++++++
32 files changed, 1749 insertions(+), 127 deletions(-)
diff --git a/agent/l3/dvr_edge_ha_router.py b/agent/l3/dvr_edge_ha_router.py
index 71f740bef9..b92f70b70f 100644
--- a/agent/l3/dvr_edge_ha_router.py
+++ b/agent/l3/dvr_edge_ha_router.py
@@ -114,9 +114,7 @@ class DvrEdgeHaRouter(dvr_edge_router.DvrEdgeRouter,
def _external_gateway_added(self, ex_gw_port, interface_name,
ns_name, preserve_ips):
- link_up = self.external_gateway_link_up()
- self._plug_external_gateway(ex_gw_port, interface_name, ns_name,
- link_up=link_up)
+ self._plug_external_gateway(ex_gw_port, interface_name, ns_name)
def _is_this_snat_host(self):
return self.agent_conf.agent_mode == constants.L3_AGENT_MODE_DVR_SNAT
diff --git a/agent/l3/extensions/rg_port_forwarding.py b/agent/l3/extensions/rg_port_forwarding.py
new file mode 100644
index 0000000000..a159e4df34
--- /dev/null
+++ b/agent/l3/extensions/rg_port_forwarding.py
@@ -0,0 +1,398 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+from typing import Optional, List
+from oslo_concurrency import lockutils
+from oslo_log import log as logging
+
+from neutron_lib import constants
+from neutron_lib.rpc import Connection
+from neutron_lib.context import Context
+from neutron_lib.agent import l3_extension
+
+from neutron.agent.linux.ip_lib import IPDevice
+from neutron.agent.l3.router_info import RouterInfo
+from neutron.agent.linux.iptables_manager import IptablesManager
+
+from neutron.api.rpc.handlers import resources_rpc
+from neutron.api.rpc.callbacks import resources, events
+from neutron.api.rpc.callbacks.consumer import registry
+
+from neutron.common import coordination
+
+from neutron.objects.ports import Port
+from neutron.objects.router import Router
+from neutron.objects.rg_port_forwarding import RGPortForwarding
+
+LOG = logging.getLogger(__name__)
+
+PORT_FORWARDING_PREFIX = 'rg_portforwarding-'
+DEFAULT_PORT_FORWARDING_CHAIN = 'rg-pf'
+PORT_FORWARDING_CHAIN_PREFIX = 'pf-'
+
+
+def _get_port_forwarding_chain_name(pf_id):
+    """Return the iptables chain name used for port forwarding ``pf_id``.
+
+    The name is ``PORT_FORWARDING_CHAIN_PREFIX`` followed by the id, sliced
+    to ``constants.MAX_IPTABLES_CHAIN_LEN_WRAP`` characters.
+    """
+    max_len = constants.MAX_IPTABLES_CHAIN_LEN_WRAP
+    return (PORT_FORWARDING_CHAIN_PREFIX + pf_id)[:max_len]
+
+
+class RGPortForwardingMapping(object):
+    """In-memory cache of the port forwardings handled by this agent.
+
+    ``managed_port_forwardings`` maps a port forwarding id to its object and
+    ``router_pf_mapping`` maps a router id to the set of port forwarding ids
+    recorded for that router. The public methods are wrapped with the
+    'rg-port-forwarding-cache' lock.
+    """
+
+    def __init__(self):
+        self.managed_port_forwardings = {}
+        self.router_pf_mapping = collections.defaultdict(set)
+
+    @lockutils.synchronized('rg-port-forwarding-cache')
+    def check_port_forwarding_changes(self, new_pf: RGPortForwarding) -> bool:
+        """Return True when ``new_pf`` differs from the cached object.
+
+        An id that is not cached is compared against ``None``.
+        """
+        cached = self.managed_port_forwardings.get(new_pf.id)
+        return cached != new_pf
+
+    @lockutils.synchronized('rg-port-forwarding-cache')
+    def set_port_forwardings(self, port_forwardings: List[RGPortForwarding]):
+        """Cache every given object under its own ``router_id``."""
+        for pf in port_forwardings:
+            self._set_router_port_forwarding(pf, pf.router_id)
+
+    def _set_router_port_forwarding(self,
+                                    port_forwarding: RGPortForwarding,
+                                    router_id: str):
+        """Record the object in both the router index and the id cache."""
+        pf_id = port_forwarding.id
+        self.router_pf_mapping[router_id].add(pf_id)
+        self.managed_port_forwardings[port_forwarding.id] = port_forwarding
+
+    @lockutils.synchronized('rg-port-forwarding-cache')
+    def update_port_forwardings(self, port_forwardings):
+        """Overwrite the cached object of each given port forwarding.
+
+        Only the id cache is written; the router index is left untouched.
+        """
+        self.managed_port_forwardings.update(
+            (pf.id, pf) for pf in port_forwardings)
+
+    @lockutils.synchronized('rg-port-forwarding-cache')
+    def del_port_forwardings(self, port_forwardings):
+        """Forget the given port forwardings; unknown ids are ignored."""
+        for pf in port_forwardings:
+            if self.managed_port_forwardings.get(pf.id):
+                del self.managed_port_forwardings[pf.id]
+                router_pfs = self.router_pf_mapping[pf.router_id]
+                router_pfs.discard(pf.id)
+                # Drop the router entry once its last id is gone.
+                if not router_pfs:
+                    self.router_pf_mapping.pop(pf.router_id)
+
+    @lockutils.synchronized('rg-port-forwarding-cache')
+    def clean_port_forwardings_by_router_id(self, router_id: str):
+        """Forget the router and every port forwarding indexed under it."""
+        for pf_id in self.router_pf_mapping.pop(router_id, []):
+            self.managed_port_forwardings.pop(pf_id, None)
+
+
+class RGPortForwardingAgentExtension(l3_extension.L3AgentExtension):
+ SUPPORTED_RESOURCE_TYPES = [resources.RGPORTFORWARDING]
+
+ def consume_api(self, agent_api):
+ self.agent_api = agent_api
+
+ def initialize(self, connection, driver_type):
+ self.mapping = RGPortForwardingMapping()
+ self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
+ self._register_rpc_consumers()
+
+ def _register_rpc_consumers(self):
+ registry.register(self._handle_notification,
+ resources.RGPORTFORWARDING)
+ self._connection = Connection()
+ endpoints = [resources_rpc.ResourcesPushRpcCallback()]
+ topic = resources_rpc.resource_type_versioned_topic(
+ resources.RGPORTFORWARDING)
+ self._connection.create_consumer(topic, endpoints, fanout=True)
+ self._connection.consume_in_threads()
+
+ def _handle_notification(self, context: Context,
+ resource_type: str,
+ forwardings, event_type):
+ for forwarding in forwardings:
+ self._process_port_forwarding_event(
+ context, forwarding, event_type)
+
+    def _get_gw_port_and_ip(self,
+                            ri: RouterInfo) -> (Optional[Port], Optional[str]):
+        """Return the router's external gateway port and its first IP.
+
+        ``(None, None)`` is returned, after logging an error, when
+        ``_get_gw_port_ip`` yields no address for the gateway port.
+        """
+        gw_port = ri.get_ex_gw_port()
+        gw_ip = self._get_gw_port_ip(gw_port)
+        if gw_ip:
+            return gw_port, gw_ip
+        LOG.error(f"Router {ri.router_id} external port "
+                  f"{gw_port['id']} does not have any IP addresses")
+        return None, None
+
+    def _process_port_forwarding_event(self, context: Context,
+                                       port_forwarding: RGPortForwarding,
+                                       event_type: str):
+        """Apply one pushed create/update/delete event to its router.
+
+        The event is dropped when the router is not known to this agent, has
+        no external gateway port, or the gateway port has no IP address.
+        Event types other than CREATED, UPDATED and DELETED are ignored.
+        """
+        ri = self._get_router_info(port_forwarding.router_id)
+        if not self._check_if_need_process(ri, force=True):
+            return
+
+        ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri)
+        if not (ex_gw_port and ex_gw_port_ip):
+            return
+
+        resources_for_router = self._get_resource_by_router(ri, ex_gw_port)
+        interface_name, namespace, iptables_manager = resources_for_router
+        pfs = [port_forwarding]
+
+        if event_type == events.CREATED:
+            self._process_create(pfs, ri, interface_name,
+                                 ex_gw_port_ip, namespace, iptables_manager)
+        elif event_type == events.UPDATED:
+            self._process_update(pfs, interface_name,
+                                 ex_gw_port_ip, namespace, iptables_manager)
+        elif event_type == events.DELETED:
+            self._process_delete(pfs, interface_name,
+                                 ex_gw_port_ip, namespace, iptables_manager)
+
+ def ha_state_change(self, context: Context, data: Router) -> None:
+ pass
+
+ def update_network(self, context: Context, data: dict) -> None:
+ pass
+
+ def add_router(self, context: Context, data: Router) -> None:
+ LOG.info(f"call add_router for {data['id']}")
+ self.process_port_forwarding(context, data)
+
+ def update_router(self, context: Context, data: Router) -> None:
+ LOG.info(f"call update_router for {data['id']}")
+ self.process_port_forwarding(context, data)
+
+ def delete_router(self, context: Context, data: Router) -> None:
+ self.mapping.clean_port_forwardings_by_router_id(data['id'])
+
+    def _get_router_info(self, router_id) -> Optional[RouterInfo]:
+        """Return the agent's RouterInfo for ``router_id``, or None.
+
+        A debug message is logged when the agent API has no such router.
+        """
+        found = self.agent_api.get_router_info(router_id)
+        if not found:
+            LOG.debug("Router %s is not managed by this agent. "
+                      "It was possibly deleted concurrently.", router_id)
+            return None
+        return found
+
+    @staticmethod
+    def _check_if_need_process(ri: RouterInfo, force: bool = False) -> bool:
+        """Tell whether port forwardings should be handled for ``ri``.
+
+        A missing router or a router without an external gateway port is
+        never processed. With ``force`` set, every other router is. Otherwise
+        a distributed router is processed only when the agent mode is neither
+        ``dvr`` nor ``dvr_no_external`` and its SNAT namespace exists.
+        """
+        if not (ri and ri.get_ex_gw_port()):
+            return False
+        if force:
+            return True
+
+        distributed = ri.router.get('distributed')
+        mode = ri.agent_conf.agent_mode
+        if not distributed:
+            return True
+        # just support centralized cases
+        if mode in [constants.L3_AGENT_MODE_DVR_NO_EXTERNAL,
+                    constants.L3_AGENT_MODE_DVR]:
+            return False
+        return bool(ri.snat_namespace.exists())
+
+ def process_port_forwarding(self, context: Context, data: Router):
+ ri = self._get_router_info(data['id'])
+ if not self._check_if_need_process(ri):
+ return
+ self.check_local_port_forwardings(context, ri)
+
+    @staticmethod
+    def _get_gw_port_ip(gw_port: dict) -> Optional[str]:
+        """Return the ``ip_address`` of the port's first fixed IP.
+
+        None is returned when the port has no (or an empty) ``fixed_ips``
+        entry, or when the first entry carries no ``ip_address``.
+        """
+        fixed_ips = gw_port.get('fixed_ips', [])
+        return fixed_ips[0].get('ip_address', None) if fixed_ips else None
+
+    @staticmethod
+    def _get_resource_by_router(ri: RouterInfo, ex_gw_port: dict) -> (
+            str, str, IptablesManager):
+        """Return (interface name, namespace, iptables manager) for ``ri``.
+
+        A router flagged ``distributed`` yields its SNAT device name, SNAT
+        namespace name and SNAT iptables manager; any other router yields the
+        external device name, ``ns_name`` and ``iptables_manager``.
+        """
+        if ri.router.get('distributed'):
+            return (ri.get_snat_external_device_interface_name(ex_gw_port),
+                    ri.snat_namespace.name,
+                    ri.snat_iptables_manager)
+        return (ri.get_external_device_interface_name(ex_gw_port),
+                ri.ns_name,
+                ri.iptables_manager)
+
+    def check_local_port_forwardings(self, context: Context, ri: RouterInfo):
+        """Reconcile the router's rules with the server's port forwardings.
+
+        Pulls the port forwardings of ``ri`` over RPC, compares them with the
+        ones cached for the same router and then creates, updates and deletes
+        rules accordingly. Nothing is done when the pull returns no objects
+        or when the gateway port or its IP address is unavailable.
+        """
+        pfs = self.resource_rpc.bulk_pull(
+            context, resources.RGPORTFORWARDING,
+            filter_kwargs={'router_id': ri.router_id})
+        if not pfs:
+            return
+        ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri)
+        # Both values are used below; check the port as well as the IP.
+        if not ex_gw_port or not ex_gw_port_ip:
+            return
+        (interface_name, namespace,
+         iptables_manager) = self._get_resource_by_router(ri, ex_gw_port)
+        managed = self.mapping.managed_port_forwardings
+        # The cache is shared by every router on this agent. Diff only
+        # against the ids indexed for this router, otherwise port forwardings
+        # of other routers would be scheduled for deletion here.
+        local_pfs = set(self.mapping.router_pf_mapping.get(ri.router_id, ()))
+        new_pfs = []
+        updated_pfs = []
+        current_pfs = set()
+        for pf in pfs:
+            if pf.id in managed:
+                if self.mapping.check_port_forwarding_changes(pf):
+                    updated_pfs.append(pf)
+            else:
+                new_pfs.append(pf)
+            current_pfs.add(pf.id)
+
+        # Cached for this router but no longer reported by the server.
+        remove_pfs = [managed[pf_id] for pf_id in local_pfs - current_pfs
+                      if pf_id in managed]
+
+        self._process_create(new_pfs, ri, interface_name, ex_gw_port_ip,
+                             namespace, iptables_manager)
+
+        self._process_update(updated_pfs, interface_name, ex_gw_port_ip,
+                             namespace, iptables_manager)
+
+        self._process_delete(remove_pfs, interface_name, ex_gw_port_ip,
+                             namespace, iptables_manager)
+
+ @staticmethod
+ def _install_default_rules(iptables_manager: IptablesManager):
+ default_rule = '-j %s-%s' % (iptables_manager.wrap_name,
+ DEFAULT_PORT_FORWARDING_CHAIN)
+ LOG.info(f'Add default chain {DEFAULT_PORT_FORWARDING_CHAIN}')
+ LOG.info(f'Add default rule {default_rule}')
+ iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN)
+ iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule)
+ iptables_manager.apply()
+
+    @staticmethod
+    def _get_rg_rules(port_forward: RGPortForwarding, wrap_name: str):
+        """Return the ``(chain, rule)`` pairs for one port forwarding.
+
+        The first pair jumps from the default port forwarding chain to the
+        object's own chain; the second is the DNAT rule placed in that chain.
+        """
+        pf_chain_name = _get_port_forwarding_chain_name(port_forward.id)
+        jump_rule = (DEFAULT_PORT_FORWARDING_CHAIN,
+                     f'-j {wrap_name}-{pf_chain_name}')
+
+        gw_ip_address = port_forward.gw_ip_address
+        protocol = port_forward.protocol
+        internal_ip_address = str(port_forward.internal_ip_address)
+        internal_port = port_forward.internal_port
+        external_port = port_forward.external_port
+        dnat_rule = (
+            pf_chain_name,
+            f'-d {gw_ip_address}/32 -p {protocol} -m {protocol} '
+            f'--dport {external_port} '
+            f'-j DNAT --to-destination {internal_ip_address}:{internal_port}'
+        )
+        return [jump_rule, dnat_rule]
+
+ def _rule_apply(self,
+ iptables_manager: IptablesManager,
+ port_forwarding: RGPortForwarding,
+ rule_tag: str):
+ iptables_manager.ipv4['nat'].clear_rules_by_tag(rule_tag)
+ if (DEFAULT_PORT_FORWARDING_CHAIN not in
+ iptables_manager.ipv4['nat'].chains):
+ self._install_default_rules(iptables_manager)
+
+ for chain, rule in self._get_rg_rules(port_forwarding,
+ iptables_manager.wrap_name):
+ LOG.info(f'Add router gateway port forwarding '
+ f'rule {rule} in {chain}')
+ if chain not in iptables_manager.ipv4['nat'].chains:
+ iptables_manager.ipv4['nat'].add_chain(chain)
+ iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag)
+
+    def _store_local(self, pf_objs: List[RGPortForwarding], event_type: str):
+        """Mirror a processed event into the local cache.
+
+        CREATED, UPDATED and DELETED map to the cache's set, update and
+        delete operations; any other event type is ignored.
+        """
+        handlers = {
+            events.CREATED: self.mapping.set_port_forwardings,
+            events.UPDATED: self.mapping.update_port_forwardings,
+            events.DELETED: self.mapping.del_port_forwardings,
+        }
+        handler = handlers.get(event_type)
+        if handler is not None:
+            handler(pf_objs)
+
+ def _process_create(self,
+ port_forwardings: List[RGPortForwarding],
+ ri: RouterInfo,
+ interface_name: str,
+ interface_ip: str,
+ namespace: str,
+ iptables_manager: IptablesManager):
+ if not port_forwardings:
+ return
+
+ ha_port = ri.router.get(constants.HA_INTERFACE_KEY, None)
+ if ha_port and ha_port['status'] == constants.PORT_STATUS_ACTIVE:
+ ri.enable_keepalived()
+
+ for port_forwarding in port_forwardings:
+ if port_forwarding.id in self.mapping.managed_port_forwardings:
+ LOG.debug("Skip port forwarding %s for create, as it had been "
+ "managed by agent", port_forwarding.id)
+ continue
+ rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
+ port_forwarding.gw_ip_address = interface_ip
+ self._rule_apply(iptables_manager, port_forwarding, rule_tag)
+ iptables_manager.apply()
+ self._store_local(port_forwardings, events.CREATED)
+
+    def _process_update(self,
+                        port_forwardings: List[RGPortForwarding],
+                        interface_name: str,
+                        interface_ip: str,
+                        namespace: str,
+                        iptables_manager: IptablesManager):
+        """Re-create the rules of changed port forwardings.
+
+        For every object that differs from its cached version the old chain
+        is removed, the conntrack state of the previous external port is
+        deleted and the rules are rebuilt with ``interface_ip`` as gateway
+        address. The rules are applied once and the cache is refreshed.
+        """
+        if not port_forwardings:
+            return
+        device = IPDevice(interface_name, namespace=namespace)
+        uncached = []
+        for port_forwarding in port_forwardings:
+            # check if port forwarding change from OVO and router rpc
+            if not self.mapping.check_port_forwarding_changes(port_forwarding):
+                LOG.debug("Skip port forwarding %s for update, as there is no "
+                          "difference between the memory managed by agent",
+                          port_forwarding.id)
+                continue
+            current_chain = _get_port_forwarding_chain_name(port_forwarding.id)
+            iptables_manager.ipv4['nat'].remove_chain(current_chain)
+            # check_port_forwarding_changes() is also True for ids that were
+            # never cached, so the lookup must tolerate a miss. Without a
+            # previous version there is no old mapping to flush.
+            ori_pf = self.mapping.managed_port_forwardings.get(
+                port_forwarding.id)
+            if ori_pf is None:
+                uncached.append(port_forwarding)
+            else:
+                device.delete_socket_conntrack_state(
+                    interface_ip,
+                    ori_pf.external_port,
+                    protocol=ori_pf.protocol)
+            rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
+            port_forwarding.gw_ip_address = interface_ip
+            self._rule_apply(iptables_manager, port_forwarding, rule_tag)
+        iptables_manager.apply()
+        self._store_local(port_forwardings, events.UPDATED)
+        # UPDATED only refreshes the id cache; objects seen for the first
+        # time must also be indexed under their router.
+        if uncached:
+            self._store_local(uncached, events.CREATED)
+
+ @coordination.synchronized('router-lock-ns-{namespace}')
+ def _process_delete(self,
+ port_forwardings: List[RGPortForwarding],
+ interface_name: str,
+ interface_ip: str,
+ namespace: str,
+ iptables_manager: IptablesManager):
+ if not port_forwardings:
+ return
+ device = IPDevice(interface_name, namespace=namespace)
+ for port_forwarding in port_forwardings:
+ current_chain = _get_port_forwarding_chain_name(port_forwarding.id)
+ iptables_manager.ipv4['nat'].remove_chain(current_chain)
+ device.delete_socket_conntrack_state(
+ interface_ip,
+ port_forwarding.external_port,
+ protocol=port_forwarding.protocol)
+
+ iptables_manager.apply()
+
+ self._store_local(port_forwardings, events.DELETED)
diff --git a/agent/l3/ha.py b/agent/l3/ha.py
index 17891dc983..182fa68175 100644
--- a/agent/l3/ha.py
+++ b/agent/l3/ha.py
@@ -163,15 +163,6 @@ class AgentMixin(object):
'agent %(host)s',
state_change_data)
- # Set external gateway port link up or down according to state
- if state == 'master':
- ri.set_external_gw_port_link_status(link_up=True, set_gw=True)
- elif state == 'backup':
- ri.set_external_gw_port_link_status(link_up=False)
- else:
- LOG.warning('Router %s has status %s, '
- 'no action to router gateway device.',
- router_id, state)
# TODO(dalvarez): Fix bug 1677279 by moving the IPv6 parameters
# configuration to keepalived-state-change in order to remove the
# dependency that currently exists on l3-agent running for the IPv6
diff --git a/agent/l3/ha_router.py b/agent/l3/ha_router.py
index 0a21902771..ef10ff76e4 100644
--- a/agent/l3/ha_router.py
+++ b/agent/l3/ha_router.py
@@ -17,6 +17,7 @@ import shutil
import signal
import netaddr
+from typing import Optional, List
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_consts
from neutron_lib.utils import runtime
@@ -137,6 +138,22 @@ class HaRouter(router.RouterInfo):
else:
return False
+ @property
+ def configurations(self) -> Optional[dict]:
+ return self.router.get('configurations', {})
+
+ @property
+ def master(self) -> Optional[str]:
+ if self.configurations:
+ return self.configurations.get('master_agent', None)
+ return None
+
+ @property
+ def slaves(self) -> Optional[List[str]]:
+ if self.configurations:
+ return self.configurations.get('slave_agents', [])
+ return []
+
def initialize(self, process_monitor):
ha_port = self.router.get(n_consts.HA_INTERFACE_KEY)
if not ha_port:
@@ -162,19 +179,32 @@ class HaRouter(router.RouterInfo):
throttle_restart_value=(
self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER))
+ # The following call is required to ensure that if the state path does
+ # not exist it gets created.
+ self.keepalived_manager.get_full_config_file_path('test')
+
config = self.keepalived_manager.config
interface_name = self.get_ha_device_name()
subnets = self.ha_port.get('subnets', [])
ha_port_cidrs = [subnet['cidr'] for subnet in subnets]
+ nopreempt = True
+ state = 'BACKUP'
+ priority = self.ha_priority
+ if self.slaves and self.master:
+ nopreempt = False
+ if self.master == self.agent_conf.host:
+ state = 'MASTER'
+ priority = keepalived.HA_DEFAULT_MASTER_PRIORITY
+
instance = keepalived.KeepalivedInstance(
- 'BACKUP',
+ state,
interface_name,
self.ha_vr_id,
ha_port_cidrs,
- nopreempt=True,
+ nopreempt=nopreempt,
advert_int=self.agent_conf.ha_vrrp_advert_int,
- priority=self.ha_priority,
+ priority=priority,
vrrp_health_check_interval=(
self.agent_conf.ha_vrrp_health_check_interval),
ha_conf_dir=self.keepalived_manager.get_conf_dir())
@@ -396,13 +426,16 @@ class HaRouter(router.RouterInfo):
ha_device = self.get_ha_device_name()
ha_cidr = self._get_primary_vip()
config_dir = self.keepalived_manager.get_conf_dir()
- state_change_log = (
- "%s/neutron-keepalived-state-change.log") % config_dir
+ state_change_log = f"{config_dir}/neutron-keepalived-state-change.log"
def callback(pid_file):
+ LOG.info(f'Router: {self.router_id} master is {self.master}, '
+ f'salves are {self.slaves}.')
cmd = [
STATE_CHANGE_PROC_NAME,
'--router_id=%s' % self.router_id,
+ '--master_agent=%s' % self.master,
+ '--slave_agents=%s' % ','.join(self.slaves),
'--namespace=%s' % self.ha_namespace,
'--conf_dir=%s' % config_dir,
'--log-file=%s' % state_change_log,
@@ -453,9 +486,7 @@ class HaRouter(router.RouterInfo):
return port1_filtered == port2_filtered
def external_gateway_added(self, ex_gw_port, interface_name):
- link_up = self.external_gateway_link_up()
- self._plug_external_gateway(ex_gw_port, interface_name,
- self.ns_name, link_up=link_up)
+ self._plug_external_gateway(ex_gw_port, interface_name, self.ns_name)
self._add_gateway_vip(ex_gw_port, interface_name)
self._disable_ipv6_addressing_on_interface(interface_name)
@@ -519,30 +550,3 @@ class HaRouter(router.RouterInfo):
if (self.keepalived_manager.get_process().active and
self.ha_state == 'master'):
super(HaRouter, self).enable_radvd(internal_ports)
-
- def external_gateway_link_up(self):
- # Check HA router ha_state for its gateway port link state.
- # 'backup' instance will not link up the gateway port.
- return self.ha_state == 'master'
-
- def set_external_gw_port_link_status(self, link_up, set_gw=False):
- link_state = "up" if link_up else "down"
- LOG.info('Set router %s gateway device link state to %s.',
- self.router_id, link_state)
-
- ex_gw_port = self.get_ex_gw_port()
- ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
- self.ex_gw_port and self.ex_gw_port['id'])
- if ex_gw_port_id:
- interface_name = self.get_external_device_name(ex_gw_port_id)
- ns_name = self.get_gw_ns_name()
- if (not self.driver.set_link_status(
- interface_name, namespace=ns_name, link_up=link_up) and
- link_up):
- LOG.error('Gateway interface for router %s was not set up; '
- 'router will not work properly', self.router_id)
- if link_up and set_gw:
- preserve_ips = self.get_router_preserve_ips()
- self._external_gateway_settings(ex_gw_port, interface_name,
- ns_name, preserve_ips)
- self.routes_updated([], self.routes)
diff --git a/agent/l3/keepalived_state_change.py b/agent/l3/keepalived_state_change.py
index 7fd9e4269e..8c10a8b00f 100644
--- a/agent/l3/keepalived_state_change.py
+++ b/agent/l3/keepalived_state_change.py
@@ -47,9 +47,12 @@ class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
class MonitorDaemon(daemon.Daemon):
- def __init__(self, pidfile, router_id, user, group, namespace, conf_dir,
- interface, cidr):
+ def __init__(self, pidfile, host, router_id, master, slaves, user, group,
+ namespace, conf_dir, interface, cidr):
+ self.host = host
self.router_id = router_id
+ self.master = master
+ self.slaves = slaves
self.namespace = namespace
self.conf_dir = conf_dir
self.interface = interface
@@ -62,6 +65,8 @@ class MonitorDaemon(daemon.Daemon):
user=user, group=group)
def run(self):
+ LOG.debug(f'Router: {self.router_id} master is {self.master}, '
+ f'salves are {self.slaves}.')
self._thread_ip_monitor = threading.Thread(
target=ip_lib.ip_monitor,
args=(self.namespace, self.queue, self.event_stop,
@@ -169,7 +174,10 @@ def main():
keepalived.register_l3_agent_keepalived_opts()
configure(cfg.CONF)
MonitorDaemon(cfg.CONF.pid_file,
+ cfg.CONF.host,
cfg.CONF.router_id,
+ cfg.CONF.master_agent,
+ cfg.CONF.slave_agents,
cfg.CONF.user,
cfg.CONF.group,
cfg.CONF.namespace,
diff --git a/agent/l3/router_info.py b/agent/l3/router_info.py
index ea2b488cd2..eabdcc6e54 100644
--- a/agent/l3/router_info.py
+++ b/agent/l3/router_info.py
@@ -697,16 +697,14 @@ class RouterInfo(BaseRouterInfo):
return [common_utils.ip_to_cidr(ip['floating_ip_address'])
for ip in floating_ips]
- def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name,
- link_up=True):
+ def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name):
self.driver.plug(ex_gw_port['network_id'],
ex_gw_port['id'],
interface_name,
ex_gw_port['mac_address'],
namespace=ns_name,
prefix=EXTERNAL_DEV_PREFIX,
- mtu=ex_gw_port.get('mtu'),
- link_up=link_up)
+ mtu=ex_gw_port.get('mtu'))
def _get_external_gw_ips(self, ex_gw_port):
gateway_ips = []
@@ -766,11 +764,7 @@ class RouterInfo(BaseRouterInfo):
LOG.debug("External gateway added: port(%s), interface(%s), ns(%s)",
ex_gw_port, interface_name, ns_name)
self._plug_external_gateway(ex_gw_port, interface_name, ns_name)
- self._external_gateway_settings(ex_gw_port, interface_name,
- ns_name, preserve_ips)
- def _external_gateway_settings(self, ex_gw_port, interface_name,
- ns_name, preserve_ips):
# Build up the interface and gateway IP addresses that
# will be added to the interface.
ip_cidrs = common_utils.fixed_ip_cidrs(ex_gw_port['fixed_ips'])
@@ -815,19 +809,18 @@ class RouterInfo(BaseRouterInfo):
return any(netaddr.IPAddress(gw_ip).version == 6
for gw_ip in gateway_ips)
- def get_router_preserve_ips(self):
+ def external_gateway_added(self, ex_gw_port, interface_name):
preserve_ips = self._list_floating_ip_cidrs() + list(
self.centralized_port_forwarding_fip_set)
preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id))
- return preserve_ips
- def external_gateway_added(self, ex_gw_port, interface_name):
- preserve_ips = self.get_router_preserve_ips()
self._external_gateway_added(
ex_gw_port, interface_name, self.ns_name, preserve_ips)
def external_gateway_updated(self, ex_gw_port, interface_name):
- preserve_ips = self.get_router_preserve_ips()
+ preserve_ips = self._list_floating_ip_cidrs() + list(
+ self.centralized_port_forwarding_fip_set)
+ preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id))
self._external_gateway_added(
ex_gw_port, interface_name, self.ns_name, preserve_ips)
diff --git a/agent/linux/dhcp.py b/agent/linux/dhcp.py
index 249e1a8199..5b82fe328f 100644
--- a/agent/linux/dhcp.py
+++ b/agent/linux/dhcp.py
@@ -19,6 +19,7 @@ import os
import re
import shutil
import time
+from typing import List
import netaddr
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
@@ -40,6 +41,7 @@ from neutron.agent.linux import iptables_manager
from neutron.cmd import runtime_checks as checks
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
+from neutron.conf.common import NETWORK_HOST_OPTS
from neutron.ipam import utils as ipam_utils
LOG = logging.getLogger(__name__)
@@ -81,6 +83,51 @@ def port_requires_dhcp_configuration(port):
constants.DEVICE_OWNER_DHCP]
+class Octopus(object):
+    """Per-network index of subnets, keyed by subnet id.
+
+    ``tentacles`` maps a subnet id to its :class:`Tentacle`.
+    """
+
+    def __init__(self):
+        self.tentacles = collections.defaultdict(Tentacle)
+
+    def __str__(self):
+        lines = ['']
+        for subnet_id, tentacle in self.tentacles.items():
+            line = (f"Subnet {subnet_id} has multi routers "
+                    f"{len(tentacle.gateway_ports) > 1} "
+                    f"{tentacle}")
+            lines.append(line)
+        return '\n'.join(lines)
+
+
+class Tentacle(object):
+    """State kept for one subnet.
+
+    :param gateway_ip: gateway address of the subnet. It defaults to None so
+        the class can serve as a ``defaultdict`` factory, which is always
+        called without arguments.
+    """
+
+    def __init__(self, gateway_ip: str = None):
+        self.gateway_ip = gateway_ip
+        # Tags appended by callers for this subnet.
+        self.tags = []
+        # Sucker objects stored by callers for gateway ports.
+        self.gateway_ports = collections.defaultdict(Sucker)
+        # Sucker objects stored by callers, keyed by port id.
+        self.suckers = collections.defaultdict(Sucker)
+
+    def __str__(self):
+        lines = [""]
+        for port_id, sucker in self.suckers.items():
+            line = f" Port {port_id} {sucker}"
+            lines.append(line)
+        return '\n'.join(lines)
+
+
+class Sucker(object):
+    """Host, device owner and address recorded for one port IP.
+
+    Every argument has a default so the class can serve as a ``defaultdict``
+    factory; ``device_owner`` defaults to an empty string so that callers
+    may keep using ``str`` methods such as ``startswith`` on it.
+    """
+
+    def __init__(self, host: str = None, device_owner: str = '',
+                 ip_address: str = None):
+        self.host = host
+        self.device_owner = device_owner
+        self.ip_address = ip_address
+        # Set by callers once a tag has been chosen for the port.
+        self.tag = None
+
+    def subnet_tag(self, subnet_id: str):
+        """Return the host-specific tag ``<host>-subnet-<subnet_id>``."""
+        return f'{self.host}-subnet-{subnet_id}'
+
+    def __str__(self):
+        return (f"ip: {self.ip_address} \t"
+                f"binding_host: {self.host} \t"
+                f"device_owner: {self.device_owner}")
+
+
class DictModel(dict):
"""Convert dict into an object that provides attribute access to values."""
@@ -149,6 +196,10 @@ class DhcpBase(object):
self.process_monitor = process_monitor
self.device_manager = DeviceManager(self.conf, plugin)
self.version = version
+ self.octopus = Octopus()
+ self._init_octopus()
+ self.compute_to_network = dict()
+ self._init_compute_to_network()
@abc.abstractmethod
def enable(self):
@@ -193,6 +244,31 @@ class DhcpBase(object):
"""True if the metadata-proxy should be enabled for the network."""
raise NotImplementedError()
+    def _init_compute_to_network(self):
+        """Fill ``compute_to_network`` from the configuration.
+
+        For every name in ``conf.network_nodes`` the NETWORK_HOST_OPTS are
+        registered under a group of that name, and each entry of the group's
+        ``compute_nodes`` is mapped to that network node.
+        """
+        for node in self.conf.network_nodes:
+            self.conf.register_opts(NETWORK_HOST_OPTS, group=node)
+            group = self.conf.get(node, None)
+            if not group:
+                continue
+            self.compute_to_network.update(
+                dict.fromkeys(group.get('compute_nodes', []), node))
+
+    def _init_octopus(self):
+        """Index the network's subnets and port addresses in ``octopus``.
+
+        Each subnet gets a Tentacle holding its gateway IP. Each fixed IP of
+        each port is recorded as a Sucker under the port id; for router
+        interface owners it is also recorded in ``gateway_ports`` under the
+        port's binding host.
+        """
+        tentacles = self.octopus.tentacles
+        for subnet in self.network.subnets:
+            tentacles[subnet.id] = Tentacle(subnet.gateway_ip)
+
+        for port in self.network.ports:
+            for fixed_ip in port.fixed_ips:
+                host = port.get('binding:host_id')
+                owner = port.get('device_owner')
+                address = fixed_ip.get('ip_address')
+                tentacle = tentacles[fixed_ip.subnet_id]
+                sucker = Sucker(host, owner, address)
+                tentacle.suckers[port.id] = sucker
+                is_router_port = owner in (
+                    constants.DEVICE_OWNER_HA_REPLICATED_INT,
+                    constants.DEVICE_OWNER_ROUTER_INTF)
+                if is_router_port:
+                    tentacle.gateway_ports[host] = sucker
+
@six.add_metaclass(abc.ABCMeta)
class DhcpLocalProcess(DhcpBase):
@@ -841,8 +917,18 @@ class Dnsmasq(DhcpLocalProcess):
(port.mac_address, tag, name, ip_address,
'set:', self._PORT_TAG_PREFIX % port.id))
else:
- buf.write('%s,%s%s,%s\n' %
- (port.mac_address, tag, name, ip_address))
+ subnet_tag = f'subnet-{alloc.subnet_id}'
+ if self.conf.enable_set_route_for_single_port:
+ tentacle = self.octopus.tentacles[alloc.subnet_id]
+ sucker = tentacle.suckers[port.id]
+ tentacle.tags.append(subnet_tag)
+ if sucker.device_owner.startswith(
+ constants.DEVICE_OWNER_COMPUTE_PREFIX):
+ subnet_tag = sucker.subnet_tag(alloc.subnet_id)
+ sucker.tag = subnet_tag
+
+ buf.write(f'{port.mac_address},{tag}{name},{ip_address},'
+ f'set:{subnet_tag}\n')
file_utils.replace_file(filename, buf.getvalue())
LOG.debug('Done building host file %s', filename)
@@ -1059,7 +1145,8 @@ class Dnsmasq(DhcpLocalProcess):
"""Write a dnsmasq compatible options file."""
options, subnet_index_map = self._generate_opts_per_subnet()
options += self._generate_opts_per_port(subnet_index_map)
-
+ if self.conf.enable_set_route_for_single_port:
+ options += self._generate_opts_for_compute_port(options)
name = self.get_conf_file_name('opts')
file_utils.replace_file(name, '\n'.join(options))
return name
@@ -1220,6 +1307,50 @@ class Dnsmasq(DhcpLocalProcess):
vx_ips))))
return options
+    def _generate_opts_for_compute_port(self, options: List[str]) -> List[str]:
+        """Build host-specific copies of subnet options for compute ports.
+
+        Subnets with no recorded tags or with at most one ``gateway_ports``
+        entry are skipped. For each tagged compute port of the remaining
+        subnets, every option line mentioning the subnet id is copied with
+        the ``subnet-<id>`` tag replaced by the port's tag. In router and
+        classless static route options, fields equal to the line's last
+        field are replaced by the address of the gateway port found for the
+        network node mapped to the port's host.
+
+        :param options: option lines already generated; not modified.
+        :returns: the additional option lines.
+        """
+        new_options = []
+        LOG.debug(self.octopus)
+        if not self.compute_to_network:
+            LOG.warning('CONF.enable_set_route_for_single_port is True, '
+                        'but not configured.')
+            return new_options
+        for subnet_id, tentacle in self.octopus.tentacles.items():
+            if not tentacle.tags:
+                continue
+            if len(tentacle.gateway_ports) <= 1:
+                LOG.info(f'Subnet {subnet_id} is not bound '
+                         f'to different routers, '
+                         f'so skip generate options for compute ports.')
+                continue
+            for sucker in tentacle.suckers.values():
+                if not sucker.tag:
+                    continue
+                if not sucker.device_owner.startswith(
+                        constants.DEVICE_OWNER_COMPUTE_PREFIX):
+                    continue
+                network_node = self.compute_to_network.get(sucker.host, None)
+                if not network_node:
+                    LOG.warning(f'Compute host {sucker.host} not configured.')
+                    continue
+                port = tentacle.gateway_ports.get(network_node, None)
+                if not port:
+                    LOG.warning(f'Subnet {subnet_id} does not have gateway '
+                                f'port on network host {network_node}.')
+                    continue
+                # The default gateway already points at this node's port.
+                if tentacle.gateway_ip == port.ip_address:
+                    continue
+                for option in options:
+                    if subnet_id not in option:
+                        continue
+                    option = option.replace(f'subnet-{subnet_id}',
+                                            sucker.tag)
+                    if ('option:classless-static-route' in option or
+                            ',249,' in option or 'option:router' in option):
+                        # Swap whole comma-separated fields only: a substring
+                        # replace would also rewrite addresses that merely
+                        # contain the gateway text (10.0.0.1 in 10.0.0.10).
+                        fields = option.split(',')
+                        gateway_ip = fields[-1]
+                        option = ','.join(
+                            port.ip_address if field == gateway_ip else field
+                            for field in fields)
+                    new_options.append(option)
+        return new_options
+
def _make_subnet_interface_ip_map(self):
subnet_lookup = dict(
(netaddr.IPNetwork(subnet.cidr), subnet.id)
diff --git a/agent/linux/interface.py b/agent/linux/interface.py
index 3ac476d7ba..2e6455707c 100644
--- a/agent/linux/interface.py
+++ b/agent/linux/interface.py
@@ -259,17 +259,16 @@ class LinuxInterfaceDriver(object):
@abc.abstractmethod
def plug_new(self, network_id, port_id, device_name, mac_address,
- bridge=None, namespace=None, prefix=None, mtu=None,
- link_up=True):
+ bridge=None, namespace=None, prefix=None, mtu=None):
"""Plug in the interface only for new devices that don't exist yet."""
def plug(self, network_id, port_id, device_name, mac_address,
- bridge=None, namespace=None, prefix=None, mtu=None, link_up=True):
+ bridge=None, namespace=None, prefix=None, mtu=None):
if not ip_lib.device_exists(device_name,
namespace=namespace):
self._safe_plug_new(
network_id, port_id, device_name, mac_address, bridge,
- namespace, prefix, mtu, link_up)
+ namespace, prefix, mtu)
else:
LOG.info("Device %s already exists", device_name)
if mtu:
@@ -279,11 +278,11 @@ class LinuxInterfaceDriver(object):
LOG.warning("No MTU configured for port %s", port_id)
def _safe_plug_new(self, network_id, port_id, device_name, mac_address,
- bridge=None, namespace=None, prefix=None, mtu=None, link_up=True):
+ bridge=None, namespace=None, prefix=None, mtu=None):
try:
self.plug_new(
network_id, port_id, device_name, mac_address, bridge,
- namespace, prefix, mtu, link_up)
+ namespace, prefix, mtu)
except TypeError:
LOG.warning("Interface driver's plug_new() method should now "
"accept additional optional parameter 'link_up'. "
@@ -321,27 +320,10 @@ class LinuxInterfaceDriver(object):
LOG.warning("Interface driver cannot update MTU for ports")
self._mtu_update_warn_logged = True
- def set_link_status(self, device_name, namespace=None, link_up=True):
- ns_dev = ip_lib.IPWrapper(namespace=namespace).device(device_name)
- try:
- utils.wait_until_true(ns_dev.exists, timeout=3)
- except utils.WaitTimeout:
- LOG.debug('Device %s may have been deleted concurrently',
- device_name)
- return False
-
- if link_up:
- ns_dev.link.set_up()
- else:
- ns_dev.link.set_down()
-
- return True
-
class NullDriver(LinuxInterfaceDriver):
def plug_new(self, network_id, port_id, device_name, mac_address,
- bridge=None, namespace=None, prefix=None, mtu=None,
- link_up=True):
+ bridge=None, namespace=None, prefix=None, mtu=None):
pass
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
@@ -377,8 +359,7 @@ class OVSInterfaceDriver(LinuxInterfaceDriver):
ovs.replace_port(device_name, *attrs)
def plug_new(self, network_id, port_id, device_name, mac_address,
- bridge=None, namespace=None, prefix=None, mtu=None,
- link_up=True):
+ bridge=None, namespace=None, prefix=None, mtu=None):
"""Plug in the interface."""
if not bridge:
bridge = self.conf.ovs_integration_bridge
@@ -442,8 +423,8 @@ class OVSInterfaceDriver(LinuxInterfaceDriver):
else:
LOG.warning("No MTU configured for port %s", port_id)
- if link_up:
- ns_dev.link.set_up()
+ ns_dev.link.set_up()
+
if self.conf.ovs_use_veth:
# ovs-dpdk does not do checksum calculations for veth interface
# (bug 1832021)
@@ -488,8 +469,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver):
DEV_NAME_PREFIX = 'ns-'
def plug_new(self, network_id, port_id, device_name, mac_address,
- bridge=None, namespace=None, prefix=None, mtu=None,
- link_up=True):
+ bridge=None, namespace=None, prefix=None, mtu=None):
"""Plugin the interface."""
ip = ip_lib.IPWrapper()
@@ -508,8 +488,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver):
LOG.warning("No MTU configured for port %s", port_id)
root_veth.link.set_up()
- if link_up:
- ns_veth.link.set_up()
+ ns_veth.link.set_up()
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
diff --git a/agent/linux/keepalived.py b/agent/linux/keepalived.py
index f47a27f1d1..405a781f0b 100644
--- a/agent/linux/keepalived.py
+++ b/agent/linux/keepalived.py
@@ -32,6 +32,7 @@ from neutron.common import utils
VALID_STATES = ['MASTER', 'BACKUP']
VALID_AUTH_TYPES = ['AH', 'PASS']
HA_DEFAULT_PRIORITY = 50
+HA_DEFAULT_MASTER_PRIORITY = 100
PRIMARY_VIP_RANGE_SIZE = 24
KEEPALIVED_SERVICE_NAME = 'keepalived'
KEEPALIVED_EMAIL_FROM = 'neutron@openstack.local'
diff --git a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
index 29b61b2c9b..fbad5f1d9a 100644
--- a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
+++ b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
@@ -312,6 +312,15 @@ class DhcpAgentNotifyAPI(object):
return False
if set(orig.keys()) != set(new.keys()):
return False
+
+ if cfg.CONF.enable_set_route_for_single_port:
+ device_owner = new.get('device_owner', None)
+ orig_device_owner = orig.get('device_owner', None)
+ if (not orig_device_owner and device_owner and
+ device_owner.startswith(
+ constants.DEVICE_OWNER_COMPUTE_PREFIX)):
+ return True
+
for k in orig.keys():
if k in ('status', 'updated_at', 'revision_number'):
continue
diff --git a/api/rpc/callbacks/resources.py b/api/rpc/callbacks/resources.py
index 734f05eb6f..de56211c53 100644
--- a/api/rpc/callbacks/resources.py
+++ b/api/rpc/callbacks/resources.py
@@ -15,6 +15,7 @@ from neutron.objects import conntrack_helper
from neutron.objects.logapi import logging_resource as log_object
from neutron.objects import network
from neutron.objects import port_forwarding
+from neutron.objects import rg_port_forwarding
from neutron.objects import ports
from neutron.objects.qos import policy
from neutron.objects import securitygroup
@@ -33,6 +34,7 @@ SUBNET = subnet.Subnet.obj_name()
SECURITYGROUP = securitygroup.SecurityGroup.obj_name()
SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name()
PORTFORWARDING = port_forwarding.PortForwarding.obj_name()
+RGPORTFORWARDING = rg_port_forwarding.RGPortForwarding.obj_name()
CONNTRACKHELPER = conntrack_helper.ConntrackHelper.obj_name()
@@ -47,6 +49,7 @@ _VALID_CLS = (
securitygroup.SecurityGroupRule,
log_object.Log,
port_forwarding.PortForwarding,
+ rg_port_forwarding.RGPortForwarding,
conntrack_helper.ConntrackHelper,
)
diff --git a/conf/agent/l3/keepalived.py b/conf/agent/l3/keepalived.py
index bd46c723fc..5ff3492280 100644
--- a/conf/agent/l3/keepalived.py
+++ b/conf/agent/l3/keepalived.py
@@ -20,6 +20,8 @@ from neutron._i18n import _
CLI_OPTS = [
cfg.StrOpt('router_id', help=_('ID of the router')),
+ cfg.StrOpt('master_agent', help=_('The master agent of router')),
+ cfg.ListOpt('slave_agents', help=_('The slave agents of router')),
cfg.StrOpt('namespace', help=_('Namespace of the router')),
cfg.StrOpt('conf_dir', help=_('Path to the router directory')),
cfg.StrOpt('monitor_interface', help=_('Interface to monitor')),
diff --git a/conf/common.py b/conf/common.py
index f885429613..45e0b48723 100644
--- a/conf/common.py
+++ b/conf/common.py
@@ -145,7 +145,21 @@ core_opts = [
"Setting to any positive integer means that on failure "
"the connection is retried that many times. "
"For example, setting to 3 means total attempts to "
- "connect will be 4."))
+ "connect will be 4.")),
+ cfg.BoolOpt('enable_set_route_for_single_port', default=False,
+ help=_("To set route path for every single port "
+ "when the same subnet has multi ports on router.")),
+ cfg.ListOpt('network_nodes',
+ default=[],
+ help=_("The list of network hosts to "
+ "make a network map "
+ "with compute node and network node.")),
+]
+
+NETWORK_HOST_OPTS = [
+ cfg.ListOpt('compute_nodes',
+ default=[],
+ help=_("The list of compute hosts."))
]
core_cli_opts = [
diff --git a/conf/policies/__init__.py b/conf/policies/__init__.py
index aa4dda63d0..15cdaea45a 100644
--- a/conf/policies/__init__.py
+++ b/conf/policies/__init__.py
@@ -34,6 +34,7 @@ from neutron.conf.policies import port
from neutron.conf.policies import qos
from neutron.conf.policies import rbac
from neutron.conf.policies import router
+from neutron.conf.policies import rg_port_forwarding
from neutron.conf.policies import security_group
from neutron.conf.policies import segment
from neutron.conf.policies import service_type
@@ -63,6 +64,7 @@ def list_rules():
qos.list_rules(),
rbac.list_rules(),
router.list_rules(),
+ rg_port_forwarding.list_rules(),
security_group.list_rules(),
segment.list_rules(),
service_type.list_rules(),
diff --git a/conf/policies/rg_port_forwarding.py b/conf/policies/rg_port_forwarding.py
new file mode 100644
index 0000000000..19e2cd5e2f
--- /dev/null
+++ b/conf/policies/rg_port_forwarding.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from oslo_policy import policy
+from neutron.conf.policies import base
+
+COLLECTION_PATH = '/routers/{router_id}/port_forwardings'
+RESOURCE_PATH = '/routers/{router_id}/port_forwardings/{port_forwarding_id}'
+
+rules = [  # one rule per API action; names mirror the plugin methods
+    policy.DocumentedRuleDefault(
+        'create_router_gateway_port_forwarding',
+        base.RULE_ADMIN_OR_PARENT_OWNER,
+        'Create a router gateway port forwarding',
+        [
+            {
+                'method': 'POST',
+                'path': COLLECTION_PATH,
+            },
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        'get_router_gateway_port_forwarding',
+        base.RULE_ADMIN_OR_PARENT_OWNER,
+        'Get a router gateway port forwarding',
+        [
+            {
+                'method': 'GET',
+                'path': COLLECTION_PATH,
+            },
+            {
+                'method': 'GET',
+                'path': RESOURCE_PATH,
+            },
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        'update_router_gateway_port_forwarding',
+        base.RULE_ADMIN_OR_PARENT_OWNER,
+        'Update a router gateway port forwarding',
+        [
+            {
+                'method': 'PUT',
+                'path': RESOURCE_PATH,
+            },
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        'delete_router_gateway_port_forwarding',
+        base.RULE_ADMIN_OR_PARENT_OWNER,
+        'Delete a router gateway port forwarding',
+        [
+            {
+                'method': 'DELETE',
+                'path': RESOURCE_PATH,
+            },
+        ]
+    ),
+]
+
+
+def list_rules():
+    return rules  # aggregated by neutron.conf.policies.list_rules()
diff --git a/db/l3_attrs_db.py b/db/l3_attrs_db.py
index e6d4e298b1..f292b7aa32 100644
--- a/db/l3_attrs_db.py
+++ b/db/l3_attrs_db.py
@@ -19,6 +19,7 @@ from oslo_config import cfg
from neutron._i18n import _
from neutron.db.models import l3_attrs
+from neutron.objects.base import NeutronDbObject
def get_attr_info():
@@ -29,7 +30,11 @@ def get_attr_info():
'availability_zone_hints': {
'default': '[]',
'transform_to_db': az_validator.convert_az_list_to_string,
- 'transform_from_db': az_validator.convert_az_string_to_list}
+ 'transform_from_db': az_validator.convert_az_string_to_list},
+ 'configurations': {
+ 'default': '{}',
+ 'transform_to_db': NeutronDbObject.filter_to_json_str,
+ 'transform_from_db': NeutronDbObject.load_json_from_str}
}
diff --git a/db/l3_db.py b/db/l3_db.py
index 565b422532..b625dc1959 100644
--- a/db/l3_db.py
+++ b/db/l3_db.py
@@ -47,6 +47,7 @@ from neutron.common import ipv6_utils
from neutron.common import utils
from neutron.db import _utils as db_utils
from neutron.db.models import l3 as l3_models
+from neutron.plugins.ml2 import models as ml2_models
from neutron.db import models_v2
from neutron.db import standardattrdescription_db as st_attr
from neutron.extensions import l3
@@ -1086,21 +1087,37 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
# with subnet's gateway-ip, return that router.
# Otherwise return the first router.
RouterPort = l3_models.RouterPort
+ RouterPortBinding = orm.aliased(ml2_models.PortBinding,
+ name="router_port_binding")
+ ComputePortBinding = orm.aliased(ml2_models.PortBinding,
+ name="compute_port_binding")
gw_port = orm.aliased(models_v2.Port, name="gw_port")
# TODO(lujinluo): Need IPAllocation and Port object
routerport_qry = context.session.query(
- RouterPort.router_id, models_v2.IPAllocation.ip_address).join(
- RouterPort.port, models_v2.Port.fixed_ips).filter(
+ RouterPort.router_id, models_v2.IPAllocation.ip_address,
+ RouterPortBinding.host, ComputePortBinding.host,
+ ).join(
+ RouterPort.port, models_v2.Port.fixed_ips
+ ).filter(
models_v2.Port.network_id == internal_port['network_id'],
RouterPort.port_type.in_(constants.ROUTER_INTERFACE_OWNERS),
- models_v2.IPAllocation.subnet_id == internal_subnet['id']
- ).join(gw_port, gw_port.device_id == RouterPort.router_id).filter(
+ models_v2.IPAllocation.subnet_id == internal_subnet['id'],
+ ComputePortBinding.port_id == internal_port['id'],
+ ).join(
+ gw_port, gw_port.device_id == RouterPort.router_id
+ ).filter(
gw_port.network_id == external_network_id,
gw_port.device_owner == DEVICE_OWNER_ROUTER_GW
+ ).join(
+ RouterPortBinding, RouterPortBinding.port_id == models_v2.Port.id
).distinct()
first_router_id = None
- for router_id, interface_ip in routerport_qry:
+ for (router_id, interface_ip,
+ network_host, compute_host) in routerport_qry:
+ network_node = self.compute_to_network.get(compute_host, None)
+ if network_node and network_node == network_host:
+ return router_id
if interface_ip == internal_subnet['gateway_ip']:
return router_id
if not first_router_id:
diff --git a/db/l3_hamode_db.py b/db/l3_hamode_db.py
index bff388e166..4c414016dd 100644
--- a/db/l3_hamode_db.py
+++ b/db/l3_hamode_db.py
@@ -32,6 +32,7 @@ from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import l3 as l3_exc
+from neutron_lib.exceptions import agent as agent_exc
from neutron_lib.exceptions import l3_ext_ha_mode as l3ha_exc
from neutron_lib.objects import exceptions as obj_base
from neutron_lib.plugins import utils as p_utils
@@ -54,6 +55,7 @@ from neutron.db import l3_dvr_db
from neutron.objects import base
from neutron.objects import l3_hamode
from neutron.objects import router as l3_obj
+from neutron.objects import agent as agent_obj
VR_ID_RANGE = set(range(1, 255))
@@ -378,12 +380,85 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
if not self.get_ha_network(context, router['tenant_id']):
self._create_ha_network(context, router['tenant_id'])
+    @staticmethod
+    def _check_router_configurations_creation(context,
+                                              configurations: dict,
+                                              is_ha: bool = True):
+        """Validate the agent hosts named in a new router's configurations."""
+        known_hosts = [
+            l3_agent['host'] for l3_agent in agent_obj.Agent.get_objects(
+                context, binary='neutron-l3-agent')]
+        if not is_ha:
+            preferred = configurations.get('preferred_agent', None)
+            if preferred and preferred not in known_hosts:
+                raise agent_exc.AgentNotFound(id=preferred)
+            return
+        master = configurations.get('master_agent', None)
+        slaves = configurations.get('slave_agents', [])
+        if master and slaves:
+            for host in [master] + slaves:
+                if host not in known_hosts:
+                    raise agent_exc.AgentNotFound(id=host)
+            if master in slaves:
+                raise l3_exc.RouterAgentConflict()
+        elif master or slaves:
+            raise l3_exc.RouterAgentNotGiven()
+
+    @staticmethod
+    def _check_router_configurations_update(context,
+                                            configurations: dict,
+                                            old_configurations: dict,
+                                            is_ha: bool = True) -> bool:
+        """Validate requested configurations and merge them into the old ones.
+
+        ``old_configurations`` is updated in place with the accepted values.
+        Returns False when both dicts are equal, otherwise True; raises
+        AgentNotFound / RouterAgentConflict on invalid agent choices.
+        """
+        if configurations == old_configurations:
+            return False
+
+        agents = agent_obj.Agent.get_objects(context,
+                                             binary='neutron-l3-agent')
+        hosts = [agent['host'] for agent in agents]
+        if not is_ha:
+            preferred = configurations.get('preferred_agent', None)
+            if (preferred and preferred not in hosts and
+                    preferred != old_configurations.get('preferred_agent')):
+                raise agent_exc.AgentNotFound(id=preferred)
+            old_configurations['preferred_agent'] = preferred or None
+            return True
+        master = configurations.get('master_agent', None)
+        if master:
+            if (master != old_configurations.get('master_agent') and
+                    master not in hosts):
+                raise agent_exc.AgentNotFound(id=master)
+            old_configurations['master_agent'] = master
+        slaves = configurations.get('slave_agents', [])
+        if slaves:
+            if slaves != old_configurations.get('slave_agents', []):
+                for slave in slaves:
+                    if slave not in hosts:
+                        raise agent_exc.AgentNotFound(id=slave)
+            old_configurations['slave_agents'] = slaves
+        # Either key may be missing from the stored value; avoid KeyError.
+        if (old_configurations.get('master_agent') in
+                (old_configurations.get('slave_agents') or [])):
+            raise l3_exc.RouterAgentConflict()
+        return True
+
@registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE],
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
def _precommit_router_create(self, resource, event, trigger, context,
router, router_db, **kwargs):
"""Event handler to set ha flag and status on creation."""
is_ha = self._is_ha(router)
+ configurations = router.get('configurations', {})
+ if configurations:
+ self._check_router_configurations_creation(context, configurations,
+ is_ha)
+ self.set_extra_attr_value(context, router_db, 'configurations',
+ configurations)
router['ha'] = is_ha
self.set_extra_attr_value(context, router_db, 'ha', is_ha)
if not is_ha:
@@ -465,6 +540,28 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
self.set_extra_attr_value(
payload.context, payload.desired_state, 'ha', requested_ha_state)
+    @registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE],
+                       priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
+    def _validate_configurations(self, resource, event, trigger, payload=None):
+        """Validate and persist a router 'configurations' update request."""
+        configurations = payload.request_body.get('configurations', {})
+        if not configurations:
+            return
+
+        if payload.desired_state.admin_state_up:
+            msg = _('Cannot change configurations of active routers. Please '
+                    'set router admin_state_up to False prior to upgrade')
+            raise n_exc.BadRequest(resource='router', msg=msg)
+
+        # Work on a copy: the stored value may be None, and the checker merges
+        # into its argument, which must not rewrite the original router state.
+        merged = dict(payload.states[0].get('configurations') or {})
+        if self._check_router_configurations_update(
+                payload.context, configurations, merged,
+                payload.states[0]['ha']):
+            self.set_extra_attr_value(payload.context, payload.desired_state,
+                                      'configurations', merged)
+
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE],
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
def _reconfigure_ha_resources(self, resource, event, trigger, context,
diff --git a/db/migration/alembic_migrations/versions/EXPAND_HEAD b/db/migration/alembic_migrations/versions/EXPAND_HEAD
index ffa2bbaaf6..0c8e4a2178 100644
--- a/db/migration/alembic_migrations/versions/EXPAND_HEAD
+++ b/db/migration/alembic_migrations/versions/EXPAND_HEAD
@@ -1 +1 @@
-c613d0b82681
+1c19a98b5eef
diff --git a/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py
new file mode 100644
index 0000000000..f12600ef4f
--- /dev/null
+++ b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py
@@ -0,0 +1,36 @@
+# Copyright 2023 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from alembic import op
+import sqlalchemy as sa
+
+"""add router configurations
+
+Revision ID: 1c19a98b5eef
+Revises: cab12b72ed90
+Create Date: 2023-08-01 10:05:56.412167
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '1c19a98b5eef'
+down_revision = 'cab12b72ed90'
+
+
+def upgrade():
+    # Add the nullable 'configurations' text column to the router extras.
+    op.add_column('router_extra_attributes',
+                  sa.Column('configurations', sa.String(length=4095)))
+    # No default is given, so rows that already exist get NULL.
diff --git a/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py
new file mode 100644
index 0000000000..ad511a7ed8
--- /dev/null
+++ b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py
@@ -0,0 +1,55 @@
+# Copyright 2023 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from alembic import op
+import sqlalchemy as sa
+
+"""add router gateway port forwarding
+
+Revision ID: cab12b72ed90
+Revises: c613d0b82681
+Create Date: 2023-07-04 10:27:54.485453
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'cab12b72ed90'
+down_revision = 'c613d0b82681'
+
+
+def upgrade():
+    # Create the table that stores router gateway port forwarding rules.
+    op.create_table(
+        'rgportforwardings',
+        sa.Column('id', sa.String(length=36), nullable=False),
+        sa.Column('router_id', sa.String(length=36), nullable=False),
+        sa.Column('external_port', sa.Integer(), nullable=False),
+        sa.Column('internal_neutron_port_id', sa.String(length=36),
+                  nullable=False),
+        sa.Column('protocol', sa.String(length=40), nullable=False),
+        sa.Column('socket', sa.String(length=36), nullable=False),
+        sa.ForeignKeyConstraint(['internal_neutron_port_id'], ['ports.id'],
+                                ondelete='CASCADE'),
+        sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
+                                ondelete='CASCADE'),
+        sa.PrimaryKeyConstraint('id'),
+        sa.UniqueConstraint(
+            'internal_neutron_port_id', 'socket', 'protocol',
+            name='uniq_port_forwardings0internal_neutron_port_id0socket0protocol'),
+        sa.UniqueConstraint(
+            'router_id', 'external_port', 'protocol',
+            name='uniq_rg_port_forwardings0router_id0external_port0protocol')
+    )
+    # NOTE(review): the first constraint name lacks the 'rg_' prefix; confirm.
diff --git a/db/models/l3_attrs.py b/db/models/l3_attrs.py
index 6c30ac2c16..904f4ef08d 100644
--- a/db/models/l3_attrs.py
+++ b/db/models/l3_attrs.py
@@ -41,6 +41,8 @@ class RouterExtraAttributes(model_base.BASEV2):
# Availability Zone support
availability_zone_hints = sa.Column(sa.String(255))
+ configurations = sa.Column(sa.String(4095))
+
router = orm.relationship(
'Router', load_on_pending=True,
backref=orm.backref("extra_attributes", lazy='joined',
diff --git a/db/models/rg_port_forwarding.py b/db/models/rg_port_forwarding.py
new file mode 100644
index 0000000000..e7963169a8
--- /dev/null
+++ b/db/models/rg_port_forwarding.py
@@ -0,0 +1,59 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from neutron_lib.db import model_base
+from neutron_lib.db import constants as db_const
+
+from neutron.db.models import l3
+from neutron.db import models_v2
+
+
+class RGPortForwarding(model_base.BASEV2, model_base.HasId):
+    __table_args__ = (  # two composite uniqueness rules for the table
+        sa.UniqueConstraint('router_id', 'external_port', 'protocol',
+                            name='uniq_rg_port_forwardings0router_id0'
+                                 'external_port0protocol'),
+        sa.UniqueConstraint('internal_neutron_port_id', 'socket', 'protocol',
+                            name='uniq_port_forwardings0'  # no rg_ prefix (?)
+                                 'internal_neutron_port_id0socket0'
+                                 'protocol')
+    )
+
+    router_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
+                          sa.ForeignKey('routers.id',
+                                        ondelete="CASCADE"),
+                          nullable=False)
+    external_port = sa.Column(sa.Integer, nullable=False)
+    internal_neutron_port_id = sa.Column(
+        sa.String(db_const.UUID_FIELD_SIZE),
+        sa.ForeignKey('ports.id', ondelete="CASCADE"),
+        nullable=False)
+    protocol = sa.Column(sa.String(40), nullable=False)
+    socket = sa.Column(sa.String(36), nullable=False)  # 'ip:port' text
+    port = orm.relationship(  # port referenced by internal_neutron_port_id
+        models_v2.Port, load_on_pending=True,
+        backref=orm.backref("rg_port_forwardings",
+                            lazy='subquery', uselist=True,
+                            cascade='delete')
+    )
+    router = orm.relationship(  # router referenced by router_id
+        l3.Router, load_on_pending=True,
+        backref=orm.backref("rg_port_forwardings",
+                            lazy='subquery', uselist=True,
+                            cascade='delete')
+    )
+    revises_on_change = ('router', 'port',)
diff --git a/extensions/rg_port_forwarding.py b/extensions/rg_port_forwarding.py
new file mode 100644
index 0000000000..c9888cd1ec
--- /dev/null
+++ b/extensions/rg_port_forwarding.py
@@ -0,0 +1,119 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+import abc
+import itertools
+from typing import List
+
+from neutron_lib.context import Context
+from neutron_lib.plugins import constants
+from neutron_lib.plugins import directory
+from neutron_lib.services import base as service_base
+from neutron_lib.api.extensions import APIExtensionDescriptor
+from neutron_lib.api.definitions import rg_port_forwarding as apidef
+
+from neutron.api.v2 import base
+from neutron.api.v2 import resource_helper
+from neutron.api.extensions import ResourceExtension
+
+
+class Rg_port_forwarding(APIExtensionDescriptor):
+    """API extension descriptor for router gateway port forwarding."""
+
+    api_definition = apidef
+
+    @classmethod
+    def get_plugin_interface(cls):
+        """Return the abstract plugin class this extension relies on."""
+        return RGPortForwardingPluginBase
+
+    @classmethod
+    def get_resources(cls):
+        """Return the extension resources, including the nested collection.
+
+        The collection named ``apidef.COLLECTION_NAME`` is exposed through
+        its own controller under the parent declared in the API definition.
+        """
+        attr_names = itertools.chain(apidef.RESOURCE_ATTRIBUTE_MAP,
+                                     apidef.SUB_RESOURCE_ATTRIBUTE_MAP)
+        plurals = resource_helper.build_plural_mappings(
+            {'routers': 'router'}, attr_names)
+        extensions = resource_helper.build_resource_info(
+            plurals, apidef.RESOURCE_ATTRIBUTE_MAP,
+            constants.ROUTER_GATEWAY_PORTFORWARDING,
+            translate_name=True, allow_bulk=True)
+
+        service_plugin = directory.get_plugin(
+            constants.ROUTER_GATEWAY_PORTFORWARDING)
+
+        sub_map = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME]
+        parent = sub_map.get('parent')
+        params = sub_map.get('parameters')
+        controller = base.create_resource(
+            apidef.COLLECTION_NAME, apidef.RESOURCE_NAME, service_plugin,
+            params, allow_bulk=True, parent=parent, allow_pagination=True,
+            allow_sorting=True)
+
+        extensions.append(ResourceExtension(
+            apidef.COLLECTION_NAME, controller, parent, attr_map=params))
+
+        return extensions
+
+
+@six.add_metaclass(abc.ABCMeta)
+class RGPortForwardingPluginBase(service_base.ServicePluginBase):
+    """Abstract service-plugin interface for the port forwarding API."""
+
+    path_prefix = apidef.API_PREFIX
+
+    @classmethod
+    def get_plugin_type(cls):
+        """Return the plugin type constant for this service."""
+        return constants.ROUTER_GATEWAY_PORTFORWARDING
+
+    def get_plugin_description(self):
+        return "Router Gateway Port Forwarding Service Plugin"
+
+    @abc.abstractmethod
+    def create_router_gateway_port_forwarding(self, context: Context,
+                                              router_id: str,
+                                              gateway_port_forwarding: dict):
+        """Create a port forwarding under the router ``router_id``."""
+
+    @abc.abstractmethod
+    def update_router_gateway_port_forwarding(self, context: Context, id: str,
+                                              router_id: str,
+                                              gateway_port_forwarding: dict):
+        """Update the port forwarding ``id`` of router ``router_id``."""
+
+    @abc.abstractmethod
+    def get_router_gateway_port_forwarding(self, context: Context, id: str,
+                                           router_id: str,
+                                           fields: List[str] = None):
+        """Return the port forwarding ``id`` of router ``router_id``."""
+
+    @abc.abstractmethod
+    def get_router_gateway_port_forwardings(
+            self, context: Context, router_id: str,
+            filters: List[str] = None, fields: List[str] = None,
+            sorts: List[str] = None, limit: int = None,
+            marker: str = None, page_reverse: bool = False):
+        """List the port forwardings of router ``router_id``."""
+
+    @abc.abstractmethod
+    def delete_router_gateway_port_forwarding(self, context: Context, id: str,
+                                              router_id: str):
+        """Delete the port forwarding ``id`` of router ``router_id``."""
diff --git a/objects/rg_port_forwarding.py b/objects/rg_port_forwarding.py
new file mode 100644
index 0000000000..28cb4e1d4d
--- /dev/null
+++ b/objects/rg_port_forwarding.py
@@ -0,0 +1,87 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from neutron_lib import constants as lib_const
+from oslo_versionedobjects import fields as obj_fields
+
+from neutron.objects import base, common_types
+from neutron.db.models import rg_port_forwarding as models
+
+FIELDS_NOT_SUPPORT_FILTER = ['internal_ip_address', 'internal_port']
+
+
+@base.NeutronObjectRegistry.register
+class RGPortForwarding(base.NeutronDbObject):
+    """Versioned object for a router gateway port forwarding row."""
+
+    VERSION = '1.0'
+
+    db_model = models.RGPortForwarding
+
+    primary_keys = ['id']
+    foreign_keys = {'Router': {'router_id': 'id'},
+                    'Port': {'internal_port_id': 'id'}}
+    fields_need_translation = {
+        'socket': 'socket',
+        'internal_port_id': 'internal_neutron_port_id'
+    }
+
+    fields = {
+        'id': common_types.UUIDField(),
+        'router_id': common_types.UUIDField(nullable=False),
+        'external_port': common_types.PortRangeField(nullable=False),
+        'protocol': common_types.IpProtocolEnumField(nullable=False),
+        'internal_port_id': common_types.UUIDField(nullable=False),
+        'internal_ip_address': obj_fields.IPV4AddressField(),
+        'internal_port': common_types.PortRangeField(nullable=False),
+        'gw_ip_address': obj_fields.IPV4AddressField(),
+    }
+
+    synthetic_fields = ['gw_ip_address']
+    fields_no_update = {'id', 'router_id'}
+
+    def __eq__(self, other):
+        """Compare every declared field of the two objects."""
+        return not any(getattr(self, name) != getattr(other, name)
+                       for name in self.fields)
+
+    def obj_load_attr(self, attrname):
+        """Defer to the parent implementation unchanged."""
+        super(RGPortForwarding, self).obj_load_attr(attrname)
+
+    def from_db_object(self, db_obj):
+        """Delegate population from ``db_obj`` to the parent class."""
+        super(RGPortForwarding, self).from_db_object(db_obj)
+
+    @classmethod
+    def modify_fields_from_db(cls, db_obj):
+        """Split the stored 'ip:port' socket into address and port fields."""
+        result = super(RGPortForwarding, cls).modify_fields_from_db(db_obj)
+        if 'socket' in result:
+            socket_parts = result.pop('socket').split(":")
+            result['internal_ip_address'] = netaddr.IPAddress(
+                socket_parts[0], version=lib_const.IP_VERSION_4)
+            result['internal_port'] = int(socket_parts[1])
+        return result
+
+    @classmethod
+    def modify_fields_to_db(cls, fields):
+        """Join address and port into the stored 'ip:port' socket value."""
+        result = super(RGPortForwarding, cls).modify_fields_to_db(fields)
+        if 'internal_ip_address' in result and 'internal_port' in result:
+            result['socket'] = (f"{result.pop('internal_ip_address')}:"
+                                f"{result.pop('internal_port')}")
+        return result
diff --git a/objects/router.py b/objects/router.py
index 1373f89515..9590a109f6 100644
--- a/objects/router.py
+++ b/objects/router.py
@@ -18,6 +18,7 @@ from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib.api.validators import availability_zone as az_validator
from neutron_lib import constants as n_const
from neutron_lib.utils import net as net_utils
+from neutron_lib.objects import utils as obj_utils
from oslo_versionedobjects import fields as obj_fields
import six
from sqlalchemy import func
@@ -70,7 +71,8 @@ class RouterRoute(base.NeutronDbObject):
@base.NeutronObjectRegistry.register
class RouterExtraAttributes(base.NeutronDbObject):
# Version 1.0: Initial version
- VERSION = '1.0'
+ # Version 1.1: Add configurations
+ VERSION = '1.1'
db_model = l3_attrs.RouterExtraAttributes
@@ -80,7 +82,8 @@ class RouterExtraAttributes(base.NeutronDbObject):
'service_router': obj_fields.BooleanField(default=False),
'ha': obj_fields.BooleanField(default=False),
'ha_vr_id': obj_fields.IntegerField(nullable=True),
- 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True)
+ 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True),
+ 'configurations': common_types.DictOfMiscValuesField(nullable=True),
}
primary_keys = ['router_id']
@@ -95,6 +98,9 @@ class RouterExtraAttributes(base.NeutronDbObject):
result[az_def.AZ_HINTS] = (
az_validator.convert_az_string_to_list(
result[az_def.AZ_HINTS]))
+ if 'configurations' in result:
+ result['configurations'] = cls.load_json_from_str(
+ result['configurations'], default={})
return result
@classmethod
@@ -104,6 +110,11 @@ class RouterExtraAttributes(base.NeutronDbObject):
result[az_def.AZ_HINTS] = (
az_validator.convert_az_list_to_string(
result[az_def.AZ_HINTS]))
+ if ('configurations' in result and
+ not isinstance(result['configurations'],
+ obj_utils.StringMatchingFilterObj)):
+ result['configurations'] = (
+ cls.filter_to_json_str(result['configurations']))
return result
@classmethod
diff --git a/scheduler/l3_agent_scheduler.py b/scheduler/l3_agent_scheduler.py
index 5810cf85b8..7a428aef03 100644
--- a/scheduler/l3_agent_scheduler.py
+++ b/scheduler/l3_agent_scheduler.py
@@ -14,13 +14,15 @@
# under the License.
import abc
-import collections
+import random
import functools
import itertools
-import random
+import collections
+from typing import List, Optional
from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib import constants as lib_const
+from neutron_lib.context import Context
from neutron_lib.db import api as lib_db_api
from neutron_lib.exceptions import l3 as l3_exc
from oslo_config import cfg
@@ -31,8 +33,9 @@ import six
from neutron.common import utils
from neutron.conf.db import l3_hamode_db
from neutron.db.models import l3agent as rb_model
+from neutron.objects.agent import Agent
from neutron.objects import l3agent as rb_obj
-
+from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin
LOG = logging.getLogger(__name__)
cfg.CONF.register_opts(l3_hamode_db.L3_HA_OPTS)
@@ -228,26 +231,25 @@ class L3Scheduler(object):
if not candidates:
return
elif sync_router.get('ha', False):
- chosen_agents = self._bind_ha_router(plugin, context,
- router_id,
- sync_router.get('tenant_id'),
+ chosen_agents = self._bind_ha_router(plugin, context, sync_router,
candidates)
if not chosen_agents:
return
chosen_agent = chosen_agents[-1]
else:
chosen_agent = self._choose_router_agent(
- plugin, context, candidates)
+ context, plugin, candidates, sync_router)
self.bind_router(plugin, context, router_id, chosen_agent.id)
return chosen_agent
@abc.abstractmethod
- def _choose_router_agent(self, plugin, context, candidates):
+ def _choose_router_agent(self, context, plugin, candidates, sync_router):
"""Choose an agent from candidates based on a specific policy."""
pass
@abc.abstractmethod
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
+ sync_router):
"""Choose agents from candidates based on a specific policy."""
pass
@@ -315,19 +317,19 @@ class L3Scheduler(object):
hosting_list = [tuple(host) for host in hosting]
return list(set(candidates) - set(hosting_list))
- def _bind_ha_router(self, plugin, context, router_id,
- tenant_id, candidates):
+ def _bind_ha_router(self, plugin, context, sync_router, candidates):
"""Bind a HA router to agents based on a specific policy."""
-
+ router_id = sync_router.get('id')
+ tenant_id = sync_router.get('tenant_id')
candidates = self._filter_scheduled_agents(plugin, context, router_id,
candidates)
chosen_agents = self._choose_router_agents_for_ha(
- plugin, context, candidates)
+ context, plugin, candidates, sync_router)
for agent in chosen_agents:
- self.create_ha_port_and_bind(plugin, context, router_id,
- tenant_id, agent)
+ self.create_ha_port_and_bind(plugin, context, router_id, tenant_id,
+ agent)
return chosen_agents
@@ -335,10 +337,11 @@ class L3Scheduler(object):
class ChanceScheduler(L3Scheduler):
"""Randomly allocate an L3 agent for a router."""
- def _choose_router_agent(self, plugin, context, candidates):
+ def _choose_router_agent(self, context, plugin, candidates, sync_router):
return random.choice(candidates)
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
+ sync_router):
num_agents = self._get_num_of_agents_for_ha(len(candidates))
return random.sample(candidates, num_agents)
@@ -346,13 +349,14 @@ class ChanceScheduler(L3Scheduler):
class LeastRoutersScheduler(L3Scheduler):
"""Allocate to an L3 agent with the least number of routers bound."""
- def _choose_router_agent(self, plugin, context, candidates):
+ def _choose_router_agent(self, context, plugin, candidates, sync_router):
candidate_ids = [candidate['id'] for candidate in candidates]
chosen_agent = plugin.get_l3_agent_with_min_routers(
context, candidate_ids)
return chosen_agent
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
+ sync_router):
num_agents = self._get_num_of_agents_for_ha(len(candidates))
ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
context, [candidate['id'] for candidate in candidates])
@@ -397,7 +401,8 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler):
return candidates
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
+ sync_router):
ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
context, [candidate['id'] for candidate in candidates])
num_agents = self._get_num_of_agents_for_ha(len(ordered_agents))
@@ -416,3 +421,65 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler):
if len(selected_agents) >= num_agents:
break
return selected_agents
+
+
+class PreferredL3AgentRoutersScheduler(LeastRoutersScheduler):
+    """Schedule routers on the hosts named in the router configurations.
+
+    Hosts are read from the router's ``configurations`` dict:
+    ``preferred_agent`` for a single-agent router, ``master_agent`` plus
+    ``slave_agents`` for an HA router.  When no hosts are requested, or
+    the requested hosts are not all among the candidates, scheduling
+    falls back to the parent least-routers policy.
+    """
+
+    @staticmethod
+    def get_preferred_agent(sync_router: dict) -> Optional[str]:
+        """Return the ``preferred_agent`` host of the router, or None."""
+        # 'configurations' may be missing, None or empty.
+        configurations = sync_router.get('configurations') or {}
+        return configurations.get('preferred_agent')
+
+    @staticmethod
+    def get_agents(sync_router: dict) -> List[str]:
+        """Return the requested HA hosts, slaves first and master last.
+
+        An empty list is returned unless both ``master_agent`` and
+        ``slave_agents`` are set in the router configurations.
+        """
+        configurations = sync_router.get('configurations') or {}
+        master = configurations.get('master_agent')
+        slaves = configurations.get('slave_agents') or []
+        if master and slaves:
+            return list(slaves) + [master]
+        return []
+
+    def _choose_router_agent(self, context: Context,
+                             plugin: L3RouterPlugin,
+                             candidates: List[Agent],
+                             sync_router: dict) -> Agent:
+        """Pick the candidate on the preferred host, else least routers."""
+        preferred_host = self.get_preferred_agent(sync_router)
+        if preferred_host:
+            for candidate in candidates:
+                if candidate['host'] == preferred_host:
+                    LOG.debug(f"Router {sync_router['id']} l3 agent is "
+                              f"{candidate}.")
+                    return candidate
+            LOG.warning(f"Router {sync_router['id']} failed to "
+                        f"schedule l3 agent on {preferred_host}.")
+        # No preference, or the preferred host is not a candidate: always
+        # hand back an Agent chosen by the parent policy, never a host name.
+        return super()._choose_router_agent(context, plugin, candidates,
+                                            sync_router)
+
+    def _choose_router_agents_for_ha(self, context: Context,
+                                     plugin: L3RouterPlugin,
+                                     candidates: List[Agent],
+                                     sync_router: dict) -> List[Agent]:
+        """Pick the candidates on the requested HA hosts, else fall back."""
+        agents = self.get_agents(sync_router)
+        if agents:
+            # A max_ha_agents of 0 is neutron's "no limit" setting
+            # (max_l3_agents_per_router).  Slicing with it would drop every
+            # requested host, so only trim for a positive limit.
+            if 0 < self.max_ha_agents < len(agents):
+                # Keep the tail: the master is appended last by get_agents.
+                agents = agents[-self.max_ha_agents:]
+            new_candidates = [candidate for candidate in candidates
+                              if candidate['host'] in agents]
+            if len(new_candidates) == len(agents):
+                LOG.debug(f"Router {sync_router['id']} l3 agents are "
+                          f"{new_candidates}.")
+                return new_candidates
+            LOG.warning(f"Router {sync_router['id']} failed to "
+                        f"schedule l3 agents on {agents}.")
+
+        return super()._choose_router_agents_for_ha(
+            context, plugin, candidates, sync_router)
diff --git a/services/l3_router/l3_router_plugin.py b/services/l3_router/l3_router_plugin.py
index 2e8a762764..9825138261 100644
--- a/services/l3_router/l3_router_plugin.py
+++ b/services/l3_router/l3_router_plugin.py
@@ -38,6 +38,7 @@ from oslo_utils import importutils
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
+from neutron.conf.common import NETWORK_HOST_OPTS
from neutron.db import dns_db
from neutron.db import extraroute_db
from neutron.db import l3_dvr_ha_scheduler_db
@@ -135,6 +136,17 @@ class L3RouterPlugin(service_base.ServicePluginBase,
self.add_worker(rpc_worker)
self.l3_driver_controller = driver_controller.DriverController(self)
+ self.compute_to_network = dict()
+ self._init_compute_to_network()
+
+    def _init_compute_to_network(self):
+        """Fill ``compute_to_network`` from the per-network-node options.
+
+        Each name in ``cfg.CONF.network_nodes`` is used as an option group
+        registered with ``NETWORK_HOST_OPTS``; every entry of that group's
+        ``compute_nodes`` is mapped to the network node name.
+        """
+        conf = cfg.CONF
+        for network_node in conf.network_nodes:
+            conf.register_opts(NETWORK_HOST_OPTS, group=network_node)
+            group = conf.get(network_node, None)
+            if not group:
+                continue
+            for compute_node in group.get('compute_nodes', []):
+                self.compute_to_network[compute_node] = network_node
@property
def supported_extension_aliases(self):
diff --git a/services/rg_portforwarding/__init__.py b/services/rg_portforwarding/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/services/rg_portforwarding/common/__init__.py b/services/rg_portforwarding/common/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/services/rg_portforwarding/common/exceptions.py b/services/rg_portforwarding/common/exceptions.py
new file mode 100644
index 0000000000..73cea68e32
--- /dev/null
+++ b/services/rg_portforwarding/common/exceptions.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron._i18n import _
+from neutron_lib import exceptions
+
+
+class PortForwardingNotSupportFilterField(exceptions.BadRequest):
+    """Bad request: the given filter field cannot be used for filtering."""
+    message = _("Port Forwarding filter %(filter)s is not supported.")
+
+
+class RouterDoesNotHaveGateway(exceptions.BadRequest):
+    """Bad request: the router has no gateway port set."""
+    message = _("Router %(router_id)s does not have any gateways.")
+
+
+class RouterGatewayPortNotFound(exceptions.NotFound):
+    """Not found: the router's gateway port could not be looked up."""
+    message = _("Router %(router_id)s 's gateway port %(gw_port_id)s "
+                "could not be found.")
+
+
+class RouterGatewayPortDoesNotHaveAnyIPAddresses(exceptions.NotFound):
+    """Not found: the router's gateway port has no IP address."""
+    message = _("Router %(router_id)s 's gateway port %(gw_port_id)s "
+                "does not have any IP addresses.")
+
+
+class RouterGatewayPortForwardingNotFound(exceptions.NotFound):
+    """Not found: no router gateway port forwarding has the given ID."""
+    message = _("Router Gateway Port Forwarding %(id)s could not be found.")
+
+
+class PortHasBindingFloatingIP(exceptions.InUse):
+    """In use: the internal port is already bound to a floating IP."""
+    message = _("Cannot create port forwarding to floating IP "
+                "%(floating_ip_address)s (%(fip_id)s) with port %(port_id)s "
+                "using fixed IP %(fixed_ip)s, as that port already "
+                "has a binding floating IP.")
+
+
+class InconsistentPortAndIP(exceptions.BadRequest):
+    """Bad request: the IP address is not one of the port's addresses."""
+    message = _("Port %(port_id)s does not have ip address %(ip_address)s.")
+
+
+class RouterGatewayPortForwardingAlreadyExists(exceptions.BadRequest):
+    """Bad request: an entry with the same attributes already exists."""
+    message = _("A duplicate router gateway port forwarding entry "
+                "with same attributes already exists, "
+                "conflicting values are %(conflict)s.")
+
+
+class PortNetworkNotBindOnRouter(exceptions.BadRequest):
+    """Bad request: the port's network is not attached to the router."""
+    message = _("Port %(port_id)s 's network %(network_id)s "
+                "not bind on router %(router_id)s.")
+
+
+class RouterGatewayPortForwardingUpdateFailed(exceptions.BadRequest):
+    """Bad request: an update would duplicate another existing entry."""
+    message = _("Another router port forwarding entry with the same "
+                "attributes already exists, conflicting "
+                "values are %(conflict)s.")
+
+
+class DeletedRouterWithRGForwarding(exceptions.InUse):
+    """In use: the router still has gateway port forwardings."""
+    message = _("Cannot delete router, "
+                "router %(router_id)s has port forwardings to remove.")
+
+
+class DeletedRouterGatewayWithRGForwarding(exceptions.InUse):
+    """In use: the router gateway still has port forwardings."""
+    message = _("Cannot delete or update router gateway, "
+                "router %(router_id)s has port forwardings to remove.")
diff --git a/services/rg_portforwarding/pf_plugin.py b/services/rg_portforwarding/pf_plugin.py
new file mode 100644
index 0000000000..ed8e68e53c
--- /dev/null
+++ b/services/rg_portforwarding/pf_plugin.py
@@ -0,0 +1,369 @@
+# Copyright (c) 2023 UnionTech
+# All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from oslo_log import log as logging
+from typing import List, Dict, Optional
+
+from neutron_lib import constants
+from neutron_lib.context import Context
+from neutron_lib.plugins import directory
+from neutron_lib.db import resource_extend
+from neutron_lib.plugins.constants import L3
+from neutron_lib.db.api import CONTEXT_WRITER
+from neutron_lib.exceptions import PortNotFound
+from neutron_lib.exceptions.l3 import RouterNotFound
+from neutron_lib.callbacks import registry, resources
+from neutron_lib.callbacks import events as lib_events
+from neutron_lib.callbacks.events import DBEventPayload
+from neutron_lib.api.definitions import rg_port_forwarding as apidef
+from neutron_lib.objects.exceptions import NeutronDbObjectDuplicateEntry
+
+from neutron.db import db_base_plugin_common
+from neutron.db.l3_dvr_db import is_distributed_router
+from neutron.db.l3_hamode_db import is_ha_router
+
+from neutron.objects.base import Pager
+from neutron.objects.ports import Port
+from neutron.objects.router import Router, FloatingIP
+from neutron.objects.rg_port_forwarding import RGPortForwarding
+from neutron.objects.rg_port_forwarding import FIELDS_NOT_SUPPORT_FILTER
+
+from neutron.extensions.rg_port_forwarding import RGPortForwardingPluginBase
+from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin
+from neutron.services.rg_portforwarding.common import exceptions
+
+from neutron.api.rpc.callbacks import events
+from neutron.api.rpc.handlers import resources_rpc
+
+LOG = logging.getLogger(__name__)
+
+
+@resource_extend.has_resource_extenders
+@registry.has_registry_receivers
+class RGPortForwardingPlugin(RGPortForwardingPluginBase):
+    """Service plugin with CRUD operations for RGPortForwarding objects.
+
+    Every create, update and delete is announced through
+    ``ResourcesPushRpcApi.push``.  Deleting a router, or deleting/updating
+    its gateway, is refused while the router still has port forwardings.
+    """
+
+    required_service_plugins = ['router']
+
+    supported_extension_aliases = [apidef.ALIAS]
+
+    __native_pagination_support = True
+    __native_sorting_support = True
+    __filter_validation_support = True
+
+    def __init__(self):
+        super(RGPortForwardingPlugin, self).__init__()
+        self.push_api = resources_rpc.ResourcesPushRpcApi()
+        self.l3_plugin = directory.get_plugin(L3)
+        self.core_plugin = directory.get_plugin()
+
+    @staticmethod
+    def _get_router(context: Context, router_id: str) -> Router:
+        """Return the router object.
+
+        :raises RouterNotFound: no router has ``router_id``.
+        """
+        router = Router.get_object(context, id=router_id)
+        if not router:
+            raise RouterNotFound(router_id=router_id)
+        return router
+
+    @staticmethod
+    def _get_router_gateway(context: Context, router: Router) -> str:
+        """Return the first fixed IP address of the router's gateway port.
+
+        :raises RouterDoesNotHaveGateway: the router has no ``gw_port_id``.
+        :raises RouterGatewayPortNotFound: the gateway port does not exist.
+        :raises RouterGatewayPortDoesNotHaveAnyIPAddresses: the gateway
+            port has no fixed IPs.
+        """
+        gw_port_id = router.get('gw_port_id', None)
+        if not gw_port_id:
+            raise exceptions.RouterDoesNotHaveGateway(router_id=router.id)
+        gw_port = Port.get_object(context, id=gw_port_id)
+        if not gw_port:
+            raise exceptions.RouterGatewayPortNotFound(
+                router_id=router.id, gw_port_id=gw_port_id)
+        gw_port_ips = gw_port.get("fixed_ips", [])
+        if not gw_port_ips:
+            raise exceptions.RouterGatewayPortDoesNotHaveAnyIPAddresses(
+                router_id=router.id, gw_port_id=gw_port_id)
+        return gw_port_ips[0].get('ip_address')
+
+    @staticmethod
+    def _get_port(context: Context, port_id: str) -> Port:
+        """Return the port object.
+
+        :raises PortNotFound: no port has ``port_id``.
+        """
+        port = Port.get_object(context, id=port_id)
+        if not port:
+            raise PortNotFound(port_id=port_id)
+        return port
+
+    @staticmethod
+    def _get_ports(context: Context, router_id: str, port: Port,
+                   device_owner: str) -> List[Port]:
+        """Return the router's ports on the network of ``port``.
+
+        :raises PortNetworkNotBindOnRouter: the lookup by router, device
+            owner and network returned nothing.
+        """
+        ports = Port.get_ports_by_router_and_network(
+            context, router_id, device_owner, port.network_id)
+        if not ports:
+            raise exceptions.PortNetworkNotBindOnRouter(
+                port_id=port.id,
+                network_id=port.network_id,
+                router_id=router_id)
+        return ports
+
+    @staticmethod
+    def _validate_filter_for_port_forwarding(filters: Dict[str, str]) -> None:
+        """Reject filters on fields listed in FIELDS_NOT_SUPPORT_FILTER.
+
+        :raises PortForwardingNotSupportFilterField: on the first such key.
+        """
+        if not filters:
+            return
+        for filter_member_key in filters:
+            if filter_member_key in FIELDS_NOT_SUPPORT_FILTER:
+                raise exceptions.PortForwardingNotSupportFilterField(
+                    filter=filter_member_key)
+
+    @staticmethod
+    def _check_port_has_binding_floating_ip(context: Context, port_id: str,
+                                            ip_address: str) -> None:
+        """Refuse a port that already has a floating IP bound to it.
+
+        ``ip_address`` is only used to build the error message.
+
+        :raises PortHasBindingFloatingIP: a floating IP has ``port_id`` as
+            its fixed port.
+        """
+        floatingip_objs = FloatingIP.get_objects(
+            context.elevated(),
+            fixed_port_id=port_id)
+        if floatingip_objs:
+            bound_fip = floatingip_objs[0]
+            raise exceptions.PortHasBindingFloatingIP(
+                floating_ip_address=bound_fip.floating_ip_address,
+                fip_id=bound_fip.id,
+                port_id=port_id,
+                fixed_ip=ip_address)
+
+    @staticmethod
+    def _get_device_owner(router: Router) -> str:
+        """Return the interface device owner matching the router type."""
+        if is_distributed_router(router):
+            return constants.DEVICE_OWNER_DVR_INTERFACE
+        if is_ha_router(router):
+            return constants.DEVICE_OWNER_HA_REPLICATED_INT
+        return constants.DEVICE_OWNER_ROUTER_INTF
+
+    def _check_router_port(self, context: Context, router: Router,
+                           port: Port):
+        """Ensure the router has an interface on the port's network.
+
+        :raises PortNetworkNotBindOnRouter: no such interface is found.
+        """
+        device_owner = self._get_device_owner(router)
+        self._get_ports(context, router.id, port, device_owner)
+
+    def _check_port(self, context: Context, port_id: str, ip: str) -> Port:
+        """Validate the internal port and return it.
+
+        :raises PortNotFound: the port does not exist.
+        :raises PortHasBindingFloatingIP: the port has a floating IP.
+        :raises InconsistentPortAndIP: ``ip`` is not a fixed IP of the port.
+        """
+        port = self._get_port(context, port_id)
+        self._check_port_has_binding_floating_ip(context, port_id, ip)
+        fixed_ips = port.get('fixed_ips', [])
+        if not any(str(fixed_ip.get('ip_address')) == ip
+                   for fixed_ip in fixed_ips):
+            # The message expects the port ID, not the Port object.
+            raise exceptions.InconsistentPortAndIP(port_id=port_id,
+                                                   ip_address=ip)
+        return port
+
+    def _check_router(self, context: Context, router_id: str) -> (Router, str):
+        """Return the router and its gateway IP address.
+
+        Raises the errors of ``_get_router`` and ``_get_router_gateway``.
+        """
+        router = self._get_router(context, router_id)
+        gw_ip_address = self._get_router_gateway(context, router)
+        return router, gw_ip_address
+
+    def _check_port_forwarding_create(self, context: Context, router_id: str,
+                                      pf_dict: Dict) -> None:
+        """Validate a create request and complete ``pf_dict`` in place.
+
+        ``router_id`` and the gateway IP address are written into
+        ``pf_dict``; the internal port is checked against the requested
+        internal IP address and against the router's interfaces.
+        """
+        router, gw_ip_address = self._check_router(context, router_id)
+        pf_dict['router_id'] = router_id
+        pf_dict[apidef.GW_IP_ADDRESS] = gw_ip_address
+        internal_port_id = pf_dict[apidef.INTERNAL_PORT_ID]
+        internal_ip_address = pf_dict[apidef.INTERNAL_IP_ADDRESS]
+        internal_port = self._check_port(context, internal_port_id,
+                                         internal_ip_address)
+        self._check_router_port(context, router, internal_port)
+
+    @staticmethod
+    def _check_port_forwarding(context: Context, pf_obj: RGPortForwarding):
+        """Ensure ``pf_obj`` does not conflict with an existing entry.
+
+        Entries of the same router and protocol are compared; the entry
+        with the same ID as ``pf_obj`` is skipped.
+
+        :raises RouterGatewayPortForwardingAlreadyExists: another entry
+            uses the same external port, or the same internal port ID,
+            internal IP address and internal port.
+        """
+        pf_objs = RGPortForwarding.get_objects(
+            context,
+            router_id=pf_obj.router_id,
+            protocol=pf_obj.protocol)
+
+        for obj in pf_objs:
+            if obj.id == pf_obj.get('id', None):
+                continue
+            # Ensure there are no conflicts on the outside
+            if obj.external_port == pf_obj.external_port:
+                raise exceptions.RouterGatewayPortForwardingAlreadyExists(
+                    conflict={
+                        'router_id': pf_obj.router_id,
+                        'protocol': pf_obj.protocol,
+                        'external_port': obj.external_port,
+                    }
+                )
+            # Ensure there are no conflicts in the inside
+            # socket: internal_ip_address + internal_port
+            if (obj.internal_port_id == pf_obj.internal_port_id and
+                    obj.internal_ip_address == pf_obj.internal_ip_address and
+                    obj.internal_port == pf_obj.internal_port):
+                raise exceptions.RouterGatewayPortForwardingAlreadyExists(
+                    conflict={
+                        'router_id': pf_obj.router_id,
+                        'protocol': pf_obj.protocol,
+                        'internal_port_id': obj.internal_port_id,
+                        'internal_ip_address': str(obj.internal_ip_address),
+                        'internal_port': obj.internal_port
+                    }
+                )
+
+    @staticmethod
+    def _find_existing_rg_port_forwarding(context: Context,
+                                          router_id: str,
+                                          port_forwarding: Dict,
+                                          specify_params: Optional[List] = None):
+        """Find the stored entry that collides with ``port_forwarding``.
+
+        :returns: ``(object, params)`` for the first parameter set that
+            matches an entry, or ``(None, None)`` when nothing matches, so
+            callers can always unpack the result.
+        """
+        # Because the session had been flushed by NeutronDbObjectDuplicateEntry
+        # so if we want to use the context to get another db queries, we need
+        # to rollback first.
+        context.session.rollback()
+        if not specify_params:
+            specify_params = [
+                {
+                    'router_id': router_id,
+                    'external_port': port_forwarding['external_port'],
+                    'protocol': port_forwarding['protocol']
+                },
+                {
+                    'internal_port_id': port_forwarding['internal_port_id'],
+                    'internal_ip_address': port_forwarding[
+                        'internal_ip_address'],
+                    'internal_port': port_forwarding['internal_port'],
+                    'protocol': port_forwarding['protocol']
+                }]
+        for param in specify_params:
+            objs = RGPortForwarding.get_objects(context, **param)
+            if objs:
+                return objs[0], param
+        return None, None
+
+    @db_base_plugin_common.make_result_with_fields
+    @db_base_plugin_common.convert_result_to_dict
+    def get_router_gateway_port_forwardings(self, context: Context,
+                                            router_id: str,
+                                            filters: Optional[Dict] = None,
+                                            fields: Optional[List[str]] = None,
+                                            sorts: Optional[List[str]] = None,
+                                            limit: Optional[int] = None,
+                                            marker: Optional[str] = None,
+                                            page_reverse: bool = False):
+        """List the port forwardings of a router.
+
+        ``filters`` is a mapping passed as keyword filters to the object
+        query.  The router's gateway IP address is set on every returned
+        object.
+        """
+        _, gw_ip_address = self._check_router(context, router_id)
+        filters = filters or {}
+        self._validate_filter_for_port_forwarding(filters)
+        pager = Pager(sorts, limit, page_reverse, marker)
+        port_forwardings = RGPortForwarding.get_objects(
+            context, _pager=pager, router_id=router_id, **filters)
+        for pf in port_forwardings:
+            pf.gw_ip_address = gw_ip_address
+        return port_forwardings
+
+    @db_base_plugin_common.convert_result_to_dict
+    def create_router_gateway_port_forwarding(self, context: Context,
+                                              router_id: str,
+                                              gateway_port_forwarding: dict):
+        """Create a port forwarding on the router and push a CREATED event.
+
+        :raises RouterGatewayPortForwardingAlreadyExists: the new entry
+            conflicts with an existing one.
+        """
+        port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME)
+        self._check_port_forwarding_create(context, router_id, port_forwarding)
+        with CONTEXT_WRITER.using(context):
+            pf_obj = RGPortForwarding(context, **port_forwarding)
+            self._check_port_forwarding(context, pf_obj)
+            try:
+                pf_obj.create()
+            except NeutronDbObjectDuplicateEntry:
+                _, conflict = self._find_existing_rg_port_forwarding(
+                    context, router_id, port_forwarding)
+                raise exceptions.RouterGatewayPortForwardingAlreadyExists(
+                    conflict=conflict
+                )
+        self.push_api.push(context, [pf_obj], events.CREATED)
+        return pf_obj
+
+    @db_base_plugin_common.convert_result_to_dict
+    def update_router_gateway_port_forwarding(self, context: Context, id: str,
+                                              router_id: str,
+                                              gateway_port_forwarding: dict):
+        """Update a port forwarding and push an UPDATED event.
+
+        :raises RouterGatewayPortForwardingNotFound: no entry has ``id``.
+        :raises PortHasBindingFloatingIP: the new internal port has a
+            floating IP.
+        :raises RouterGatewayPortForwardingAlreadyExists: the updated entry
+            conflicts with an existing one.
+        """
+        router = self._get_router(context, router_id)
+        gw_ip_address = self._get_router_gateway(context, router)
+        pf_obj = RGPortForwarding.get_object(context, id=id)
+        if not pf_obj:
+            raise exceptions.RouterGatewayPortForwardingNotFound(id=id)
+
+        port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME, {})
+        port_forwarding[apidef.GW_IP_ADDRESS] = gw_ip_address
+        new_port_id = port_forwarding.get(apidef.INTERNAL_PORT_ID)
+        new_internal_ip = port_forwarding.get(apidef.INTERNAL_IP_ADDRESS, None)
+
+        if new_port_id and new_port_id != pf_obj.internal_port_id:
+            self._check_port_has_binding_floating_ip(context,
+                                                     new_port_id,
+                                                     new_internal_ip)
+
+        if any([new_internal_ip, new_port_id]):
+            # When only one of the two is given, complete the pair from the
+            # stored object.  The port fallback must be the stored port
+            # *ID*, not the L4 port number.
+            port_forwarding.update({
+                apidef.INTERNAL_IP_ADDRESS: (
+                    new_internal_ip or str(pf_obj.internal_ip_address)),
+                apidef.INTERNAL_PORT_ID: (
+                    new_port_id or pf_obj.internal_port_id),
+            })
+
+        with CONTEXT_WRITER.using(context):
+            pf_obj.update_fields(port_forwarding, reset_changes=True)
+            self._check_port_forwarding(context, pf_obj)
+            try:
+                pf_obj.update()
+            except NeutronDbObjectDuplicateEntry:
+                _, conflict = self._find_existing_rg_port_forwarding(
+                    context, router_id, port_forwarding)
+                raise exceptions.RouterGatewayPortForwardingAlreadyExists(
+                    conflict=conflict
+                )
+        self.push_api.push(context, [pf_obj], events.UPDATED)
+        return pf_obj
+
+    @db_base_plugin_common.make_result_with_fields
+    @db_base_plugin_common.convert_result_to_dict
+    def get_router_gateway_port_forwarding(self, context: Context, id: str,
+                                           router_id: str,
+                                           fields: Optional[List[str]] = None):
+        """Return one port forwarding with the gateway IP address set.
+
+        :raises RouterGatewayPortForwardingNotFound: no entry has ``id``.
+        """
+        _, gw_ip_address = self._check_router(context, router_id)
+        pf_obj = RGPortForwarding.get_object(context, id=id)
+        if not pf_obj:
+            raise exceptions.RouterGatewayPortForwardingNotFound(id=id)
+        setattr(pf_obj, apidef.GW_IP_ADDRESS, gw_ip_address)
+        return pf_obj
+
+    def delete_router_gateway_port_forwarding(self, context: Context, id: str,
+                                              router_id: str):
+        """Delete a port forwarding and push a DELETED event.
+
+        :raises RouterGatewayPortForwardingNotFound: no entry has ``id``.
+        """
+        pf_obj = RGPortForwarding.get_object(context, id=id)
+        if not pf_obj:
+            raise exceptions.RouterGatewayPortForwardingNotFound(id=id)
+        with CONTEXT_WRITER.using(context):
+            pf_obj.delete()
+        self.push_api.push(context, [pf_obj], events.DELETED)
+
+    @registry.receives(resources.ROUTER, [lib_events.BEFORE_DELETE])
+    def _receive_router_before_delete(self, resource: str, event: str,
+                                      trigger: L3RouterPlugin,
+                                      payload: DBEventPayload):
+        """Refuse to delete a router that still has port forwardings.
+
+        :raises DeletedRouterWithRGForwarding: entries exist for the router.
+        """
+        router_id = payload.resource_id
+        context = payload.context
+        port_forwardings = RGPortForwarding.get_objects(context,
+                                                        router_id=router_id)
+        if port_forwardings:
+            ex = exceptions.DeletedRouterWithRGForwarding(router_id=router_id)
+            LOG.info(ex.msg)
+            raise ex
+
+    @registry.receives(resources.ROUTER_GATEWAY, [lib_events.BEFORE_DELETE,
+                                                  lib_events.BEFORE_UPDATE])
+    def _receive_router_gateway_before_delete(self, resource: str, event: str,
+                                              trigger: L3RouterPlugin,
+                                              payload: DBEventPayload):
+        """Refuse to delete or update a gateway that has port forwardings.
+
+        :raises DeletedRouterGatewayWithRGForwarding: entries exist for the
+            router.
+        """
+        router_id = payload.resource_id
+        context = payload.context
+        port_forwardings = RGPortForwarding.get_objects(context,
+                                                        router_id=router_id)
+        if port_forwardings:
+            ex = exceptions.DeletedRouterGatewayWithRGForwarding(
+                router_id=router_id)
+            LOG.info(ex.msg)
+            raise ex