ProPeler
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cpanel-ccs
/
txdav
/
who
/
Filename :
cpanel.py
back
Copy
##
# Copyright 2019 cPanel, L.L.C.
##

"""
cPanel directory service implementation.

Provides an XML-backed directory service whose records verify plaintext
passwords against the local cPanel services (an HTTPS request for "root",
IMAP for every other user) instead of against a password stored in the XML.
"""

__all__ = [
    "CpanelDirectoryService",
    "CpanelDirectoryRecord",
]

from base64 import b64encode
from string import upper
from uuid import UUID
import errno
import fcntl
import imaplib
import inspect
import json
import os
import os.path
import re
import ssl
import time
import traceback
import urllib
import urllib2

from twext.python.log import Logger
from twext.who.idirectory import (DirectoryServiceError,
                                  DirectoryAvailabilityError,
                                  IPlaintextPasswordVerifier,
                                  UnknownRecordTypeError,
                                  FieldName as BaseFieldName,
                                  RecordType as BaseRecordType)
from twext.who.index import (DirectoryRecord as BaseDirectoryRecord,
                             FieldName as IndexFieldName)
from twext.who.xml import DirectoryService as XMLBaseDirectoryService
from twext.who.xml import ParseError
from twisted.internet.defer import succeed, inlineCallbacks, returnValue
from twisted.python.constants import (Names, Values, ValueConstant, Flags,
                                      NamedConstant)
from txdav.who.xml import DirectoryService as XMLDirectoryService
from zope.interface import implementer

log = Logger()

# Endpoint used to validate "root" credentials (HTTP Basic auth, loopback).
_WHM_VERSION_URL = "https://127.0.0.1:2087/json-api/version?api.version=1"

# Seconds to wait for the HTTPS credential check before giving up.
_WHM_TIMEOUT = 180

# A cPanel temporary-session password starts with the temp user name followed
# by the literal "[::cpses::]".  The first two characters after "cpses_" are
# the first two letters of the *actual* user, which can be almost any
# character, hence ".{2}".
_TEMP_SESSION_RE = re.compile(
    r'^(cpses_.{2}[a-z0-9]{8})\[::cpses::\][0-9A-Za-z_]+'
)

# loadRecords() waits at most _LOCK_ATTEMPTS * _LOCK_RETRY_DELAY seconds
# (10 seconds) for the lock file.
_LOCK_ATTEMPTS = 20
_LOCK_RETRY_DELAY = 0.5


##
# Directory Service
##

# Define the cpanel_user field named constant in a way that the Element
# 'constantContainer' can comprehend it.
class FieldName(Names):
    """
    Extra record field names used by the cPanel directory service.
    """
    cpanel_user = NamedConstant()
    cpanel_user.description = u"cPanel User"
    cpanel_user.multiValue = False

    hasCalendars = NamedConstant()
    hasCalendars.description = u"has calendars"
    hasCalendars.valueType = bool

    loginAllowed = NamedConstant()
    loginAllowed.description = u"login permitted"
    loginAllowed.valueType = bool


# Needed in order to add our own custom XML elements.
class Element(Values):
    """
    XML element names.
    """

    # Booleans

    true = ValueConstant(u"true")
    false = ValueConstant(u"false")

    # Schema hierarchy

    directory = ValueConstant(u"directory")
    record = ValueConstant(u"record")

    # Field names

    uid = ValueConstant(u"uid")
    uid.fieldName = BaseFieldName.uid

    guid = ValueConstant(u"guid")
    guid.fieldName = BaseFieldName.guid

    shortName = ValueConstant(u"short-name")
    shortName.fieldName = BaseFieldName.shortNames

    fullName = ValueConstant(u"full-name")
    fullName.fieldName = BaseFieldName.fullNames

    emailAddress = ValueConstant(u"email")
    emailAddress.fieldName = BaseFieldName.emailAddresses

    password = ValueConstant(u"password")
    password.fieldName = BaseFieldName.password

    memberUID = ValueConstant(u"member-uid")
    memberUID.fieldName = IndexFieldName.memberUIDs

    cpanel_user = ValueConstant(u"cpanel-user")
    cpanel_user.fieldName = FieldName.cpanel_user

    hasCalendars = ValueConstant(u"has-calendars")
    hasCalendars.fieldName = FieldName.hasCalendars

    loginAllowed = ValueConstant(u"login-allowed")
    loginAllowed.fieldName = FieldName.loginAllowed


