Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -\*- coding: UTF-8 -\*-
- #!/usr/bin/python
- #############################################################################
- # Name: samdbtest.py
- # Purpose: Example routines to access SamDB via Python2
- # Maintainers: Torsten Kurbad <[email protected]>
- # Copyright: (c) iwm-kmrc.de KMRC - Knowledge Media Research Center
- # License: GPLv2
- #############################################################################
- __docformat__ = 'restructuredtext'
- from base64 import b64decode, b64encode
- from time import time
- import ldb
- from samba import dsdb, unix2nttime
- from samba.auth import system_session
- from samba.common import normalise_int32
- from samba.credentials import Credentials
- from samba.dsdb import _samdb_get_domain_sid
- from samba.ndr import ndr_unpack, ndr_pack
- from samba.param import LoadParm
- from samba.samdb import SamDB
- from samba.dsdb import (
- GTYPE_SECURITY_DOMAIN_LOCAL_GROUP,
- GTYPE_SECURITY_GLOBAL_GROUP,
- GTYPE_SECURITY_UNIVERSAL_GROUP,
- GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP,
- GTYPE_DISTRIBUTION_GLOBAL_GROUP,
- GTYPE_DISTRIBUTION_UNIVERSAL_GROUP,
- UF_ACCOUNTDISABLE,
- UF_DONT_EXPIRE_PASSWD,
- UF_PASSWORD_EXPIRED,
- UF_PASSWD_NOTREQD,
- )
- class ActiveDirectory(object):
- COUNTRYCODES=dict(
- AF=('Afghanistan', '004'),
- AX=('Åland-Inseln', '248'),
- AL=('Albanien', '008'),
- DZ=('Algerien', '012'),
- AS=('Samoa', '016'),
- AD=('Andorra', '020'),
- AO=('Angola', '024'),
- AI=('Anguilla', '660'),
- AQ=('Antarktis', '010'),
- AG=('Antigua und Barbuda', '028'),
- AR=('Argentinien', '032'),
- AM=('Armenien', '051'),
- AW=('Aruba', '533'),
- AU=('Australien', '036'),
- AT=('Österreich', '040'),
- AZ=('Aserbaidschan', '031'),
- BS=('Bahamas', '044'),
- BH=('Bahrain', '048'),
- BD=('Bangladesch', '050'),
- BB=('Barbados', '052'),
- BY=('Weißrussland', '112'),
- BE=('Belgien', '056'),
- BZ=('Belize', '084'),
- BJ=('Benin', '204'),
- BM=('Bermuda', '060'),
- BT=('Bhutan', '064'),
- BO=('Bolivien, Plurinationaler Staat', '068'),
- BQ=('Bonaire, Sint Eustatius und Saba', '535'),
- BA=('Bosnien und Herzegowina', '070'),
- BW=('Botsuana', '072'),
- BV=('Bouvet-Insel', '074'),
- BR=('Brasilien', '076'),
- IO=('Britisches Territorium im Indischen Ozean', '086'),
- BN=('Brunei Darussalam', '096'),
- BG=('Bulgarien', '100'),
- BF=('Burkina Faso', '854'),
- BI=('Burundi', '108'),
- KH=('Kambodscha', '116'),
- CM=('Kamerun', '120'),
- CA=('Kanada', '124'),
- CV=('Kap Verde', '132'),
- KY=('Cayman-Inseln', '136'),
- CF=('Zentralafrikanische Republik', '140'),
- TD=('Tschad', '148'),
- CL=('Chile', '152'),
- CN=('China', '156'),
- CX=('Weihnachtsinseln', '162'),
- CC=('Kokos-(Keeling-)Inseln', '166'),
- CO=('Kolumbien', '170'),
- KM=('Komoren', '174'),
- CG=('Kongo', '178'),
- CD=('Demokratische Republik Kongo', '180'),
- CK=('Cookinseln', '184'),
- CR=('Costa Rica', '188'),
- CI=('Côte d\'Ivoire', '384'),
- HR=('Kroatien', '191'),
- CU=('Kuba', '192'),
- CW=('Curaçao', '531'),
- CY=('Zypern', '196'),
- CZ=('Tschechische Republik', '203'),
- DK=('Dänemark', '208'),
- DJ=('Dschibuti', '262'),
- DM=('Dominica', '212'),
- DO=('Dominikanische Republik', '214'),
- EC=('Ecuador', '218'),
- EG=('Ägypten', '818'),
- SV=('El Salvador', '222'),
- GQ=('Äquatorialguinea', '226'),
- ER=('Eritrea', '232'),
- EE=('Estland', '233'),
- ET=('Äthiopien', '231'),
- FK=('Falklandinseln (Malwinen)', '238'),
- FO=('Färöer-Inseln', '234'),
- FJ=('Fidschi', '242'),
- FI=('Finnland', '246'),
- FR=('Frankreich', '250'),
- GF=('Französisch-Guyana', '254'),
- PF=('Französisch-Polynesien', '258'),
- TF=('Französische Süd- und Antarktisgebiete', '260'),
- GA=('Gabun', '266'),
- GM=('Gambia', '270'),
- GE=('Georgien', '268'),
- DE=('Deutschland', '276'),
- GH=('Ghana', '288'),
- GI=('Gibraltar', '292'),
- GR=('Griechenland', '300'),
- GL=('Grönland', '304'),
- GD=('Grenada', '308'),
- GP=('Guadeloupe', '312'),
- GU=('Guam', '316'),
- GT=('Guatemala', '320'),
- GG=('Guernsey', '831'),
- GN=('Guinea', '324'),
- GW=('Guinea-Bissau', '624'),
- GY=('Guyana', '328'),
- HT=('Haiti', '332'),
- HM=('Heard und McDonaldinseln', '334'),
- VA=('Heiliger Stuhl (Staat Vatikanstadt)', '336'),
- HN=('Honduras', '340'),
- HK=('Hongkong', '344'),
- HU=('Ungarn', '348'),
- IS=('Island', '352'),
- IN=('Indien', '356'),
- ID=('Indonesien', '360'),
- IR=('Iran, Islamische Republik', '364'),
- IQ=('Irak', '368'),
- IE=('Irland', '372'),
- IM=('Insel Man', '833'),
- IL=('Israel', '376'),
- IT=('Italien', '380'),
- JM=('Jamaika', '388'),
- JP=('Japan', '392'),
- JE=('Jersey', '832'),
- JO=('Jordanien', '400'),
- KZ=('Kasachstan', '398'),
- KE=('Kenia', '404'),
- KI=('Kiribati', '296'),
- KP=('Korea, Demokratische Volksrepublik', '408'),
- KR=('Korea, Republik', '410'),
- KW=('Kuwait', '414'),
- KG=('Kirgisistan', '417'),
- LA=('Laos, Demokratische Volksrepublik', '418'),
- LV=('Lettland', '428'),
- LB=('Libanon', '422'),
- LS=('Lesotho', '426'),
- LR=('Liberia', '430'),
- LY=('Libysch-Arabische Dschamahirija', '434'),
- LI=('Liechtenstein', '438'),
- LT=('Litauen', '440'),
- LU=('Luxemburg', '442'),
- MO=('Macao', '446'),
- MK=('Mazedonien, ehemalige jugoslawische Republik', '807'),
- MG=('Madagaskar', '450'),
- MW=('Malawi', '454'),
- MY=('Malaysia', '458'),
- MV=('Malediven', '462'),
- ML=('Mali', '466'),
- MT=('Malta', '470'),
- MH=('Marshallinseln', '584'),
- MQ=('Martinique', '474'),
- MR=('Mauretanien', '478'),
- MU=('Mauritius', '480'),
- YT=('Mayotte', '175'),
- MX=('Mexiko', '484'),
- FM=('Mikronesien, Föderierte Staaten von', '583'),
- MD=('Moldau, Republik', '498'),
- MC=('Monaco', '492'),
- MN=('Mongolei', '496'),
- ME=('Montenegro', '499'),
- MS=('Montserrat', '500'),
- MA=('Marokko', '504'),
- MZ=('Mosambik', '508'),
- MM=('Myanmar', '104'),
- NA=('Namibia', '516'),
- NR=('Nauru', '520'),
- NP=('Nepal', '524'),
- NL=('Niederlande', '528'),
- NC=('Neukaledonien', '540'),
- NZ=('Neuseeland', '554'),
- NI=('Nicaragua', '558'),
- NE=('Niger', '562'),
- NG=('Nigeria', '566'),
- NU=('Niue', '570'),
- NF=('Norfolkinsel', '574'),
- MP=('Nördliche Mariana-Inseln', '580'),
- NO=('Norwegen', '578'),
- OM=('Oman', '512'),
- PK=('Pakistan', '586'),
- PW=('Palau', '585'),
- PS=('Palästinensische Autonomiegebiete', '275'),
- PA=('Panama', '591'),
- PG=('Papua-Neuguinea', '598'),
- PY=('Paraguay', '600'),
- PE=('Peru', '604'),
- PH=('Philippinen', '608'),
- PN=('Pitcairn', '612'),
- PL=('Polen', '616'),
- PT=('Portugal', '620'),
- PR=('Puerto Rico', '630'),
- QA=('Katar', '634'),
- RE=('Réunion', '638'),
- RO=('Rumänien', '642'),
- RU=('Russische Föderation', '643'),
- RW=('Ruanda', '646'),
- BL=('Saint-Barthélemy', '652'),
- SH=('St. Helena, Ascension und Tristan da Cunha', '654'),
- KN=('St. Kitts und Nevis', '659'),
- LC=('St. Lucia', '662'),
- MF=('Saint Martin (Französischer Teil)', '663'),
- PM=('St. Pierre und Miquelon', '666'),
- VC=('St. Vincent und die Grenadinen', '670'),
- WS=('Samoa', '882'),
- SM=('San Marino', '674'),
- ST=('São Tomé und Príncipe', '678'),
- SA=('Saudi-Arabien', '682'),
- SN=('Senegal', '686'),
- RS=('Serbien', '688'),
- SC=('Seychellen', '690'),
- SL=('Sierra Leone', '694'),
- SG=('Singapur', '702'),
- SX=('Saint-Martin', '702'),
- SK=('Slowakei', '703'),
- SI=('Slowenien', '705'),
- SB=('Salomoninseln', '090'),
- SO=('Somalia', '706'),
- ZA=('Südafrika', '710'),
- GS=('South Georgia und die Südlichen Sandwichinseln', '239'),
- ES=('Spanien', '724'),
- LK=('Sri Lanka', '144'),
- SD=('Sudan', '736'),
- SR=('Suriname', '740'),
- SJ=('Svalbard und Jan Mayen', '744'),
- SZ=('Swasiland', '748'),
- SE=('Schweden', '752'),
- CH=('Schweiz', '756'),
- SY=('Syrien, Arabische Republik', '760'),
- TW=('Taiwan, Chinesische Provinz', '158'),
- TJ=('Tadschikistan', '762'),
- TZ=('Tansania, Vereinigte Republik', '834'),
- TH=('Thailand', '764'),
- TL=('Timor-Leste', '626'),
- TG=('Togo', '768'),
- TK=('Tokelau', '772'),
- TO=('Tonga', '776'),
- TT=('Trinidad und Tobago', '780'),
- TN=('Tunesien', '788'),
- TR=('Türkei', '792'),
- TM=('Turkmenistan', '795'),
- TC=('Turks- und Caicosinseln', '796'),
- TV=('Tuvalu', '798'),
- UG=('Uganda', '800'),
- UA=('Ukraine', '804'),
- AE=('Vereinigte Arabische Emirate', '784'),
- GB=('Vereinigtes Königreich', '826'),
- US=('Vereinigte Staaten', '840'),
- UM=('United States Minor Outlying Islands', '581'),
- UY=('Uruguay', '858'),
- UZ=('Usbekistan', '860'),
- VU=('Vanuatu', '548'),
- VE=('Venezuela, Bolivarische Republik', '862'),
- VN=('Vietnam', '704'),
- VG=('Britische Jungferninseln', '092'),
- VI=('Amerikanische Jungferninseln', '850'),
- WF=('Wallis und Futuna', '876'),
- EH=('Westsahara', '732'),
- YE=('Jemen', '887'),
- ZM=('Sambia', '894'),
- ZW=('Simbabwe', '716'),
- )
- GROUPTYPES = dict(
- SECURITY = dict(
- DOMAIN = normalise_int32(GTYPE_SECURITY_DOMAIN_LOCAL_GROUP),
- GLOBAL = normalise_int32(GTYPE_SECURITY_GLOBAL_GROUP),
- UNIVERSAL = normalise_int32(GTYPE_SECURITY_UNIVERSAL_GROUP)),
- DISTRIBUTION = dict(
- DOMAIN = normalise_int32(GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP),
- GLOBAL = normalise_int32(GTYPE_DISTRIBUTION_GLOBAL_GROUP),
- UNIVERSAL = normalise_int32(GTYPE_DISTRIBUTION_UNIVERSAL_GROUP))
- )
- ATTRMAP = dict(
- facsimiletelephonenumber = 'fax',
- l = 'location',
- )
- def __init__(self, realm=None, url=None):
- self.realm = realm
- self.url = url
- self._credentials = Credentials()
- self._lp = LoadParm()
- self._session_info = system_session()
- self._connection = None
- self._attrs = None
- def _connect(self, url=None):
- if url is None:
- if self.url is None:
- raise Exception('Need an URL to connect to.')
- url = self.url
- try:
- connection = SamDB(url=url,
- session_info=self._session_info,
- credentials=self._credentials,
- lp=self._lp)
- except Exception, e:
- raise Exception('Unable to connect to SamDB. Reason: %s' % e)
- self._connection = connection
- self.url = url
- return connection
- def _getDomainDN(self, connection=None):
- if connection is None:
- connection = self._connection
- try:
- return connection.domain_dn()
- except Exception, e:
- raise Exception('Unable to retrieve Domain DN. Reason: %s' % e)
- def _getDomainSID(self, connection=None):
- if connection is None:
- connection = self._connection
- try:
- return _samdb_get_domain_sid(self._connection)
- except Exception, e:
- raise Exception('Unable to retrieve Domain SID. Reason: %s' % e)
- def _search(self, expression=None, attrs=None, scope=ldb.SCOPE_SUBTREE,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- try:
- return self._connection.search(domainDN, scope=scope,
- expression=expression,
- attrs=attrs)
- except Exception, e:
- raise Exception('Unable to search Active Directory. Reason: %s'
- % e)
- def _parseLDBMessage(self, message, utf8=False, attrMap=None):
- if attrMap is None:
- attrMap = self.ATTRMAP
- dn = message.get('dn').extended_str()
- attrs = {}
- for (key, element) in message.items():
- if key != 'dn':
- attr = []
- for value in element:
- if utf8:
- try:
- value = value.decode('utf-8')
- except UnicodeDecodeError:
- pass
- attr.append(value)
- attrs[key.lower()] = attr
- if key.lower() in attrMap.keys():
- attrs[attrMap[key.lower()].lower()] = attr
- return (dn, attrs)
- def _parseLDBMessageList(self, messageList, utf8=False):
- result = []
- for message in messageList:
- result.append(self._parseLDBMessage(message, utf8=utf8))
- return result
- def _modifyFromDict(self, modifyDN, modifyDict,
- attrs=None,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- expression = '(dn=%s)' % modifyDN
- ldbEntry = self._search(expression=expression, attrs=attrs)
- try:
- assert(len(ldbEntry) == 1)
- except AssertionError:
- raise Exception('Unable to find entry for DN "%s"' % dn)
- (ldbEntryDN, ldbEntryDict) = self._parseLDBMessage(ldbEntry[0])
- message = ldb.Message()
- message.dn = ldb.Dn(self._connection, modifyDN)
- for (modifyKey, modifyValue) in modifyDict.items():
- ldbFlag = ldb.FLAG_MOD_ADD
- if modifyKey.lower() in ldbEntryDict.keys():
- ldbFlag = ldb.FLAG_MOD_REPLACE
- if modifyValue in (None, '', []):
- ldbFlag = ldb.FLAG_MOD_DELETE
- modifyValue = ldbEntryDict[modifyKey.lower()]
- if (modifyValue is not None) and (modifyValue != ''):
- message[modifyKey] = ldb.MessageElement(modifyValue,
- ldbFlag,
- modifyKey)
- if len(message.keys()) > 1:
- self._connection.modify(message)
- def _setLpOptions(self, lpOptions=None):
- if lpOptions is not None:
- for (key, value) in lpOptions.items():
- if (key == 'realm') and (self.realm is None):
- self.realm = value
- self._lp.set(key, value)
- def _setCredentials(self, username, password, realm=None):
- if realm is None:
- realm = self.realm
- self._credentials.set_realm(realm)
- self._credentials.set_username(username)
- self._credentials.set_password(password)
- def getOUs(self, attrs=[],
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- expression = '(objectClass=organizationalUnit)'
- result = self._search(expression=expression, attrs=attrs)
- return self._parseLDBMessageList(result)
- def getOU(self, organizationalUnitRDN, attrs=[],
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(objectClass=organizationalUnit)(dn=%s,%s))' %
- (organizationalUnitRDN, domainDN))
- result = self._search(expression=expression, attrs=attrs)
- return self._parseLDBMessageList(result)
- def existsOU(self, organizationalUnitRDN,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- result = self.getOU(organizationalUnitRDN, attrs=None,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- return (result is not None) and (len(result) > 0)
- def createOU(self, organizationalUnitRDN,
- description=None, name=None, sd=None,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- if not self.existsOU(organizationalUnitRDN):
- message = dict(
- dn = '%s,%s' % (organizationalUnitRDN, domainDN),
- objectClass = 'organizationalUnit',
- ou = organizationalUnitRDN.split('=')[1].strip(),
- )
- if description is not None:
- message['description'] = description
- if name is not None:
- message['name'] = name
- if sd is not None:
- message['nTSecurityDescriptor'] = ndr_pack(sd)
- self._connection.add(message)
- def deleteOU(self, organizationalUnitRDN,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(objectClass=organizationalUnit)(dn=%s,%s))' %
- (organizationalUnitRDN, domainDN))
- organizationalUnit = self._search(expression=expression, attrs=[])
- try:
- assert(len(organizationalUnit) == 1)
- except AssertionError:
- raise Exception('Unable to find Organizational Unit %s.'
- % organizationalUnitRDN)
- self._connection.delete(organizationalUnit[0].dn)
- def setMinPwdAge(self, minPwdAge,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if not isinstance(minPwdAge, basestring):
- minPwdAge = '%d' % minPwdAge
- message = ldb.Message()
- message.dn = ldb.Dn(self._connection, self._getDomainDN())
- message['minPwdAge'] = ldb.MessageElement(minPwdAge,
- ldb.FLAG_MOD_REPLACE,
- 'minPwdAge')
- self._connection.modify(message)
- def getMinPwdAge(self,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- minPwdAge = self._search(attrs=['minPwdAge'],
- scope=ldb.SCOPE_BASE,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if len(minPwdAge) == 0:
- return None
- if not 'minPwdAge' in minPwdAge[0]:
- return None
- return minPwdAge[0]['minPwdAge'][0]
- def setMaxPwdAge(self, maxPwdAge,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if not isinstance(maxPwdAge, basestring):
- maxPwdAge = '%d' % maxPwdAge
- message = ldb.Message()
- message.dn = ldb.Dn(self._connection, self._getDomainDN())
- message['maxPwdAge'] = ldb.MessageElement(maxPwdAge,
- ldb.FLAG_MOD_REPLACE,
- 'maxPwdAge')
- self._connection.modify(message)
- def getMaxPwdAge(self,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- maxPwdAge = self._search(attrs=['maxPwdAge'],
- scope=ldb.SCOPE_BASE,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if len(maxPwdAge) == 0:
- return None
- if not 'maxPwdAge' in maxPwdAge[0]:
- return None
- return maxPwdAge[0]['maxPwdAge'][0]
- def setMinPwdLength(self, minPwdLength,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if not isinstance(minPwdLength, basestring):
- minPwdLength = '%d' % minPwdLength
- message = ldb.Message()
- message.dn = ldb.Dn(self._connection, self._getDomainDN())
- message['minPwdLength'] = ldb.MessageElement(minPwdLength,
- ldb.FLAG_MOD_REPLACE,
- 'minPwdLength')
- self._connection.modify(message)
- def getMinPwdLength(self,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- minPwdLength = self._search(attrs=['minPwdLength'],
- scope=ldb.SCOPE_BASE,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if len(minPwdLength) == 0:
- return None
- if not 'minPwdLength' in minPwdLength[0]:
- return None
- return minPwdLength[0]['minPwdLength'][0]
- def setPwdHistoryLength(self, pwdHistoryLength,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if not isinstance(pwdHistoryLength, basestring):
- pwdHistoryLength = '%d' % pwdHistoryLength
- message = ldb.Message()
- message.dn = ldb.Dn(self._connection, self._getDomainDN())
- message['pwdHistoryLength'] = ldb.MessageElement(pwdHistoryLength,
- ldb.FLAG_MOD_REPLACE,
- 'pwdHistoryLength')
- self._connection.modify(message)
- def getPwdHistoryLength(self,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- pwdHistoryLength = self._search(attrs=['pwdHistoryLength'],
- scope=ldb.SCOPE_BASE,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if len(pwdHistoryLength) == 0:
- return None
- if not 'pwdHistoryLength' in pwdHistoryLength[0]:
- return None
- return pwdHistoryLength[0]['pwdHistoryLength'][0]
- def setPwdProperties(self, pwdProperties,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if not isinstance(pwdProperties, basestring):
- pwdProperties = '%d' % pwdProperties
- message = ldb.Message()
- message.dn = ldb.Dn(self._connection, self._getDomainDN())
- message['pwdProperties'] = ldb.MessageElement(pwdProperties,
- ldb.FLAG_MOD_REPLACE,
- 'pwdProperties')
- self._connection.modify(message)
- def getPwdProperties(self,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- pwdProperties = self._search(attrs=['pwdProperties'],
- scope=ldb.SCOPE_BASE,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if len(pwdProperties) == 0:
- return None
- if not 'pwdProperties' in pwdProperties[0]:
- return None
- return pwdProperties[0]['pwdProperties'][0]
- def connect(self, url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if lpOptions is not None:
- self._setLpOptions(lpOptions)
- if realm is None:
- realm = self.realm
- if username is None and self._credentials.get_username() is None:
- raise Exception(
- 'Cannot modify Active Directory without admin credentials.')
- if username is not None and self._credentials.get_username() is None:
- self._setCredentials(username, password, realm=realm)
- if self._connection is None:
- return self._connect(url)
- class ADUser(ActiveDirectory):
- def __init__(self, realm=None, url=None):
- super(ADUser, self).__init__(realm=realm, url=url)
- self._orgAttrs = [
- 'c',
- 'cn',
- 'co',
- 'company',
- 'department',
- 'description',
- 'displayName',
- 'facsimileTelephoneNumber',
- 'givenName',
- 'homePhone',
- 'initials',
- 'l',
- 'mail',
- 'manager',
- 'mobile',
- 'name',
- 'personalTitle',
- 'physicalDeliveryOfficeName',
- 'postalCode',
- 'secretary',
- 'seeAlso',
- 'serialNumber',
- 'sn',
- 'streetAddress',
- 'thumbnailPhoto',
- 'telephoneNumber',
- 'title',
- ]
- self._sambaAttrs = [
- 'accountExpires',
- 'badPasswordTime',
- 'badPwdCount',
- 'codePage',
- 'countryCode',
- 'distinguishedName',
- 'homeDirectory',
- 'homeDrive',
- 'instanceType',
- 'lastLogoff',
- 'lastLogon',
- 'logonCount',
- 'memberOf',
- 'objectCategory',
- 'objectClass',
- 'objectGUID',
- 'objectSID',
- 'primaryGroupID',
- 'profilePath',
- 'pwdLastSet',
- 'sAMAccountName',
- 'sAMAccountType',
- 'userAccountControl',
- 'userPrincipalName',
- 'uSNChanged',
- 'uSNCreated',
- 'whenChanged',
- 'whenCreated',
- ]
- self._unixAttrs = [
- 'gidNumber',
- 'loginShell',
- 'uidNumber',
- 'unixHomeDirectory',
- ]
- self._attrs=self._orgAttrs + self._sambaAttrs + self._unixAttrs
- def _search(self, expression=None, attrs=None, scope=ldb.SCOPE_SUBTREE,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if (not attrs) and (not self.url.startswith('ldap')):
- attrs = self._attrs
- return super(ADUser, self)._search(expression=expression,
- attrs=attrs,
- scope=scope,
- url=url,
- username=username,
- realm=realm,
- password=password,
- lpOptions=lpOptions)
- def _toggleADUserAccountFlags(self, sAMAccountName, flags, setFlags=True,
- url=None, username=None, realm=None,
- password=None, lpOptions=None):
- """toggle_userAccountFlags
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression,
- attrs=['userAccountControl'])
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user "%s"' % sAMAccountName)
- adUserDN = adUser[0].dn
- oldUAC = int(adUser[0]['userAccountControl'][0])
- newUAC = oldUAC & ~flags
- if setFlags:
- newUAC = oldUAC | flags
- if oldUAC == newUAC:
- return
- userAccountFlags = """
- dn: %s
- changetype: modify
- delete: userAccountControl
- userAccountControl: %u
- add: userAccountControl
- userAccountControl: %u
- """ % (adUserDN, oldUAC, newUAC)
- self._connection.modify_ldif(userAccountFlags)
- def getADUsers(self, attrs=[], utf8=False,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- expression = ('(&(objectClass=user)(userAccountControl:%s:=%u))'
- % (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
- result = self._search(expression=expression, attrs=attrs,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- return self._parseLDBMessageList(result, utf8=utf8)
- def getADUser(self, sAMAccountName, attrs=[], utf8=False,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- result = self._search(expression=expression, attrs=attrs,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- return self._parseLDBMessageList(result, utf8=utf8)
- def existsADUser(self, sAMAccountName,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- result = self.getADUser(sAMAccountName, realm=realm, url=url,
- username=username, password=password,
- lpOptions=lpOptions)
- return (result is not None) and (len(result) > 0)
- def createADUser(self,
- sAMAccountName, userPassword,
- surName=None, givenName=None, initials=None,
- personalTitle=None, description=None,
- title=None, department=None, company=None,
- serialNumber=None, manager=None, managerOU=None,
- secretary=None, secretaryOU=None,
- seeAlso=None, seeAlsoOU=None,
- physicalDeliveryOfficeName=None,
- streetAddress=None, postalCode=None, location=None,
- country=None,
- telephoneNumber=None, facsimileTelephoneNumber=None,
- homePhone=None, mobile=None,
- mail=None, wWWHomePage=None,
- jpegPhoto=None,
- profilePath=None, scriptPath=None, homeDrive=None,
- homeDirectory=None,
- uidNumber=None, gidNumber=None, loginShell=None,
- unixHomeDirectory=None,
- changePasswordAtNextLogin=True,
- useSAMAccountNameAsCN=True,
- userOU=None, sd=None, setPassword=True,
- expirySeconds=None,
- neverExpireAccount=True, neverExpirePassword=False,
- extraAttrs={},
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self.existsADUser(sAMAccountName, realm=realm, url=url,
- username=username, password=password,
- lpOptions=lpOptions):
- raise Exception(
- 'Cannot create user %s. A user with that name already exists.'
- % sAMAccountName)
- domainDN = self._getDomainDN()
- # Initial and Common Attributes
- ldbMessage = dict(
- objectClass = ['user',],
- sAMAccountName = sAMAccountName,
- )
- displayName = ''
- if givenName is not None:
- displayName += givenName
- ldbMessage['givenName'] = givenName
- if initials is not None:
- if not initials.endswith('.'):
- initials = '%s.' % initials
- displayName += initials
- ldbMessage['initials'] = initials
- if surName is not None:
- displayName += ' %s' % surName
- ldbMessage['sn'] = surName
- if displayName is not '':
- ldbMessage['displayName'] = displayName
- ldbMessage['name'] = displayName
- cn = sAMAccountName
- if not useSAMAccountNameAsCN and (displayName is not ''):
- cn = displayName
- for ou in [userOU, managerOU, secretaryOU, seeAlsoOU]:
- if ou is not None:
- if not self.existsOU(ou):
- raise Exception('Unable to find Organizational Unit %s for %s.'
- % (ou, ou.__name__))
- userDN = 'CN=%s,%s,%s' % (cn, (userOU or 'CN=Users'), domainDN)
- ldbMessage['dn'] = userDN
- dnsDomain = ldb.Dn(
- self._connection, domainDN).canonical_str().replace('/', '')
- userPrincipalName = '%s@%s' % (sAMAccountName, dnsDomain)
- ldbMessage['userPrincipalName'] = userPrincipalName
- # Standard Samba Account Information
- if title is not None:
- ldbMessage['title'] = title
- if department is not None:
- ldbMessage['department'] = department
- if company is not None:
- ldbMessage['company'] = company
- if manager is not None:
- ldbMessage['manager'] = ('CN=%s,%s,%s'
- % (manager, (managerOU or 'CN=Users'), domainDN))
- if description is None:
- description = displayName
- ldbMessage['description'] = description
- if mail is not None:
- ldbMessage['mail'] = mail
- if wWWHomePage is not None:
- ldbMessage['wWWHomePage'] = wWWHomePage
- if streetAddress is not None:
- ldbMessage['streetAddress'] = streetAddress
- if postalCode is not None:
- ldbMessage['postalCode'] = '%s' % postalCode
- if location is not None:
- ldbMessage['l'] = location
- if country is not None:
- if len(country) == 2:
- if country.upper() in self.COUNTRYCODES.keys():
- ldbMessage['c'] = country.upper()
- ldbMessage['co'] = self.COUNTRYCODES[country.upper()][0]
- ldbMessage['countryCode'] = self.COUNTRYCODES[country.upper()][1]
- else:
- for (countryCode, (countryName, countryNumeric)) in self.COUNTRYCODES:
- if country.lower() == countryName.lower():
- ldbMessage['c'] = countryCode
- ldbMessage['co'] = countryName
- ldbMessage['countryCode'] = countryNumeric
- continue
- if physicalDeliveryOfficeName is not None:
- ldbMessage['physicalDeliveryOfficeName'] = physicalDeliveryOfficeName
- if telephoneNumber is not None:
- ldbMessage['telephoneNumber'] = telephoneNumber
- if facsimileTelephoneNumber is not None:
- ldbMessage['facsimileTelephoneNumber'] = facsimileTelephoneNumber
- if homePhone is not None:
- ldbMessage['homePhone'] = homePhone
- if mobile is not None:
- ldbMessage['mobile'] = mobile
- if profilePath is not None:
- ldbMessage['profilePath'] = profilePath
- if scriptPath is not None:
- ldbMessage['scriptPath'] = scriptPath
- if homeDrive is not None:
- if not homeDrive.endswith(':'):
- homeDrive = '%s:' % homeDrive
- ldbMessage['homeDrive'] = homeDrive.upper()
- if homeDirectory is not None:
- ldbMessage['homeDirectory'] = homeDirectory
- if sd is not None:
- ldbMessage['nTSecurityDescriptor'] = ndr_pack(sd)
- # Additional Organizational Information
- if personalTitle is not None:
- ldbMessage['personalTitle'] = personalTitle
- if serialNumber is not None:
- ldbMessage['serialNumber'] = '%s' % serialNumber
- if secretary is not None:
- ldbMessage['secretary'] = ('CN=%s,%s,%s'
- % (secretary, (secretaryOU or 'CN=Users'), domainDN))
- if seeAlso is not None:
- ldbMessage['seeAlso'] = ('CN=%s,%s,%s'
- % (seeAlso, (seeAlsoOU or 'CN=Users'), domainDN))
- if jpegPhoto is not None:
- # jpegPhoto must be either the contents of a file with correct
- # JPEG header (ff 08 hex), ...
- if jpegPhoto.startswith('\xff\xd8'):
- jpegPhoto = b64encode(jpegPhoto)
- try:
- # ... or a base64 encoded string representation of such file
- # contents
- jpegPhotoContents = b64decode(jpegPhoto)
- assert(jpegPhotoContents[:2] == '\xff\xd8')
- except:
- pass
- else:
- ldbMessage['jpegPhoto'] = jpegPhoto
- # Posix Attributes
- if uidNumber is not None:
- ldbMessage['objectClass'].extend(['posixAccount', 'shadowAccount'])
- ldbMessage['uidNumber'] = '%s' % uidNumber
- if gidNumber is not None:
- ldbMessage['gidNumber'] = '%s' % gidNumber
- if loginShell is not None:
- ldbMessage['loginShell'] = loginShell
- if unixHomeDirectory is not None:
- ldbMessage['unixHomeDirectory'] = unixHomeDirectory
- # Additional Attributes (must be properly formatted)
- for (extraKey, extraValue) in extraAttrs.items():
- ldbMessage[extraKey] = extraValue
- # Create new User
- self._connection.transaction_start()
- try:
- self._connection.add(ldbMessage)
- # Set the password
- if setPassword:
- self.setADUserPassword(
- sAMAccountName,
- userPassword,
- changePasswordAtNextLogin=changePasswordAtNextLogin)
- self.setADUserExpiry(sAMAccountName,
- expirySeconds=expirySeconds,
- neverExpireAccount=neverExpireAccount,
- neverExpirePassword=neverExpirePassword)
- except:
- self._connection.transaction_cancel()
- raise
- else:
- self._connection.transaction_commit()
- def modifyADUser(self,
- sAMAccountName,
- surName=None, givenName=None, initials=None,
- personalTitle=None, description=None, displayName=None,
- title=None, department=None, company=None,
- serialNumber=None, manager=None, managerOU=None,
- secretary=None, secretaryOU=None,
- seeAlso=None, seeAlsoOU=None,
- physicalDeliveryOfficeName=None,
- streetAddress=None, postalCode=None, location=None,
- country=None,
- telephoneNumber=None, facsimileTelephoneNumber=None,
- homePhone=None, mobile=None,
- mail=None, wWWHomePage=None,
- jpegPhoto=None,
- profilePath=None, scriptPath=None, homeDrive=None,
- homeDirectory=None,
- uidNumber=None, gidNumber=None, loginShell=None,
- unixHomeDirectory=None,
- expirySeconds=None,
- neverExpireAccount=None, neverExpirePassword=None,
- extraAttrs={},
- useSAMAccountNameAsCN=True,
- userOU=None, sd=None,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- attrs = []
- if not self.url.startswith('ldap'):
- attrs = self._attrs
- attrs.extend(extraAttrs.keys())
- cn = sAMAccountName
- if not useSAMAccountNameAsCN and (displayName is not None):
- cn = displayName
- for ouStr in ['userOU', 'managerOU', 'secretaryOU', 'seeAlsoOU']:
- ou = eval(ouStr)
- if ou is not None:
- if not self.existsOU(ou):
- raise Exception('Unable to find Organizational Unit %s for %s.'
- % (ou, ouStr))
- userDN = 'CN=%s,%s,%s' % (cn, (userOU or 'CN=Users'), domainDN)
- ldbMessage = {}
- # Standard Samba Account Information
- if surName is not None:
- ldbMessage['sn'] = surName
- if givenName is not None:
- ldbMessage['givenName'] = givenName
- if displayName is not None:
- ldbMessage['displayName'] = displayName
- ldbMessage['name'] = displayName
- if initials is not None:
- if not initials.endswith('.'):
- initials = '%s.' % initials
- ldbMessage['initials'] = initials
- if title is not None:
- ldbMessage['title'] = title
- if department is not None:
- ldbMessage['department'] = department
- if company is not None:
- ldbMessage['company'] = company
- if manager is not None:
- ldbMessage['manager'] = ('CN=%s,%s,%s'
- % (manager, (managerOU or 'CN=Users'), domainDN))
- if description is not None:
- ldbMessage['description'] = description
- if mail is not None:
- ldbMessage['mail'] = mail
- if wWWHomePage is not None:
- ldbMessage['wWWHomePage'] = wWWHomePage
- if streetAddress is not None:
- ldbMessage['streetAddress'] = streetAddress
- if postalCode is not None:
- ldbMessage['postalCode'] = '%s' % postalCode
- if location is not None:
- ldbMessage['l'] = location
- if country is not None:
- if len(country) == 2:
- if country.upper() in self.COUNTRYCODES.keys():
- ldbMessage['c'] = country.upper()
- ldbMessage['co'] = self.COUNTRYCODES[country.upper()][0]
- ldbMessage['countryCode'] = self.COUNTRYCODES[country.upper()][1]
- else:
- for (countryCode, (countryName, countryNumeric)) in self.COUNTRYCODES:
- if country.lower() == countryName.lower():
- ldbMessage['c'] = countryCode
- ldbMessage['co'] = countryName
- ldbMessage['countryCode'] = countryNumeric
- continue
- if physicalDeliveryOfficeName is not None:
- ldbMessage['physicalDeliveryOfficeName'] = physicalDeliveryOfficeName
- if telephoneNumber is not None:
- ldbMessage['telephoneNumber'] = telephoneNumber
- if facsimileTelephoneNumber is not None:
- ldbMessage['facsimileTelephoneNumber'] = facsimileTelephoneNumber
- if homePhone is not None:
- ldbMessage['homePhone'] = homePhone
- if mobile is not None:
- ldbMessage['mobile'] = mobile
- if profilePath is not None:
- ldbMessage['profilePath'] = profilePath
- if scriptPath is not None:
- ldbMessage['scriptPath'] = scriptPath
- if homeDrive is not None:
- if not homeDrive.endswith(':'):
- homeDrive = '%s:' % homeDrive
- ldbMessage['homeDrive'] = homeDrive.upper()
- if homeDirectory is not None:
- ldbMessage['homeDirectory'] = homeDirectory
- if sd is not None:
- ldbMessage['nTSecurityDescriptor'] = ndr_pack(sd)
- # Additional Organizational Information
- if personalTitle is not None:
- ldbMessage['personalTitle'] = personalTitle
- if serialNumber is not None:
- ldbMessage['serialNumber'] = '%s' % serialNumber
- if secretary is not None:
- ldbMessage['secretary'] = ('CN=%s,%s,%s'
- % (secretary, (secretaryOU or 'CN=Users'), domainDN))
- if seeAlso is not None:
- ldbMessage['seeAlso'] = ('CN=%s,%s,%s'
- % (seeAlso, (seeAlsoOU or 'CN=Users'), domainDN))
- if jpegPhoto is not None:
- # jpegPhoto must be either the contents of a file with correct
- # JPEG header (ff 08 hex), ...
- if jpegPhoto.startswith('\xff\xd8'):
- jpegPhoto = b64encode(jpegPhoto)
- try:
- # ... or a base64 encoded string representation of such file
- # contents
- jpegPhotoContents = b64decode(jpegPhoto)
- assert(jpegPhotoContents[:2] == '\xff\xd8')
- except:
- pass
- else:
- ldbMessage['jpegPhoto'] = jpegPhoto
- # Posix Attributes
- if uidNumber is not None:
- ldbMessage['objectClass'] = ['user',
- 'posixAccount',
- 'shadowAccount',]
- ldbMessage['uidNumber'] = '%s' % uidNumber
- if gidNumber is not None:
- ldbMessage['gidNumber'] = '%s' % gidNumber
- if loginShell is not None:
- ldbMessage['loginShell'] = loginShell
- if unixHomeDirectory is not None:
- ldbMessage['unixHomeDirectory'] = unixHomeDirectory
- # Additional Attributes
- for (extraKey, extraValue) in extraAttrs.items():
- ldbMessage[extraKey] = extraValue
- # Modify User
- self._connection.transaction_start()
- try:
- if ldbMessage is not {}:
- self._modifyFromDict(userDN,
- ldbMessage,
- attrs=attrs)
- self.setADUserExpiry(sAMAccountName,
- expirySeconds=expirySeconds,
- neverExpireAccount=neverExpireAccount,
- neverExpirePassword=neverExpirePassword)
- except:
- self._connection.transaction_cancel()
- raise
- self._connection.transaction_commit()
- def renameADUser(self, sAMAccountName, newSAMAccountName, newUserOU=None,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- attrs = []
- if not self.url.startswith('ldap'):
- attrs = self._attrs
- domainDN = self._getDomainDN()
- dnsDomain = ldb.Dn(
- self._connection, domainDN).canonical_str().replace('/', '')
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression, attrs=attrs)
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user %s.' % sAMAccountName)
- oldUserDN = adUser[0].dn
- if newUserOU is not None:
- if not self.existsOU(newUserOU):
- raise Exception(
- 'Cannot add user %s to non existing Organizational Unit %s.'
- % (sAMAccountName, newUserOU))
- newUserDN = 'CN=%s,%s,%s' % (newSAMAccountName,
- (newUserOU or 'CN=Users'),
- domainDN)
- newUserPrincipalName = '%s@%s' % (newSAMAccountName, dnsDomain)
- extraAttrs=dict(sAMAccountName = newSAMAccountName,
- userPrincipalName = newUserPrincipalName)
- self._connection.transaction_start()
- try:
- self._connection.rename(oldUserDN, newUserDN)
- self.modifyADUser(newSAMAccountName,
- extraAttrs=extraAttrs)
- except:
- self._connection.transaction_cancel()
- raise
- self._connection.transaction_commit()
- def deleteADUser(self, sAMAccountName,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Deletes a user
- :param sAMAccountName: Name of the target user
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression, attrs=None)
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user %s.' % sAMAccountName)
- self._connection.delete(adUser[0].dn)
- def setADUserPassword(self, sAMAccountName, userPassword,
- changePasswordAtNextLogin=False,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Sets the password for a user
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression, attrs=None)
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user "%s"' % sAMAccountName)
- adUserDN = adUser[0].dn
- unicodePwd = """
- dn: %s
- changetype: modify
- replace: unicodePwd
- unicodePwd:: %s
- """ % (adUserDN, b64encode(('"' + userPassword + '"').encode('utf-16-le')))
- self._connection.transaction_start()
- try:
- self._connection.modify_ldif(unicodePwd)
- if changePasswordAtNextLogin:
- self.mustChangePasswordAtNextLogin(sAMAccountName)
- self.enableADUser(sAMAccountName)
- except:
- self._connection.transaction_cancel()
- raise
- self._connection.transaction_commit()
- def setADUserExpiry(self, sAMAccountName, expirySeconds=None,
- neverExpireAccount=False, neverExpirePassword=False,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Sets the account expiry for a user
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression,
- attrs=['accountExpires'])
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user "%s"' % sAMAccountName)
- adUserDN = adUser[0].dn
- oldAccountExpires = int(adUser[0]['accountExpires'][0])
- accountExpires = None
- if expirySeconds is not None:
- accountExpires = unix2nttime(expirySeconds + int(time()))
- if neverExpireAccount:
- accountExpires = 0
- if (accountExpires is None) and not neverExpireAccount:
- accountExpires = oldAccountExpires
- setADUserExpiry = """
- dn: %s
- changetype: modify
- replace: accountExpires
- accountExpires: %u
- """ % (adUserDN, accountExpires)
- self._connection.transaction_start()
- try:
- self._toggleADUserAccountFlags(sAMAccountName,
- flags=UF_DONT_EXPIRE_PASSWD,
- setFlags=neverExpirePassword)
- self._connection.modify_ldif(setADUserExpiry)
- except:
- self._connection.transaction_cancel()
- raise
- self._connection.transaction_commit()
- def getADUserExpiry(self, sAMAccountName,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression,
- attrs=['accountExpires',
- 'pwdLastSet',
- 'userAccountControl'])
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user "%s"' % sAMAccountName)
- (adUserDN, adUserDict) = self._parseLDBMessage(adUser[0])
- adUserExpiry = dict(
- accountExpires = adUserDict['accountexpires'][0],
- mustChangePasswordAtNextLogin = (int(adUserDict['pwdlastset'][0]) == 0),
- passwordNeverExpires = (
- int(adUserDict['useraccountcontrol'][0])
- & UF_DONT_EXPIRE_PASSWD == UF_DONT_EXPIRE_PASSWD),
- accountNeverExpires = (int(adUserDict['accountexpires'][0]) == 0))
- return (adUserDN, adUserExpiry)
- def disableADUser(self, sAMAccountName,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Disables an account
- """
- flags = UF_ACCOUNTDISABLE
- self._toggleADUserAccountFlags(sAMAccountName, flags, setFlags=True,
- url=url, username=username, realm=realm,
- lpOptions=lpOptions)
- def enableADUser(self, sAMAccountName,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Enables an account
- """
- flags = UF_ACCOUNTDISABLE | UF_PASSWD_NOTREQD
- self._toggleADUserAccountFlags(sAMAccountName, flags, setFlags=False,
- url=url, username=username, realm=realm,
- lpOptions=lpOptions)
- def mustChangePasswordAtNextLogin(self, sAMAccountName,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Forces a password change at next login
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(sAMAccountName),
- ldb.binary_encode(sAMAccountName),
- 'CN=Person,CN=Schema,CN=Configuration',
- domainDN))
- adUser = self._search(expression=expression, attrs=None)
- try:
- assert(len(adUser) == 1)
- except AssertionError:
- raise Exception('Unable to find user "%s"' % sAMAccountName)
- adUserDN = adUser[0].dn
- resetPwdLastSet = """
- dn: %s
- changetype: modify
- replace: pwdLastSet
- pwdLastSet: 0
- """ % (adUserDN)
- self._connection.transaction_start()
- try:
- self._connection.modify_ldif(resetPwdLastSet)
- self._toggleADUserAccountFlags(sAMAccountName,
- UF_PASSWORD_EXPIRED)
- except:
- self._connection.transaction_cancel()
- raise
- self._connection.transaction_commit()
- class ADGroup(ActiveDirectory):
- def __init__(self, realm=None, url=None):
- super(ADGroup, self).__init__(realm=realm, url=url)
- self._orgAttrs = [
- 'cn',
- 'description',
- 'displayName',
- 'name',
- ]
- self._sambaAttrs = [
- 'distinguishedName',
- 'groupType',
- 'instanceType',
- 'member',
- 'objectCategory',
- 'objectClass',
- 'objectGUID',
- 'objectSID',
- 'sAMAccountName',
- 'sAMAccountType',
- 'uSNChanged',
- 'uSNCreated',
- 'whenChanged',
- 'whenCreated',
- ]
- self._unixAttrs = [
- 'gidNumber',
- ]
- self._attrs=self._orgAttrs + self._sambaAttrs + self._unixAttrs
- def _search(self, expression=None, attrs=None, scope=ldb.SCOPE_SUBTREE,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if (not attrs) and (not self.url.startswith('ldap')):
- attrs = self._attrs
- return super(ADGroup, self)._search(expression=expression,
- attrs=attrs,
- scope=scope,
- url=url,
- username=username,
- realm=realm,
- password=password,
- lpOptions=lpOptions)
- def _addRemoveADGroupMembers(self, cn, adUsersList,
- addOperation=True,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if isinstance(adUsersList, basestring):
- adUsersList = adUsersList.split(',')
- domainDN = self._getDomainDN()
- expression = '(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))' % (
- ldb.binary_encode(cn),
- ldb.binary_encode(cn),
- 'CN=Group,CN=Schema,CN=Configuration',
- domainDN)
- adGroup = self._search(expression=expression, attrs=['member'])
- try:
- assert(len(adGroup) == 1)
- except AssertionError:
- raise Exception('Unable to find group %s.' % cn)
- modified = False
- addADUserToGroup = """
- dn: %s
- changetype: modify
- """ % (str(adGroup[0].dn))
- for adUser in adUsersList:
- expression = '(|(sAMAccountName=%s)(CN=%s))' % (
- ldb.binary_encode(adUser.strip()),
- ldb.binary_encode(adUser.strip()))
- adGroupMember = self._search(expression=expression, attrs=None)
- if len(adGroupMember) != 1:
- continue
- if (addOperation and
- (adGroup[0].get('member') is None or
- str(adGroupMember[0].dn) not in adGroup[0]['member'])):
- modified = True
- addADUserToGroup += """add: member
- member: %s
- """ % (str(adGroupMember[0].dn))
- elif (not addOperation and
- (adGroup[0].get('member') is not None and
- str(adGroupMember[0].dn) in adGroup[0]['member'])):
- modified = True
- addADUserToGroup += """delete: member
- member: %s
- """ % (str(adGroupMember[0].dn))
- if modified:
- self._connection.modify_ldif(addADUserToGroup)
- def getADGroups(self, attrs=[], utf8=False,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(objectCategory=%s,%s)'
- % ('CN=Group,CN=Schema,CN=Configuration', domainDN))
- result = self._search(expression=expression, attrs=attrs,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- return self._parseLDBMessageList(result, utf8=utf8)
- def getADGroup(self, cn, attrs=[], utf8=False,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(cn),
- ldb.binary_encode(cn),
- 'CN=Group,CN=Schema,CN=Configuration',
- domainDN))
- result = self._search(expression=expression, attrs=attrs,
- url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- return self._parseLDBMessageList(result, utf8=utf8)
- def existsADGroup(self, cn, realm=None, url=None,
- username=None, password=None, lpOptions=None):
- result = self.getADGroup(cn, realm=realm, url=url,
- username=username, password=password,
- lpOptions=lpOptions)
- return (result is not None) and (len(result) > 0)
- def createADGroup(self,
- cn, displayName, description=None,
- groupOU=None, groupType='SECURITY', groupScope='GLOBAL',
- mail=None, info=None,
- gidNumber=None,
- useDisplayNameAsCN=False,
- sd=None, extraAttrs={},
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Adds a new group with additional parameters
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- if useDisplayNameAsCN:
- cn = displayName
- domainDN = self._getDomainDN()
- if groupOU is not None:
- if not self.existsOU(groupOU):
- raise Exception('Unable to find Organizational Unit %s for group %s.'
- % (ou, cn))
- groupDN = 'CN=%s,%s,%s' % (cn, (groupOU or 'CN=Users'), domainDN)
- # Standard Samba Account Information
- ldbMessage = {'dn': groupDN,
- 'sAMAccountName': cn,
- 'displayName': displayName,
- 'objectClass': ['group']}
- groupType = (groupType or 'SECURITY')
- groupScope = (groupScope or 'GLOBAL')
- ldbMessage['groupType'] = (
- self.GROUPTYPES[groupType.upper()][groupScope.upper()])
- if description is None:
- description = displayName
- ldbMessage['description'] = description
- if mail is not None:
- ldbMessage['mail'] = mail
- if info is not None:
- ldbMessage['info'] = info
- if sd is not None:
- ldbMessage['nTSecurityDescriptor'] = ndr_pack(sd)
- # Posix Attributes
- if gidNumber is not None:
- ldbMessage['gidNumber'] = '%s' % gidNumber
- # Additional Attributes (must be properly formatted)
- for (extraKey, extraValue) in extraAttrs.items():
- ldbMessage[extraKey] = extraValue
- self._connection.add(ldbMessage)
- def modifyADGroup(self,
- cn, displayName=None, description=None,
- groupOU=None, groupType=None, groupScope=None,
- mail=None, info=None,
- gidNumber=None,
- sd=None, extraAttrs={},
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- attrs = []
- if not self.url.startswith('ldap'):
- attrs = self._attrs
- attrs.extend(extraAttrs.keys())
- domainDN = self._getDomainDN()
- if groupOU is not None:
- if not self.existsOU(groupOU):
- raise Exception('Unable to find Organizational Unit %s for group %s.'
- % (ou, cn))
- groupDN = 'CN=%s,%s,%s' % (cn, (groupOU or 'CN=Users'), domainDN)
- ldbMessage = {}
- # Standard Samba Account Information
- if (groupType is not None) and (groupScope is not None):
- gType = self.GROUPTYPES.get(groupType.upper())
- if gType is not None:
- gType = gType.get(groupScope.upper())
- if gType is not None:
- ldbMessage['groupType'] = gType
- if displayName is not None:
- ldbMessage['displayName'] = displayName
- if description is not None:
- ldbMessage['description'] = description
- if mail is not None:
- ldbMessage['mail'] = mail
- if info is not None:
- ldbMessage['info'] = info
- if sd is not None:
- ldbMessage['nTSecurityDescriptor'] = ndr_pack(sd)
- # Posix Attributes
- if gidNumber is not None:
- ldbMessage['gidNumber'] = '%s' % gidNumber
- # Additional Attributes
- for (extraKey, extraValue) in extraAttrs.items():
- ldbMessage[extraKey] = extraValue
- # Modify Group
- if ldbMessage is not {}:
- self._modifyFromDict(groupDN, ldbMessage, attrs=attrs)
- def renameADGroup(self, cn, newCN, newGroupOU=None,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = ('(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))'
- % (ldb.binary_encode(cn),
- ldb.binary_encode(cn),
- "CN=Group,CN=Schema,CN=Configuration",
- domainDN))
- adGroup = self._search(expression=expression, attrs=None)
- try:
- assert(len(adGroup) == 1)
- except AssertionError:
- raise Exception('Unable to find group %s.' % cn)
- oldGroupDN = adGroup[0].dn
- if newGroupOU is not None:
- if not self.existsOU(newGroupOU):
- raise Exception(
- 'Cannot add group %s to non existing Organizational Unit %s.'
- % (cn, newGroupOU))
- newGroupDN = 'CN=%s,%s,%s' % (newCN,
- (newGroupOU or 'CN=Users'),
- domainDN)
- self._connection.transaction_start()
- try:
- self._connection.rename(oldGroupDN, newGroupDN)
- self.modifyADGroup(newCN,
- extraAttrs={'sAMAccountName': newCN})
- except:
- self._connection.transaction_cancel()
- raise
- self._connection.transaction_commit()
- def deleteADGroup(self, cn,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- """Deletes a group
- """
- if self._connection is None:
- self.connect(url=url, username=username, realm=realm,
- password=password, lpOptions=lpOptions)
- domainDN = self._getDomainDN()
- expression = "(&(|(sAMAccountName=%s)(CN=%s))(objectCategory=%s,%s))" % (
- ldb.binary_encode(cn),
- ldb.binary_encode(cn),
- "CN=Group,CN=Schema,CN=Configuration",
- domainDN)
- adGroup = self._search(expression=expression, attrs=None)
- try:
- assert(len(adGroup) == 1)
- except AssertionError:
- raise Exception('Unable to find group %s.' % cn)
- self._connection.delete(adGroup[0].dn)
- def addADGroupMembers(self, cn, adUsersList,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- self._addRemoveADGroupMembers(cn, adUsersList,
- url=url, username=username, realm=realm, password=password,
- lpOptions=lpOptions)
- def removeADGroupMembers(self, cn, adUsersList,
- url=None, username=None, realm=None, password=None,
- lpOptions=None):
- self._addRemoveADGroupMembers(cn, adUsersList,
- addOperation=False,
- url=url, username=username, realm=realm, password=password,
- lpOptions=lpOptions)
- ###
- from pprint import pprint
- url = 'ldap://localhost:389/'
- username = 'administrator'
- realm = 'EXAMPLE.COM'
- password = 'test'
- lpOptions = dict(
- realm = realm,
- workgroup = 'WORKGROUP',
- )
- adUser = ADUser()
- adUser.connect(url=url, username=username, realm=realm, password=password,
- lpOptions=lpOptions)
- pprint (adUser.getADUser('administrator'))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement