from ldap3 import Server,Connection,ALL,MODIFY_REPLACE from datetime import datetime class LUSER(): ''' Class that represents secure connection to LDAP server LDAPhost := string IP or hostname of LDAP server admin_user := string DN of LDAP admin user admin_pass := string password of LDAP admin user base := string base in LDAP system where users are made basealt := string base in LDAP system where users are made with password hashes generated for openalt ''' def expandbase(self): ''' Extract orgnaization, name of dc object and full domain part with all dc values from base ''' # Split base string with commas to find values of organization and dc baselist = self.base.split(",") organization = '' dc = '' dcfull = '' # Find ou in base and set it as organization variable for i in baselist: if i.split('=')[0] == 'ou': organization = i.split('=')[1] # Find first dc and set it as dc variable for i in baselist: if i.split('=')[0] == 'dc': dc = i.split('=')[1] break # Find full dc and set it as dcfull variable for i in baselist: if i.split('=')[0] == 'dc': # if first dc, add it from dc variable if dcfull == '': dcfull = f'dc={dc}' else: dcfull += ',dc=' + i.split('=')[1] return organization, dc, dcfull def __init__(self, ldap_host, admin_user, admin_pass, base, basealt='', autoconnect=True): self.ldap_host = ldap_host self.admin_user = admin_user self.admin_pass = admin_pass self.base = base self.organization, self.dc, self.dcfull = self.expandbase() self.basealt = basealt self.alt = True self.autoconnect = autoconnect ldapserver = Server(ldap_host, use_ssl=True) if self.autoconnect: self.ldapconnection = Connection(ldapserver, admin_user, admin_pass, auto_bind=True) # uid and gid of most recently registered users self.lastuid = 1337 self.lastgid = 1337 # Set alt boolean to false if basealt not set if basealt == '': self.alt = False def prepare(self): ''' Create base on LDAP host ''' # Create dcObject on LDAP server and store boolean indicating it's success rcode1 = self.ldapconnection.add(f'dc={self.dcfull}', ['dcObject', 'organization'], {'o' : self.dc, 'dc' : self.dc}) # Create organizational units on LDAP server and store boolean indicating it's success rcode2 = self.ldapconnection.add(self.base, ['top', 'organizationalUnit'], {'ou' : self.organization}) # Return True only if all return values are true return rcode1 and rcode2 def lastpwchangenow(self): ''' Return time of last password change for the user set to current time messured in days from UNIX epoch ''' return str((datetime.utcnow() - datetime(1970,1,1)).days) def add(self, user, password, althash=""): ''' Add a user to base in LDAP with user and pass as credentials user := string containing username password := string containing user password althash := string containing user password/hash for the alternative base ''' # Increase UID and GID counters self.lastuid += 1 self.lastgid += 1 # Add user to base id = f"uid={user}" # Object classes of a user entry objectClass = ['top', 'person', 'organizationalPerson', 'inetOrgPerson', 'posixAccount', 'shadowAccount'] # Attributes for a user entry attributes = {'cn' : user, 'sn' : user, 'givenName' : user, 'uid' : user, 'uidNumber' : self.lastuid, 'gidNumber' : self.lastgid, 'homeDirectory' : f'/home/{user}', 'loginShell' : '/usr/bin/git-shell', 'gecos' : 'SystemUser', 'shadowLastChange' : self.lastpwchangenow(), 'shadowMax' : '45', 'userPassword' : password } attributesalt = {'cn' : user, 'sn' : user, 'givenName' : user, 'uid' : user, 'uidNumber' : self.lastuid, 'gidNumber' : self.lastgid, 'homeDirectory' : f'/home/{user}', 'loginShell' : '/usr//bin/git-shell', 'gecos' : 'SystemUser', 'shadowLastChange' : self.lastpwchangenow(), 'shadowMax' : '45', 'userPassword' : althash} # Return boolean value of new user entry rcode1 = self.ldapconnection.add(f'{id},{self.base}', objectClass, attributes) # If alternative base is set add the same user to the alternative base as well, if not act as if it was success if self.alt: rcode2 = self.ldapconnection.add(f'{id},{self.basealt}', objectClass, attributesalt) else: rcode2 = True # Return True only if both entries was successful return rcode1 and rcode2 def changepassword(self, user, newpass, althash=''): ''' Change password of user to newpass user := string containing username newpass := string containing new password althash := string containing password/hash for alternative base ''' # This variable holds boolean indicating successful change of user password chpassbool = self.ldapconnection.modify(f'uid={user},{self.base}', {'userPassword': (MODIFY_REPLACE,[newpass])}) # If alternative base is set modify user password/hash with althash, if not, pretend change was successful if self.alt: chpassboolalt = self.ldapconnection.modify(f'uid={user},{self.basealt}', {'userPassword': (MODIFY_REPLACE,[althash])}) else: chpassboolalt = True # This variable holds boolean indicating successful change of shadowLastChange value to current time chlastchangebool = self.ldapconnection.modify(f'uid={user},{self.base}', {'shadowLastChange' : (MODIFY_REPLACE,[self.lastpwchangenow()])}) # If alternative base is set modify user password/hash with althash, if not, pretend change was successful if self.alt: chlastchangeboolalt = self.ldapconnection.modify(f'uid={user},{self.base}', {'shadowLastChange' : (MODIFY_REPLACE, [self.lastpwchangenow()])}) else: chlastchangeboolalt = True # Return True only if changing of both password and time of last password change was successful return chpassbool and chpassboolalt and chlastchangebool and chlastchangeboolalt def delete(self, user): ''' Delete user given username of user user := string containing username ''' rcode1 = self.ldapconnection.delete(f'uid={user},{self.base}') # If alternative base is set delete user from alternative base, if not, pretend deletion was successful if self.alt: rcode2 = self.ldapconnection.delete(f'uid={user},{self.basealt}') else: rcode2 = True # Return True only if deletion from both bases was successful return rcode1 and rcode2 def getpassword(self, user): ''' Retrive password of a user user := string containing username ''' # Search LDAP entries that have object class inetOrgPerson and uid attribute equal to given user field self.ldapconnection.search(search_base=self.base,search_filter=f'(&(objectClass=inetOrgPerson)(uid={user}))', attributes=['userPassword']) # Return userPassword attribute from the response return self.ldapconnection.response[0]['attributes']['userPassword'][0].decode('utf-8')