# Implement the login path to verify versus cpsrvd
@implementer(IPlaintextPasswordVerifier)
class CpanelDirectoryRecord(BaseDirectoryRecord):
    """
    Directory record that checks plaintext passwords against cPanel services.
    """

    def cpanel_auth(self, user, password):
        """
        Check C{user}/C{password} against the local cPanel services.

        "root" is checked with an HTTP Basic authenticated request to the
        local HTTPS endpoint; every other user is checked with an IMAP login.

        @param user: the cPanel user name.
        @param password: the plaintext password (or temp-session password).

        @return: C{True} if the credentials were accepted, C{False} otherwise
            (including when the backing service could not be reached).
        @rtype: L{bool}
        """
        log.debug("Attempting cpanel_auth")
        if user == "root":
            return self._rootAuth(user, password)
        return self._imapAuth(user, password)

    def _rootAuth(self, user, password):
        """
        Validate root credentials with an HTTP Basic authenticated request.
        """
        request = urllib2.Request(_WHM_VERSION_URL)
        # use base64 lib instead of standard encoder, as it unhelpfully adds
        # newlines every 76 bytes
        b64string = b64encode(user + ':' + password)
        # The header value must be the single string "Basic <credentials>";
        # passing a tuple sends its repr() on the wire.
        request.add_header("Authorization", "Basic " + b64string)
        try:
            # NOTE(review): certificate verification is disabled; the request
            # only ever targets 127.0.0.1.
            response = urllib2.urlopen(
                request, timeout=_WHM_TIMEOUT,
                context=ssl._create_unverified_context()
            )
        except urllib2.HTTPError as e:
            # Non-2xx answers (e.g. 401/403) arrive as exceptions.
            return e.code == 200
        except urllib2.URLError:
            # Service unreachable: fail closed, like the IMAP path does.
            log.debug(traceback.format_exc())
            return False
        try:
            return response.getcode() == 200
        finally:
            response.close()

    def _imapAuth(self, user, password):
        """
        Validate non-root credentials by attempting an IMAP login.
        """
        # Try login with IMAP, as that supports cPanel "temp" users.
        # Since we chopped off the cpsess user earlier to get directory
        # lookup to work right, we have to now reconstruct the temp user
        # string based off the password, as it contains the username
        # before the pass string.
        matches = _TEMP_SESSION_RE.search(password)
        if matches and matches.group(1):
            user = user + '/' + matches.group(1)

        authenticated = False
        imap_conn = None
        try:
            imap_conn = imaplib.IMAP4_SSL("localhost")
            imap_conn.login(user, password)
            authenticated = True
        except Exception:
            # Any failure (refused connection, bad credentials, ...) means
            # "not authenticated"; keep the details in the debug log.
            log.debug(traceback.format_exc())
        finally:
            # Always release the connection, also after a failed login.
            if imap_conn is not None:
                try:
                    imap_conn.logout()
                except Exception:
                    log.debug(traceback.format_exc())
        return authenticated

    def verifyPlaintextPassword(self, password):
        """
        Verify C{password} for this record's C{cpanel_user} field.

        @param password: the plaintext password to check.

        @return: a fired L{Deferred} with C{True} if the password was
            accepted; C{False} if it was rejected or the record has no
            C{cpanel_user} field.
        """
        log.debug("cpanel verifyPlaintextPassword")
        cpanel_user = self.fields.get(FieldName.cpanel_user)
        if not cpanel_user:
            return succeed(False)
        return succeed(bool(self.cpanel_auth(cpanel_user, password)))


