2023-04-03 09:16:15 +00:00
|
|
|
from ldap3 import Server,Connection,ALL,MODIFY_REPLACE
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
class LUSER():
    '''
    Class that manages user entries on an LDAP server over an SSL connection

    Every user is stored under base and, when basealt is given, is mirrored
    under basealt with a separately supplied password/hash.

    ldap_host := string IP or hostname of LDAP server
    admin_user := string DN of LDAP admin user
    admin_pass := string password of LDAP admin user
    base := string base in LDAP system where users are made
    basealt := string base in LDAP system where users are made with password hashes generated for openalt
    autoconnect := boolean, when True the connection is opened and bound while the object is constructed
    '''

    @staticmethod
    def _rdnvalues(base, attribute):
        '''
        Return values of all components of base whose attribute type is attribute

        base := string with comma separated components such as "ou=users,dc=example,dc=com"
        attribute := lower-case string attribute type to look for, for example "dc"

        Values are returned in the order in which they appear in base.
        '''
        values = []
        for component in base.split(','):
            fields = component.split('=')
            # A component without '=' carries no value, skip it instead of failing
            if len(fields) < 2:
                continue
            # Attribute types are matched case-insensitively and a space after
            # the separating comma is tolerated
            if fields[0].strip().lower() == attribute:
                values.append(fields[1])
        return values

    def expandbase(self):
        '''
        Extract organization, name of dc object and full domain part with all dc values from base

        Returns tuple (organization, dc, dcfull) where organization is the value
        of the last ou component, dc is the value of the first dc component and
        dcfull is every dc component joined as "dc=...,dc=...". Parts that are
        not present in base are returned as empty strings.
        '''
        organizations = self._rdnvalues(self.base, 'ou')
        domains = self._rdnvalues(self.base, 'dc')

        organization = organizations[-1] if organizations else ''
        dc = domains[0] if domains else ''
        dcfull = ','.join(f'dc={value}' for value in domains)

        return organization, dc, dcfull

    def __init__(self, ldap_host, admin_user, admin_pass, base, basealt='', autoconnect=True):
        '''
        Store connection settings and, if autoconnect is set, bind to the LDAP server

        When autoconnect is False no connection is created here and the
        ldapconnection attribute is left unset.
        '''
        self.ldap_host = ldap_host
        self.admin_user = admin_user
        self.admin_pass = admin_pass
        self.base = base
        self.organization, self.dc, self.dcfull = self.expandbase()
        self.basealt = basealt
        # Alternative base is in use only when it was actually given
        self.alt = basealt != ''
        self.autoconnect = autoconnect

        ldapserver = Server(ldap_host, use_ssl=True)
        if self.autoconnect:
            self.ldapconnection = Connection(ldapserver, admin_user, admin_pass, auto_bind=True)

        # uid and gid of most recently registered users
        self.lastuid = 1337
        self.lastgid = 1337

    def prepareluser(self):
        '''
        Create base on LDAP host

        For base, and for basealt when it is set, the domain entry built from
        the dc components is added first and the organizational unit entry
        (the base itself) second.

        Returns True only if every entry was added successfully.
        Raises ValueError if a base has no ou or no dc component.
        '''
        bases = [self.base]
        if self.alt:
            bases.append(self.basealt)

        # Split every base into the pieces needed for its two entries
        parsed = []
        for base in bases:
            organizations = self._rdnvalues(base, 'ou')
            domains = self._rdnvalues(base, 'dc')
            if not organizations or not domains:
                raise ValueError(f'base "{base}" must contain at least one ou and one dc component')
            dcfull = ','.join(f'dc={value}' for value in domains)
            # The first ou and the first dc are the ones naming the created entries
            parsed.append((base, organizations[0], domains[0], dcfull))

        rcodes = []

        # Create organization on LDAP server and store boolean indicating its success
        for base, organization, dc, dcfull in parsed:
            rcodes.append(self.ldapconnection.add(dcfull, ['dcObject', 'organization'], {'o' : dc, 'dc' : dc}))

        # Create organizational units on LDAP server and store boolean indicating its success
        for base, organization, dc, dcfull in parsed:
            rcodes.append(self.ldapconnection.add(base, ['top', 'organizationalUnit'], {'ou' : organization}))

        # Return True only if all return values are true
        return all(rcodes)

    def lastpwchangenow(self):
        '''
        Return time of last password change for the user set to current time
        measured in days from UNIX epoch, as a string
        '''
        # Function-scope import keeps this method independent of file-level imports
        import time

        # 86400 seconds in a day, whole days only
        return str(int(time.time() // 86400))

    def add(self, user, password, althash=""):
        '''
        Add a user to base in LDAP with user and pass as credentials

        user := string containing username
        password := string containing user password
        althash := string containing user password/hash for the alternative base

        Returns True only if the user was added to base and, when the
        alternative base is set, to the alternative base as well.
        '''
        # Increase UID and GID counters
        self.lastuid += 1
        self.lastgid += 1

        # Relative name of the user entry inside a base
        rdn = f"uid={user}"

        # Object classes of a user entry
        objectclass = ['top', 'person', 'organizationalPerson', 'inetOrgPerson', 'posixAccount', 'shadowAccount']

        # Attributes for a user entry
        attributes = {
            'cn' : user,
            'sn' : user,
            'givenName' : user,
            'uid' : user,
            'uidNumber' : self.lastuid,
            'gidNumber' : self.lastgid,
            'homeDirectory' : f'/home/{user}',
            'loginShell' : '/usr/bin/git-shell',
            'gecos' : 'SystemUser',
            'shadowLastChange' : self.lastpwchangenow(),
            'shadowMax' : '45',
            'userPassword' : password,
        }

        # Entry in the alternative base differs only in the stored password/hash
        attributesalt = dict(attributes, userPassword=althash)

        # Boolean value of new user entry
        rcode1 = self.ldapconnection.add(f'{rdn},{self.base}', objectclass, attributes)

        # If alternative base is set add the same user to the alternative base as well, if not act as if it was success
        if self.alt:
            rcode2 = self.ldapconnection.add(f'{rdn},{self.basealt}', objectclass, attributesalt)
        else:
            rcode2 = True

        # Return True only if both entries were successful
        return rcode1 and rcode2

    def changepassword(self, user, newpass, althash=''):
        '''
        Change password of user to newpass

        user := string containing username
        newpass := string containing new password
        althash := string containing password/hash for alternative base

        Returns True only if the password and the time of last password change
        were updated in base and, when set, in the alternative base.
        '''
        userdn = f'uid={user},{self.base}'
        userdnalt = f'uid={user},{self.basealt}'

        # This variable holds boolean indicating successful change of user password
        chpassbool = self.ldapconnection.modify(userdn, {'userPassword': (MODIFY_REPLACE, [newpass])})

        # If alternative base is set modify user password/hash with althash, if not, pretend change was successful
        if self.alt:
            chpassboolalt = self.ldapconnection.modify(userdnalt, {'userPassword': (MODIFY_REPLACE, [althash])})
        else:
            chpassboolalt = True

        # This variable holds boolean indicating successful change of shadowLastChange value to current time
        chlastchangebool = self.ldapconnection.modify(userdn, {'shadowLastChange' : (MODIFY_REPLACE, [self.lastpwchangenow()])})

        # If alternative base is set update shadowLastChange of the entry in the alternative base, if not, pretend change was successful
        if self.alt:
            chlastchangeboolalt = self.ldapconnection.modify(userdnalt, {'shadowLastChange' : (MODIFY_REPLACE, [self.lastpwchangenow()])})
        else:
            chlastchangeboolalt = True

        # Return True only if changing of both password and time of last password change was successful
        return chpassbool and chpassboolalt and chlastchangebool and chlastchangeboolalt

    def delete(self, user):
        '''
        Delete user given username of user

        user := string containing username

        Returns True only if the user was deleted from base and, when set,
        from the alternative base.
        '''
        rcode1 = self.ldapconnection.delete(f'uid={user},{self.base}')

        # If alternative base is set delete user from alternative base, if not, pretend deletion was successful
        if self.alt:
            rcode2 = self.ldapconnection.delete(f'uid={user},{self.basealt}')
        else:
            rcode2 = True

        # Return True only if deletion from both bases was successful
        return rcode1 and rcode2

    def getpassword(self, user):
        '''
        Retrieve password of a user

        user := string containing username

        Returns the first userPassword value of the first matching entry,
        decoded as UTF-8. Raises IndexError when the search has no result.
        '''
        # Function-scope import keeps this method independent of file-level imports
        from ldap3.utils.conv import escape_filter_chars

        # Escape the username so that characters such as '*', '(' and ')' can not alter the search filter
        safeuser = escape_filter_chars(user)

        # Search LDAP entries that have object class inetOrgPerson and uid attribute equal to given user field
        self.ldapconnection.search(search_base=self.base, search_filter=f'(&(objectClass=inetOrgPerson)(uid={safeuser}))', attributes=['userPassword'])

        # Return userPassword attribute from the response
        return self.ldapconnection.response[0]['attributes']['userPassword'][0].decode('utf-8')
|