Commit 8946234b authored by Nikolai Kristiansen's avatar Nikolai Kristiansen
Browse files

Merge branch 'bump-bonanza-and-python3'

parents bc6029e2 b7826993
Pipeline #4 skipped
......@@ -2,7 +2,7 @@
*.swp
temp
*.log
settings.py
dns_mdb.db
local_settings.py
db.sqlite3
/.idea
/venv
A Django app for managing computer equipment.
Generates DHCP configs and zone files.
Dependencies:
sudo apt-get install libmysqlclient-dev python-dev libldap2-dev libsasl2-dev bind9utils
Setup environment:
mdb$ virtualenv venv
mdb$ . venv/bin/activate
(venv)mdb$ pip install -r requirements.txt
Create a config file:
mdb$ cp mdb/settings-sample.py mdb/settings.py
# Edit settings.py with your favorite editor
Run the server:
(venv)mdb$ python manage.py runserver
A Django app for managing computer equipment.
## Installation
sudo apt install libmysqlclient-dev python-dev libldap2-dev libsasl2-dev bind9utils
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python manage.py migrate
python manage.py runserver
## Development tasks
Dump test data:
python manage.py dumpdata mdb --natural-foreign --indent=4 > mdb/fixtures/test_data.json
## Features
* Creates DHCPD configuration files
* Creates DNS zone files for BIND
* Creates PXE config files (with random secret for use with Puppet autosign)
* Partial IPv6 support
#!/usr/bin/env python
import os, sys
import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mdb.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mdbsite.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
default_app_config = 'mdb.apps.MdbAppConfig'
from mdb.models import *
from django.contrib import admin
from django.contrib import admin, messages
from mdb.models import Ip6Address, Interface, Ip4Address, DhcpOption, DhcpCustomField, DomainSrvRecord, DomainTxtRecord, \
DomainCnameRecord, DomainARecord, Domain, Host, Ip4Subnet, Ip6Subnet, Nameserver, MailExchange, OperatingSystem, \
HostType, DhcpConfig
class Ip6AddressInline(admin.TabularInline):
......@@ -20,17 +23,17 @@ class HostAdmin(admin.ModelAdmin):
ordering = ('hostname',)
inlines = [InterfaceInline]
list_display = ['hostname', 'owner', 'host_type', 'location', 'mac_addresses', 'ip_addresses', 'in_domain',
'ipv6_enabled']
list_filter = ['host_type', 'owner', 'location']
'pxe_installable', 'ipv6_enabled']
list_filter = ['host_type', 'owner', 'location', 'pxe_installable']
readonly_fields = ['kerberos_principal_name', 'kerberos_principal_created_date',
'kerberos_principal_created']
search_fields = ['hostname', 'location', 'interface__macaddr', 'interface__ip4address__address']
fieldsets = (
('Owner Information', {
'fields': ('owner', 'location', 'description' )
'fields': ('owner', 'location', 'description')
}),
('Hardware and Software Information', {
'fields': (('brand', 'model', 'serial_number'), ('hostname', 'host_type'), ('operating_system', 'virtual'))
'fields': (('hostname', 'host_type'), ('pxe_key', 'pxe_installable'), ('brand', 'model'), 'serial_number', ('operating_system', 'virtual'))
}),
# FIXME: Not in use
# ('Domain and Kerberos Information', {
......@@ -40,6 +43,26 @@ class HostAdmin(admin.ModelAdmin):
# ('kerberos_principal_name', 'kerberos_principal_created_date'))
# }),
)
actions = ['set_installable']
def _get_host_warning_message(self, host, ifs):
ifs_str = ', '.join([_if.name for _if in ifs])
return 'Host {} might not be installable since interface(s) {} has no IP set.'.format(host, ifs_str)
def set_installable(self, request, queryset):
invalid_ifs_per_host = []
for obj in queryset:
invalid_ifs = obj.interface_set.filter(ip4address__isnull=True)
if invalid_ifs.exists():
invalid_ifs_per_host.append(self._get_host_warning_message(obj, invalid_ifs))
obj.pxe_installable = True
obj.save() # triggers post save signal
if invalid_ifs_per_host:
self.message_user(request, message='\n'.join(invalid_ifs_per_host), level=messages.WARNING)
set_installable.short_description = "Mark selected hosts as PXE installable"
def get_queryset(self, request):
return super(HostAdmin, self).get_queryset(request).select_related()
......
from mdb.models import Host
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
class HostPXEValidateSerializer(serializers.Serializer):
certname = serializers.CharField(required=True)
pxe_key = serializers.CharField(required=True)
host = None
def _get_host_by_fqdn(self, attrs):
fqdn = attrs['certname']
host_name = fqdn.split('.')[0]
domain = '.'.join(fqdn.split('.')[1:])
try:
return Host.objects.get(hostname=host_name, interface__domain__domain_name=domain)
except (Host.DoesNotExist, Host.MultipleObjectsReturned):
return None
def validate(self, attrs):
# custom validation
host = self._get_host_by_fqdn(attrs)
if host is None:
raise ValidationError('Host \'{}\' not found.'.format(attrs['certname']))
if not host.pxe_installable:
raise ValidationError('Host \'{}\' is marked as not installable via PXE.'.format(attrs['certname']))
if not host.pxe_key:
raise ValidationError('Host \'{}\' has no pxe_key.'.format(attrs['certname']))
if host.pxe_key != attrs['pxe_key']:
raise ValidationError('Supplied pxe_key does not match host \'{}\'.'.format(attrs['certname']))
self.host = host
return attrs
def create(self, validated_data):
# Make requests kind of idempotent by only allowing 1 request / installation
self.host.pxe_installable = False
self.host.save()
return self.host
from rest_framework.authentication import TokenAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from mdb.api.serializers import HostPXEValidateSerializer
class HostPXEValidate(APIView):
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated]
def post(self, request):
serializer = HostPXEValidateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
host = serializer.save() # sets Host.pxe_installable=False
return Response({'valid_pxe_key': True})
from django.apps import AppConfig
class MdbAppConfig(AppConfig):
name = 'mdb'
verbose_name = 'Machine Database'
def ready(self):
# all models are loaded, now attach signals
import mdb.signals
\ No newline at end of file
This diff is collapsed.
......@@ -6,7 +6,11 @@ from difflib import context_diff
from django.core.management.base import BaseCommand
from django.core.mail import mail_admins
from optparse import make_option
from commands import getstatusoutput
try:
from subprocess import getstatusoutput
except ImportError:
from commands import getstatusoutput
from mdb.models import DhcpConfig
......@@ -42,13 +46,13 @@ class Command(BaseCommand):
# do we need to do anything?
if config.serial == config.active_serial and not force:
print "Nothing to do, serial is the same: %s" % config.serial
print("Nothing to do, serial is the same: %s" % config.serial)
sys.exit(0)
# write to stdout
if output == sys.stdout:
print config.dhcpd_configuration()
print 'Not writing to file, serial not updated'
print(config.dhcpd_configuration())
print('Not writing to file, serial not updated')
return
# read current file contents
......@@ -71,7 +75,7 @@ class Command(BaseCommand):
tofile=config.serial,
lineterm=''))
print diff
print(diff)
# update serial
old_serial = config.active_serial
......
......@@ -7,8 +7,12 @@ from difflib import context_diff
from django.core.management.base import BaseCommand
from django.core.mail import mail_admins
from optparse import make_option
from commands import getstatusoutput
from mdb.models import *
try:
from subprocess import getstatusoutput
except ImportError:
from commands import getstatusoutput
from mdb.models import Domain, Ip4Subnet, Ip6Subnet
class Command(BaseCommand):
......@@ -43,7 +47,7 @@ class Command(BaseCommand):
# do the binaries exist?
for f in (self.checkzone_bin, self.bind_bin):
if not os.path.isfile(f):
print "ERROR: no such file %s, exiting..." % f
print("ERROR: no such file %s, exiting..." % f)
if not debug:
sys.exit(1)
......@@ -106,11 +110,11 @@ class Command(BaseCommand):
'diff': None,
}
print "updating %s %s [%d -> %d]" % (
print("updating %s %s [%d -> %d]" % (
zonetype,
zone.domain_name,
zone.domain_active_serial,
zone.domain_serial)
zone.domain_serial))
# generate zone file contents
new = zone.zone_file_contents()
......
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
import django.core.validators
import re
class Migration(migrations.Migration):
dependencies = [
('mdb', '0005_auto_20150822_2204'),
]
operations = [
migrations.AlterModelOptions(
name='dhcpcustomfield',
options={'verbose_name': 'DHCP custom field'},
),
migrations.AlterField(
model_name='domain',
name='domain_admin',
field=models.EmailField(max_length=254),
),
migrations.AlterField(
model_name='domain',
name='domain_ipaddr',
field=models.GenericIPAddressField(protocol='IPv4'),
),
migrations.AlterField(
model_name='host',
name='hostname',
field=models.CharField(validators=[django.core.validators.RegexValidator(code='invalid', message='Enter a valid hostname', regex=re.compile('^(?!-)[-a-z0-9]+(?<!-)$', 34))], max_length=64),
),
migrations.AlterField(
model_name='interface',
name='ip4address',
field=models.OneToOneField(to='mdb.Ip4Address', null=True, blank=True),
),
migrations.AlterField(
model_name='interface',
name='macaddr',
field=models.CharField(validators=[django.core.validators.RegexValidator(code='invalid', message='Enter a valid MAC address', regex=re.compile('^([0-9A-F]{2}[:]){5}([0-9A-F]{2})$', 34))], max_length=17),
),
migrations.AlterField(
model_name='ip4address',
name='address',
field=models.GenericIPAddressField(protocol='IPv4'),
),
migrations.AlterField(
model_name='ip4subnet',
name='dhcp_dynamic_end',
field=models.GenericIPAddressField(protocol='IPv4', null=True, blank=True),
),
migrations.AlterField(
model_name='ip4subnet',
name='dhcp_dynamic_start',
field=models.GenericIPAddressField(protocol='IPv4', null=True, blank=True),
),
migrations.AlterField(
model_name='ip4subnet',
name='domain_admin',
field=models.EmailField(max_length=254),
),
migrations.AlterField(
model_name='ip4subnet',
name='netmask',
field=models.GenericIPAddressField(protocol='IPv4'),
),
migrations.AlterField(
model_name='ip4subnet',
name='network',
field=models.GenericIPAddressField(protocol='IPv4'),
),
migrations.AlterField(
model_name='ip6subnet',
name='domain_admin',
field=models.EmailField(max_length=254),
),
migrations.AlterField(
model_name='operatingsystem',
name='arch',
field=models.CharField(max_length=255, choices=[('mips', 'MIPS'), ('arm', 'ARM'), ('broadcom', 'Broadcom'), ('rc32300', 'RC32300'), ('powerpc', 'PowerPC'), ('powerpc403ga', 'PowerPC403GA'), ('unknown', 'Unknown'), ('x86_64', 'x86-64'), ('i386', 'x86')], null=True, blank=True),
),
]
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mdb', '0006_auto_20151001_2235'),
]
operations = [
migrations.AddField(
model_name='host',
name='pxe_installable',
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name='host',
name='pxe_key',
field=models.CharField(max_length=254, blank=True),
),
]
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mdb', '0007_auto_20151003_1313'),
]
operations = [
migrations.AlterField(
model_name='ip6address',
name='address',
field=models.GenericIPAddressField(protocol='IPv6'),
),
]
This diff is collapsed.
import logging
import os
import uuid
from django.conf import settings
from django.db.models.signals import pre_delete, post_save, pre_save
from django.dispatch import receiver
import ipaddress
from mdb.models import Interface, Host, Ip6Subnet, Ip4Subnet, Ip4Address
from mdb.utils import format_domain_serial_and_add_one
logger = logging.getLogger(__name__)
@receiver(post_save, sender=Ip4Subnet)
def create_ips_for_subnet(sender, instance, created, **kwargs):
if not created:
return
subnet = ipaddress.IPv4Network(instance.network + "/" + instance.netmask)
for addr in subnet.hosts():
address = Ip4Address(address=str(addr), subnet=instance)
address.save()
@receiver(pre_delete, sender=Ip4Subnet)
def delete_ips_for_subnet(sender, instance, **kwargs):
for addr in instance.ip4address_set.all():
addr.delete()
@receiver(pre_save, sender=Ip4Subnet)
def set_domain_name_for_subnet(sender, instance, **kwargs):
# we assume that the reverse domain_name does not change
if len(instance.domain_name) == 0:
ipspl = instance.network.split(".")
rev = "%s.%s.%s" % (ipspl[2], ipspl[1], ipspl[0])
instance.domain_name = "%s.in-addr.arpa" % rev
# update it's own serial
# if instance.domain_serial is not None:
# instance.domain_serial = format_domain_serial_and_add_one(instance.domain_serial)
# lets update the serial of the dhcp config
# when the subnet is changed
if instance.dhcp_config:
# instance.dhcp_config.serial = instance.dhcp_config.serial + 1
instance.dhcp_config.serial = format_domain_serial_and_add_one(instance.dhcp_config.serial)
instance.dhcp_config.save()
@receiver(pre_save, sender=Ip6Subnet)
def set_domain_name_for_ipv6_subnet(sender, instance, **kwargs):
if len(instance.domain_name) > 0:
return
network = ipaddress.IPv6Address("%s::" % instance.network)
instance.domain_name = ".".join(network.exploded.replace(":", "")[:16])[::-1] + ".ip6.arpa"
@receiver(post_save, sender=Interface)
def update_domain_serial_when_change_to_interface(sender, instance, created, **kwargs):
if instance.domain is not None:
domain = instance.domain
domain.domain_serial = format_domain_serial_and_add_one(domain.domain_serial)
domain.save()
if instance.ip4address is not None:
subnet = instance.ip4address.subnet
subnet.domain_serial = format_domain_serial_and_add_one(subnet.domain_serial)
subnet.save()
@receiver(post_save, sender=Host)
def update_domain_serial_when_change_to_host(sender, instance, created, **kwargs):
for interface in instance.interface_set.all():
if interface.domain is not None:
domain = interface.domain
domain.domain_serial = format_domain_serial_and_add_one(domain.domain_serial)
domain.save()
if interface.ip4address is not None:
subnet = interface.ip4address.subnet
subnet.domain_serial = format_domain_serial_and_add_one(subnet.domain_serial)
subnet.save()
@receiver(post_save, sender=Host)
def create_pxe_key_and_write_pxe_files_when_host_changes(sender, instance, created, **kwargs):
host = instance
if not host.pxe_key:
# generate key (prevent infinite recursion by using update)
pxe_key = uuid.uuid4().hex
Host.objects.filter(pk=host.pk).update(pxe_key=pxe_key)
host.pxe_key = pxe_key
for pxe_file_name, pxe_file in host.as_pxe_files():
path = os.path.join(settings.MDB_PXE_TFTP_ROOT, pxe_file_name)
if host.pxe_installable:
with open(path, 'w+') as f:
f.write(pxe_file)
logger.info("Created or updated {}".format(path))
# Also set necessary fields for pxe installation
interfaces = host.interface_set.exclude(ip4address__isnull=True)
for _if in interfaces:
changed = False
if not _if.pxe_filename:
_if.pxe_filename = 'pxelinux.0' # don't overwrite
changed = True
if not _if.dhcp_client:
_if.dhcp_client = True
changed = True
if changed:
_if.save()
else:
if os.path.exists(path):
os.unlink(path)
logger.info("deleted {}".format(path))
@receiver(pre_delete, sender=Interface)
def update_domain_serial_when_interface_deleted(sender, instance, **kwargs):
if instance.domain is not None:
domain = instance.domain
domain.domain_serial += 1
domain.save()
if instance.ip4address is not None:
subnet = instance.ip4address.subnet
subnet.domain_serial += 1
subnet.save()
# @receiver(pre_save, sender=Domain)
# def update_domain_serial_when_domain_is_saved(sender, instance, **kwargs):
# instance.domain_serial = format_domain_serial_and_add_one(instance.domain_serial)
kernel {{kernel}}
append ramdisk_size=14984 locale=en_US console-setup/ask_detect=false keyboard-configuration/layoutcode=no netcfg/wireless_wep= netcfg_choose_interface=eth0 netcfg/get_hostname= preseed/url={{preseed_config_url}} vga=normal initrd={{initrd}} -- snop={{host.pxe_key}}
prompt 0
timeout 0
\ No newline at end of file
"""
This file demonstrates writing tests using the unittest module. These will pass
when you run "manage.py test".
import io
from django.contrib.auth.models import User
from django.core.management import call_command
from django.core.urlresolvers import reverse
from django.test import TestCase
from rest_framework.authtoken.models import Token
from rest_framework.test import APITestCase
Replace this with more appropriate tests for your application.
"""
from mdb.models import Host
from django.test import TestCase
class MyAPITestCases(APITestCase):
fixtures = ['test_data']
def create_user_with_token(self):
self.user = User.objects.create(username='admin')
self.token = Token.objects.create(user=self.user)
self.host = Host.objects.first()
def setUp(self):
self.create_user_with_token()
return super(MyAPITestCases, self).setUp()
def test_validate_puppet_host_by_secret(self):
data = {
'pxe_key': self.host.pxe_key,
'certname': '{}.{}'.format(self.host.hostname, self.host.interface_set.first().domain.domain_name)
}
self.client.credentials(HTTP_AUTHORIZATION='Token {}'.format(self.token.key))
res = self.client.post(reverse('api-validate-host-secret'), data, format='json')
self.assertEquals(res.status_code, 200, res.content)
self.assertFalse(Host.objects.get(pk=self.host.pk).pxe_installable)