SDN Complete Guide | Software-Defined Networking Architecture & Implementation
Master Software-Defined Networking (SDN) with Cisco ACI, VMware NSX, and open source solutions. Complete guide to SDN controllers, APIs, and network automation.

When Traditional Networks Can't Keep Pace with Digital Transformation
Your development team needs to deploy a new microservices application. They require:
- š Instant network provisioning in minutes, not days
- š Dynamic security policies that follow workloads
- š Real-time visibility into application performance
- š Multi-cloud consistency across environments
But your traditional network responds with:
- ā Manual CLI configurations taking days to implement
- ā Static security policies that can't follow virtual machines
- ā Limited visibility into application dependencies
- ā Inconsistent policies across different environments
The result? Development delays, security gaps, and operational complexity that stifle innovation.
Welcome to Software-Defined Networking (SDN) - where networks become as agile as the applications they support.
SDN Architecture: Separating Control and Data Planes
Traditional vs SDN Architecture:
Traditional Network:
[ Control Plane ] ā Embedded in each device
[ Data Plane ] ā Forwarding decisions per device
SDN Architecture:
[ SDN Controller ] ā Centralized intelligence
[ Southbound API ] ā Communication protocol (OpenFlow)
[ Network Devices ] ā Simplified data plane only
SDN Components Overview:
| Component | Traditional Network | SDN | Benefits |
|---|---|---|---|
| Control Plane | Distributed per device | Centralized controller | Consistent policies |
| Management | CLI per device | API-driven automation | Faster provisioning |
| Visibility | Limited per device | Network-wide view | Better troubleshooting |
| Innovation | Vendor-dependent | Open APIs | Faster feature development |
Cisco Application Centric Infrastructure (ACI)
ACI Architecture Components:
#!/usr/bin/env python3
"""
Cisco ACI Configuration with Python
š“ Red - Tenant and Application Profile configuration
š Orange - Endpoint Group (EPG) settings
š” Yellow - Contract and policy definitions
š¢ Green - Physical domain configurations
šµ Blue - Monitoring and analytics
"""
from cobra.mit.access import MoDirectory
from cobra.mit.session import LoginSession
from cobra.model.pol import Uni
from cobra.model.fv import Tenant, Ctx, BD, Ap, AEPg
from cobra.model.vz import BrCP, Subj, RsSubjFiltAtt
import json
class CiscoACIManager:
def __init__(self, apic_ip, username, password):
self.apic_ip = apic_ip
self.username = username
self.password = password
self.session = None
self.md = None
def connect(self):
"""š“ Connect to APIC controller"""
print("š“ Connecting to Cisco APIC...")
try:
self.session = LoginSession(f'https://{self.apic_ip}', self.username, self.password)
self.md = MoDirectory(self.session)
self.md.login()
print("ā
Successfully connected to APIC")
return True
except Exception as e:
print(f"ā Connection failed: {str(e)}")
return False
def create_tenant(self, tenant_name, description=""):
"""š“ Create ACI Tenant"""
print(f"š“ Creating tenant: {tenant_name}")
# Get the top-level policy universe
polUni = Uni('')
# Create tenant
tenant = Tenant(polUni, tenant_name, descr=description)
# Commit configuration
self.md.commit(tenant)
print(f"ā
Tenant {tenant_name} created successfully")
return tenant
def create_application_profile(self, tenant, app_profile_name):
"""š Create Application Profile"""
print(f"š Creating application profile: {app_profile_name}")
app_profile = Ap(tenant, app_profile_name)
self.md.commit(app_profile)
print(f"ā
Application Profile {app_profile_name} created")
return app_profile
def create_endpoint_group(self, app_profile, epg_name, bd_name):
"""š Create Endpoint Group (EPG)"""
print(f"š Creating EPG: {epg_name}")
epg = AEPg(app_profile, epg_name)
# Associate with Bridge Domain
from cobra.model.fv import RsBd
RsBd(epg, tnFvBDName=bd_name)
self.md.commit(epg)
print(f"ā
EPG {epg_name} created and associated with BD {bd_name}")
return epg
def create_contract(self, tenant, contract_name, subjects):
"""š” Create Contract for communication policies"""
print(f"š” Creating contract: {contract_name}")
contract = BrCP(tenant, contract_name)
for subject_name, filters in subjects.items():
subject = Subj(contract, subject_name)
for filter_name in filters:
RsSubjFiltAtt(subject, tnVzFilterName=filter_name)
self.md.commit(contract)
print(f"ā
Contract {contract_name} created with {len(subjects)} subjects")
return contract
# Example usage
def main():
# Initialize ACI Manager
aci_mgr = CiscoACIManager('10.1.100.100', 'admin', 'SecurePass123!')
if aci_mgr.connect():
# Create tenant for application
tenant = aci_mgr.create_tenant('WebApp-Tenant', 'Three-tier web application')
# Create application profile
app_profile = aci_mgr.create_application_profile(tenant, 'Ecommerce-App')
# Create EPGs for each tier
web_epg = aci_mgr.create_endpoint_group(app_profile, 'Web-Tier', 'Web-BD')
app_epg = aci_mgr.create_endpoint_group(app_profile, 'App-Tier', 'App-BD')
db_epg = aci_mgr.create_endpoint_group(app_profile, 'DB-Tier', 'DB-BD')
# Define communication contracts
contracts = {
'Web-to-App': ['http', 'https'],
'App-to-DB': ['sql-server']
}
aci_mgr.create_contract(tenant, 'App-Communication', contracts)
print("\nšÆ ACI Application-Centric Configuration Complete!")
if __name__ == "__main__":
main()
VMware NSX-T Data Center Implementation
NSX-T Architecture and Configuration:
# š£ NSX-T Configuration Overview
NSX_Manager:
š“ Management Plane: NSX Manager
š Control Plane: Control Cluster
š” Data Plane: Transport Nodes
Networking:
š¢ Logical Switches: Overlay networks
šµ Logical Routers: Distributed routing
š£ Transport Zones: VLAN/VXLAN
Security:
š¤ Distributed Firewall: Micro-segmentation
š Service Insertion: Advanced services
š“ Gateway Firewall: North-South protection
NSX-T Python Automation:
#!/usr/bin/env python3
"""
VMware NSX-T Automation with Python
š“ Red - NSX Manager operations
š Orange - Logical switching configuration
š” Yellow - Security policy automation
š¢ Green - Monitoring and troubleshooting
šµ Blue - Integration with vSphere
"""
import requests
import json
import urllib3
# Disable SSL warnings for lab environments
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class NSXTManager:
def __init__(self, nsx_manager, username, password):
self.nsx_manager = nsx_manager
self.base_url = f"https://{nsx_manager}/api/v1"
self.session = requests.Session()
self.session.auth = (username, password)
self.session.verify = False # In production, use proper certificates
self.headers = {'Content-Type': 'application/json'}
def create_logical_switch(self, switch_name, transport_zone_id, vlan_id=None):
"""š Create logical switch in NSX-T"""
print(f"š Creating logical switch: {switch_name}")
logical_switch_config = {
"display_name": switch_name,
"transport_zone_id": transport_zone_id,
"admin_state": "UP",
"replication_mode": "MTEP"
}
if vlan_id:
logical_switch_config["vlan"] = vlan_id
url = f"{self.base_url}/logical-switches"
response = self.session.post(
url,
data=json.dumps(logical_switch_config),
headers=self.headers
)
if response.status_code == 201:
print(f"ā
Logical switch {switch_name} created successfully")
return response.json()
else:
print(f"ā Failed to create logical switch: {response.text}")
return None
def create_distributed_firewall_rule(self, rule_name, source_groups, destination_groups, services, action="ALLOW"):
"""š” Create distributed firewall rule"""
print(f"š” Creating DFW rule: {rule_name}")
firewall_rule = {
"display_name": rule_name,
"source_groups": source_groups,
"destination_groups": destination_groups,
"services": services,
"action": action,
"logged": False,
"disabled": False
}
url = f"{self.base_url}/firewall/rules"
response = self.session.post(
url,
data=json.dumps(firewall_rule),
headers=self.headers
)
if response.status_code == 201:
print(f"ā
DFW rule {rule_name} created successfully")
return response.json()
else:
print(f"ā Failed to create DFW rule: {response.text}")
return None
def enable_microsegmentation(self, security_group_name, inclusion_criteria):
"""š¢ Configure micro-segmentation security groups"""
print(f"š¢ Configuring micro-segmentation: {security_group_name}")
security_group = {
"display_name": security_group_name,
"expression": inclusion_criteria
}
url = f"{self.base_url}/ns-groups"
response = self.session.post(
url,
data=json.dumps(security_group),
headers=self.headers
)
if response.status_code == 201:
print(f"ā
Security group {security_group_name} created")
return response.json()
else:
print(f"ā Failed to create security group: {response.text}")
return None
# Example usage
def nsxt_automation_example():
"""šµ Complete NSX-T automation example"""
nsxt = NSXTManager('nsxt-manager.company.com', 'admin', 'SecurePass123!')
# Create logical switches for application tiers
transport_zone_id = "tz-12345678-1234-1234-1234-123456789012"
logical_switches = [
{"name": "Web-Tier-LS", "vlan": 100},
{"name": "App-Tier-LS", "vlan": 200},
{"name": "DB-Tier-LS", "vlan": 300}
]
created_switches = {}
for switch in logical_switches:
result = nsxt.create_logical_switch(
switch["name"],
transport_zone_id,
switch["vlan"]
)
if result:
created_switches[switch["name"]] = result["id"]
# Configure micro-segmentation
security_groups = {
"Web-Servers": ["VM.name contains 'web'"],
"App-Servers": ["VM.name contains 'app'"],
"Database-Servers": ["VM.name contains 'db'"]
}
for sg_name, criteria in security_groups.items():
nsxt.enable_microsegmentation(sg_name, criteria)
# Create firewall rules
firewall_rules = [
{
"name": "Allow-Web-to-App",
"sources": ["Web-Servers"],
"destinations": ["App-Servers"],
"services": ["/infra/services/HTTP", "/infra/services/HTTPS"]
},
{
"name": "Allow-App-to-DB",
"sources": ["App-Servers"],
"destinations": ["Database-Servers"],
"services": ["/infra/services/MS-SQL"]
}
]
for rule in firewall_rules:
nsxt.create_distributed_firewall_rule(
rule["name"],
rule["sources"],
rule["destinations"],
rule["services"]
)
print("\nšÆ NSX-T Micro-segmentation Configuration Complete!")
if __name__ == "__main__":
nsxt_automation_example()
Open Source SDN with OpenDaylight and Open vSwitch
OpenDaylight Controller Configuration:
#!/usr/bin/env python3
"""
OpenDaylight SDN Controller Automation
š“ Red - OpenDaylight REST API operations
š Orange - Flow programming with OpenFlow
š” Yellow - Network topology discovery
š¢ Green - Switch configuration
šµ Blue - Monitoring and statistics
"""
import requests
import json
from datetime import datetime
class OpenDaylightManager:
def __init__(self, controller_ip, username, password):
self.controller_ip = controller_ip
self.base_url = f"http://{controller_ip}:8181/restconf"
self.auth = (username, password)
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
def get_network_topology(self):
"""š” Get current network topology"""
print("š” Retrieving network topology...")
url = f"{self.base_url}/operational/network-topology:network-topology"
response = requests.get(url, auth=self.auth, headers=self.headers)
if response.status_code == 200:
topology = response.json()
print("ā
Network topology retrieved successfully")
return topology
else:
print(f"ā Failed to get topology: {response.text}")
return None
def get_switches(self):
"""š¢ Get list of connected switches"""
print("š¢ Retrieving connected switches...")
url = f"{self.base_url}/operational/opendaylight-inventory:nodes"
response = requests.get(url, auth=self.auth, headers=self.headers)
if response.status_code == 200:
nodes = response.json()
switches = []
for node in nodes.get('nodes', {}).get('node', []):
if 'openflow' in node['id']:
switches.append({
'id': node['id'],
'description': node.get('flow-node-inventory:description', 'N/A')
})
print(f"ā
Found {len(switches)} OpenFlow switches")
return switches
else:
print(f"ā Failed to get switches: {response.text}")
return None
def install_flow(self, switch_id, flow_config):
"""š Install flow on OpenFlow switch"""
print(f"š Installing flow on switch {switch_id}...")
url = f"{self.base_url}/config/opendaylight-inventory:nodes/node/{switch_id}/flow-node-inventory:table/0"
response = requests.post(
url,
data=json.dumps(flow_config),
auth=self.auth,
headers=self.headers
)
if response.status_code in [200, 201, 204]:
print("ā
Flow installed successfully")
return True
else:
print(f"ā Failed to install flow: {response.text}")
return False
def create_qos_flow(self, switch_id, priority, match, actions, queue_id):
"""šµ Create QoS flow for traffic prioritization"""
print(f"šµ Creating QoS flow on switch {switch_id}...")
flow_config = {
"flow-node-inventory:flow": [
{
"id": f"qos-flow-{priority}",
"priority": priority,
"match": match,
"instructions": {
"instruction": [
{
"order": 0,
"apply-actions": {
"action": actions
}
},
{
"order": 1,
"go-to-table": {
"table_id": 1
}
}
]
}
}
]
}
return self.install_flow(switch_id, flow_config)
# Example usage
def openflow_automation():
"""š“ Complete OpenFlow automation example"""
odl = OpenDaylightManager('10.1.100.50', 'admin', 'admin')
# Discover network topology
topology = odl.get_network_topology()
# Get connected switches
switches = odl.get_switches()
if switches:
for switch in switches[:2]: # Configure first two switches
switch_id = switch['id']
# Create voice traffic QoS flow
voice_flow_match = {
"ethernet-match": {
"ethernet-type": {"type": 2048}
},
"ip-match": {"ip-protocol": 17},
"udp-destination-port": 16384
}
voice_flow_actions = [
{
"order": 0,
"set-queue-action": {"queue-id": 4}
}
]
odl.create_qos_flow(
switch_id,
priority=1000,
match=voice_flow_match,
actions=voice_flow_actions,
queue_id=4
)
# Create video traffic QoS flow
video_flow_match = {
"ethernet-match": {
"ethernet-type": {"type": 2048}
},
"ip-match": {"ip-protocol": 17},
"udp-destination-port": 16385
}
video_flow_actions = [
{
"order": 0,
"set-queue-action": {"queue-id": 3}
}
]
odl.create_qos_flow(
switch_id,
priority=900,
match=video_flow_match,
actions=video_flow_actions,
queue_id=3
)
print("\nšÆ OpenFlow QoS Configuration Complete!")
if __name__ == "__main__":
openflow_automation()
SD-WAN: Software-Defined Wide Area Networking
SD-WAN Architecture and Benefits:
#!/usr/bin/env python3
"""
SD-WAN Configuration and Monitoring
š“ Red - Cisco SD-WAN (Viptela) configuration
š Orange - VMware SD-WAN (Velocloud) setup
š” Yellow - Path selection and quality monitoring
š¢ Green - Security policy application
šµ Blue - Application-aware routing
"""
import requests
import json
import time
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class SDWANConnection:
interface: str
type: str # MPLS, Internet, LTE, etc.
cost: int
quality: float
status: str
class SDWANManager:
def __init__(self, vmanage_ip, username, password):
self.vmanage_ip = vmanage_ip
self.base_url = f"https://{vmanage_ip}/dataservice"
self.session = requests.Session()
self.session.verify = False
self.login(username, password)
def login(self, username, password):
"""š“ Authenticate with vManage"""
print("š“ Authenticating with SD-WAN controller...")
login_data = {
"j_username": username,
"j_password": password
}
response = self.session.post(
f"https://{self.vmanage_ip}/j_security_check",
data=login_data
)
if response.status_code == 200:
print("ā
Successfully authenticated with vManage")
else:
print(f"ā Authentication failed: {response.status_code}")
def get_device_list(self):
"""š¢ Get list of SD-WAN devices"""
print("š¢ Retrieving SD-WAN devices...")
url = f"{self.base_url}/device"
response = self.session.get(url)
if response.status_code == 200:
devices = response.json()['data']
print(f"ā
Found {len(devices)} SD-WAN devices")
return devices
else:
print(f"ā Failed to get devices: {response.text}")
return []
def get_connection_quality(self, device_id):
"""š” Get connection quality metrics"""
print(f"š” Checking connection quality for device {device_id}...")
url = f"{self.base_url}/device/bfd/sites?deviceId={device_id}"
response = self.session.get(url)
if response.status_code == 200:
quality_data = response.json()['data']
connections = []
for connection in quality_data:
connections.append(SDWANConnection(
interface=connection.get('local-color', 'Unknown'),
type=connection.get('color', 'Unknown'),
cost=connection.get('cost', 0),
quality=connection.get('latency', 999),
status=connection.get('state', 'down')
))
return connections
else:
print(f"ā Failed to get connection quality: {response.text}")
return []
def create_app_route_policy(self, policy_name, applications, preferred_path):
"""šµ Create application-aware routing policy"""
print(f"šµ Creating app-route policy: {policy_name}")
policy_config = {
"name": policy_name,
"sequences": [
{
"sequenceName": "Priority-Apps",
"sequenceType": "appRoute",
"match": {
"applicationList": applications
},
"actions": [
{
"type": "backupSlaPreferredColor",
"preferredColor": preferred_path
}
]
}
]
}
url = f"{self.base_url}/template/policy/approute"
response = self.session.post(url, json=policy_config)
if response.status_code == 200:
print(f"ā
App-route policy {policy_name} created")
return response.json()
else:
print(f"ā Failed to create policy: {response.text}")
return None
def monitor_application_performance(self, application_name, duration_minutes=5):
"""š Monitor application performance across paths"""
print(f"š Monitoring {application_name} performance...")
end_time = int(time.time() * 1000)
start_time = end_time - (duration_minutes * 60 * 1000)
url = f"{self.base_url}/statistics/approute"
params = {
"app": application_name,
"startTime": start_time,
"endTime": end_time
}
response = self.session.get(url, params=params)
if response.status_code == 200:
stats = response.json()['data']
print(f"š Application Performance for {application_name}:")
for stat in stats:
path = stat.get('local-color', 'Unknown')
latency = stat.get('avg-latency', 'N/A')
jitter = stat.get('avg-jitter', 'N/A')
loss = stat.get('loss-percentage', 'N/A')
print(f" {path}: Latency={latency}ms, Jitter={jitter}ms, Loss={loss}%")
return stats
else:
print(f"ā Failed to get performance stats: {response.text}")
return []
# Example usage
def sdwan_automation():
"""š“ Complete SD-WAN automation example"""
sdwan = SDWANManager('vmanage.company.com', 'admin', 'SecurePass123!')
# Get device inventory
devices = sdwan.get_device_list()
# Monitor connection quality for each device
for device in devices[:3]: # First 3 devices
device_id = device['deviceId']
device_name = device['host-name']
print(f"\nš” Checking {device_name}...")
connections = sdwan.get_connection_quality(device_id)
for conn in connections:
status_icon = "š¢" if conn.status == "up" else "š“"
print(f" {status_icon} {conn.interface} ({conn.type}): "
f"Cost={conn.cost}, Quality={conn.quality}ms")
# Create application-aware routing policy
critical_apps = ["webex-teams", "office365", "salesforce"]
sdwan.create_app_route_policy(
"Business-Critical-Apps",
critical_apps,
"biz-internet"
)
# Monitor application performance
sdwan.monitor_application_performance("webex-teams")
print("\nšÆ SD-WAN Automation Complete!")
if __name__ == "__main__":
sdwan_automation()
SDN Security and Micro-Segmentation
Zero Trust Security Implementation:
#!/usr/bin/env python3
"""
SDN Security and Zero Trust Implementation
š“ Red - Security policy enforcement
š Orange - Threat detection and response
š” Yellow - Micro-segmentation configuration
š¢ Green - Compliance monitoring
šµ Blue - API security
"""
class SDNSecurityManager:
def __init__(self):
self.security_policies = {}
self.threat_feeds = []
self.compliance_rules = {}
def create_microsegmentation_policy(self, policy_name, source_group, dest_group, allowed_services):
"""š” Create micro-segmentation security policy"""
print(f"š” Creating micro-segmentation policy: {policy_name}")
policy = {
'name': policy_name,
'source_group': source_group,
'destination_group': dest_group,
'allowed_services': allowed_services,
'action': 'ALLOW',
'logging': True,
'enabled': True
}
self.security_policies[policy_name] = policy
print(f"ā
Micro-segmentation policy '{policy_name}' created")
return policy
def enable_threat_intelligence(self, threat_feed_urls):
"""š Enable threat intelligence feeds"""
print("š Configuring threat intelligence feeds...")
for feed_url in threat_feed_urls:
self.threat_feeds.append({
'url': feed_url,
'last_update': None,
'enabled': True
})
print(f"ā
Added threat feed: {feed_url}")
def implement_zero_trust_rules(self):
"""š“ Implement Zero Trust security rules"""
print("š“ Implementing Zero Trust security rules...")
zero_trust_rules = [
{
'name': 'ZT-Deny-By-Default',
'description': 'Default deny all traffic',
'action': 'DENY',
'priority': 65535
},
{
'name': 'ZT-Require-Authentication',
'description': 'Require authentication for all east-west traffic',
'action': 'AUTHENTICATE',
'priority': 1
},
{
'name': 'ZT-Log-All-Flows',
'description': 'Log all network flows for audit',
'action': 'ALLOW_LOG',
'priority': 1000
}
]
for rule in zero_trust_rules:
self.security_policies[rule['name']] = rule
print(f"ā
Zero Trust rule '{rule['name']}' implemented")
def monitor_compliance(self, compliance_standard):
"""š¢ Monitor security compliance"""
print(f"š¢ Monitoring compliance with {compliance_standard}...")
compliance_checks = {
'NIST': [
'Check for default credentials',
'Verify encryption in transit',
'Validate access controls',
'Audit logging enabled'
],
'PCI-DSS': [
'Network segmentation verified',
'Cardholder data encrypted',
'Access logs maintained',
'Security monitoring active'
],
'HIPAA': [
'PHI data protected',
'Access controls enforced',
'Audit trails maintained',
'Security incidents monitored'
]
}
checks = compliance_checks.get(compliance_standard, [])
results = {}
for check in checks:
# Simulate compliance check
results[check] = 'PASS' # In real implementation, actually perform checks
print(f"ā
Compliance monitoring for {compliance_standard} completed")
return results
# Example usage
def sdn_security_demo():
"""šµ SDN Security Implementation Demo"""
security_mgr = SDNSecurityManager()
print("š”ļø SDN Security Implementation")
print("=" * 50)
# Implement Zero Trust
security_mgr.implement_zero_trust_rules()
# Create micro-segmentation policies
app_segments = [
{'source': 'Web-Servers', 'dest': 'App-Servers', 'services': ['HTTP', 'HTTPS']},
{'source': 'App-Servers', 'dest': 'Database-Servers', 'services': ['SQL']},
{'source': 'Admin-Stations', 'dest': 'All-Servers', 'services': ['SSH', 'RDP']}
]
for segment in app_segments:
policy_name = f"Allow-{segment['source']}-to-{segment['dest']}"
security_mgr.create_microsegmentation_policy(
policy_name,
segment['source'],
segment['dest'],
segment['services']
)
# Enable threat intelligence
threat_feeds = [
'https://feeds.company.com/malware-ips',
'https://feeds.company.com/suspicious-domains',
'https://feeds.company.com/known-attackers'
]
security_mgr.enable_threat_intelligence(threat_feeds)
# Monitor compliance
compliance_results = security_mgr.monitor_compliance('NIST')
print(f"\nš Security Summary:")
print(f" Policies: {len(security_mgr.security_policies)}")
print(f" Threat Feeds: {len(security_mgr.threat_feeds)}")
print(f" Compliance Checks: {len(compliance_results)}")
print("\nšÆ SDN Security Implementation Complete!")
if __name__ == "__main__":
sdn_security_demo()
Ready to Transform Your Network with SDN?
Software-Defined Networking represents the biggest shift in networking since the advent of Ethernet. By abstracting network control from physical hardware, SDN enables unprecedented agility, security, and operational efficiency.
Don't let legacy networking hold back your digital transformation. Embrace SDN and build networks that move at the speed of business.
š¢ Follow for more SDN and network automation insights: LinkedIn Page WhatsApp Channel
Need help with your SDN journey? Contact us for SDN strategy, design, and implementation services!
#SDN #SoftwareDefinedNetworking #CiscoACI #VMwareNSX #OpenFlow #NetworkAutomation #ZeroTrust