class CpanelDirectoryService(XMLDirectoryService):
    """
    XML directory service that produces L{CpanelDirectoryRecord}s and knows
    the cPanel-specific XML elements declared in L{Element}.
    """

    # Override __init__ specifically to bring in our version of Element above
    def __init__(self, filePath, refreshInterval=4):
        """
        @param filePath: the XML file holding the directory records; its
            C{path} attribute is also used to derive the lock file name.

        @param refreshInterval: passed through unchanged to the parent
            class initializer.
        """
        log.info("Initializing CpanelDirectoryService")
        XMLDirectoryService.__init__(self, filePath, refreshInterval)
        self.element = Element

    # Mostly here so that we can override the default DirectoryRecord with
    # the CpanelDirectoryRecord we provide above
    def parseRecordNode(self, recordNode, unknownFieldElements=None):
        """
        Build a L{CpanelDirectoryRecord} from one XML record node.

        @param recordNode: the XML node describing the record.

        @param unknownFieldElements: optional set that collects the tags of
            child elements that do not map to a known field.

        @return: the parsed record.
        @rtype: L{CpanelDirectoryRecord}

        @raise UnknownRecordTypeError: if the node's record type attribute
            is not a known record type.
        @raise ParseError: if a boolean field holds a non-boolean child.
        """
        log.debug("parsing recordnode")
        recordTypeAttribute = recordNode.get(
            self.attribute.recordType.value, u""
        )
        if recordTypeAttribute:
            try:
                recordType = (
                    self.recordTypeValue
                    .lookupByValue(recordTypeAttribute)
                    .recordType
                )
            except (ValueError, AttributeError):
                raise UnknownRecordTypeError(recordTypeAttribute)
        else:
            # Records without an explicit type are users.
            recordType = self.recordType.user

        fields = {}
        fields[self.fieldName.recordType] = recordType

        for fieldNode in recordNode:
            try:
                fieldElement = self.element.lookupByValue(fieldNode.tag)
            except ValueError:
                if unknownFieldElements is not None:
                    unknownFieldElements.add(fieldNode.tag)
                continue

            try:
                fieldName = fieldElement.fieldName
            except AttributeError:
                # Known element, but not one that maps to a record field.
                if unknownFieldElements is not None:
                    unknownFieldElements.add(fieldNode.tag)
                continue

            valueType = self.fieldName.valueType(fieldName)

            if valueType in (unicode, UUID):
                if fieldNode.text is None and valueType is unicode:
                    # Empty text element: store an empty string, not u"None".
                    value = u""
                else:
                    value = valueType(fieldNode.text)

            elif valueType is bool:
                boolElement = self._constantElement(fieldNode)

                if boolElement is Element.true:
                    value = True
                elif boolElement is Element.false:
                    value = False
                else:
                    raise ParseError(
                        "Child element {0} of element {1} is not a boolean."
                        .format(boolElement.value, fieldNode.tag)
                    )

            elif issubclass(valueType, (Names, Values, Flags)):
                constantElement = self._constantElement(fieldNode)
                value = constantElement.constantValue

            else:
                raise AssertionError(
                    "Unknown value type {0} for field {1}".format(
                        valueType, fieldName
                    )
                )

            assert value is not None

            if self.fieldName.isMultiValue(fieldName):
                values = fields.setdefault(fieldName, [])
                values.append(value)
            else:
                fields[fieldName] = value

        return CpanelDirectoryRecord(self, fields)

    def loadRecords(self, loadNow=False, stat=True):
        """
        Load records via the parent implementation while holding an
        exclusive C{flock} on C{<filePath>.lock}.

        Waits up to about 10 seconds for the lock; if it cannot be obtained
        the records are loaded anyway.  The lock file is removed afterwards.

        @param loadNow: passed through to the parent C{loadRecords}.
        @param stat: passed through to the parent C{loadRecords}.
        """
        log.debug("Loading records from " + self.filePath.path)
        lockFile = self.filePath.path + ".lock"
        f = open(lockFile, "w+")
        locked = False
        try:
            # wait a maximum of 10 seconds for the file
            # NOTE(review): time.sleep() blocks the calling thread while
            # waiting -- confirm that is acceptable where this is called.
            for attempt in range(_LOCK_ATTEMPTS):
                try:
                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    locked = True
                    break
                except IOError:
                    log.debug("loadRecords: failed to lock")
                    time.sleep(_LOCK_RETRY_DELAY)

            super(CpanelDirectoryService, self).loadRecords(loadNow, stat)
        finally:
            # Release everything even if loading raised.
            if locked:
                fcntl.flock(f, fcntl.LOCK_UN)
            f.close()
            try:
                os.unlink(lockFile)
            except OSError as e:
                # Another process may already have removed the lock file.
                if e.errno != errno.ENOENT:
                    raise