Advertisement
Guest User

Untitled

a guest
Apr 8th, 2020
413
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.72 KB | None | 0 0
  1. from ldap3 import Server, Connection, SUBTREE, ServerPool, FIRST
  2.  
  3. @api.route('/ldap/groupmembers_recursive')class groupmembers_recursive(Resource):    def post(self):        response = {}        """        if not request.headers.get('X-Forwarded-For'):            abort(400, {"error": "invalid request _"})        else:            if not ipaddress.ip_address(request.headers.get('X-Forwarded-For')).is_private:                abort(401, {"error": "not authorized"})        """        if not request.headers.get('X-Secret'):            abort(400, {"error": "invalid request"})        else:            if request.headers.get('X-Secret') != api_key:                abort(401, {"error": "not authorized"})        if not request.get_json():            abort(400,{"error": "invalid json"})        if not 'search_base' in request.get_json():            abort(400,{"error": "search base not provided"})        if not 'attributes' in request.get_json():            abort(400,{"error": "ldap attributes not provided"})        manifest = request.get_json()        user_objects = []        try:            if manifest["protocol"] == 'ldap' or manifest["protocol"] == 'LDAP':                use_ssl = False            else:                use_ssl = True            ldap_server_list = []            ldap_server_list.append(                Server(manifest["server_a_address"], port=manifest["server_a_port"], use_ssl=use_ssl,                       connect_timeout=5))            ldap_server_list.append(                Server(manifest["server_b_address"], port=manifest["server_b_port"], use_ssl=use_ssl,                       connect_timeout=5))            server_pool = ServerPool(ldap_server_list, FIRST, active=3, exhaust=True)            ldap_connection = Connection(server_pool, user=manifest["username"], password=manifest["password"],                                         version=manifest["ldap_version"], auto_range=True, receive_timeout=20)            if not ldap_connection.bind():                abort(400,{"error": "LDAP bind error " + str(ldap_connection.result)})                #print('error in bind', ldap_connection.result)  # return non 200 response w/result            group_dn_list = ldap_connection.extend.standard.paged_search(search_base=manifest["groups_search_base"],                                                                         search_filter='(&(objectCategory=group)(cn=' +                                                                                       manifest["group_name"] + '))',                                                                         search_scope=SUBTREE,                                                                         attributes=manifest["attributes"],                                                                         paged_size=100000,                                                                         generator=False)            for group_entry in group_dn_list:                if 'dn' in group_entry.keys():                    #print()                    #print('dn: ' + group_entry['dn'])                    #ldap_connection.bind()                    user_list = ldap_connection.extend.standard.paged_search(search_base=manifest["search_base"],                                                                             search_filter='(&(objectCategory=Person)(sAMAccountName=*)(memberOf:1.2.840.113556.1.4.1941:=' +                                                                                           group_entry['dn'] + '))',                                                                             search_scope=SUBTREE,                                                                             attributes=manifest["attributes"],                                                                             paged_size=100000,                                                                             generator=False)                    for user_entry in user_list:                        if 'dn' in user_entry.keys():                            #print('attributes: ' + str(user_entry['attributes']))                            user_object = {}                            for attribute in user_entry['attributes']:                                if user_entry['attributes'][attribute] != []:                                    user_object[attribute] = user_entry['attributes'][attribute]                                else:                                    user_object[attribute] = None                            #print('user objects: ' + str(user_object))                            user_objects.append(user_object)                         return (user_objects)            ldap_connection.unbind()        except:            abort(400, {"error": {"exception: ": str(error_handling())}})@api.route('/ldap/groupdn')class groupdn(Resource):    def post(self):        response = {}        """        if not request.headers.get('X-Forwarded-For'):            abort(400, {"error": "invalid request _"})        else:            if not ipaddress.ip_address(request.headers.get('X-Forwarded-For')).is_private:                abort(401, {"error": "not authorized"})        """        if not request.headers.get('X-Secret'):            abort(400, {"error": "invalid request"})        else:            if request.headers.get('X-Secret') != api_key:                abort(401, {"error": "not authorized"})        if not request.get_json():            abort(400, {"error": "invalid json"})        if not 'search_base' in request.get_json():            abort(400, {"error": "search base not provided"})        if not 'attributes' in request.get_json():            abort(400, {"error": "ldap attributes not provided"})        manifest = request.get_json()        user_objects = []        try:            if manifest["protocol"] == 'ldap' or manifest["protocol"] == 'LDAP':                use_ssl = False            else:                use_ssl = True            ldap_server_list = []            ldap_server_list.append(                Server(manifest["server_a_address"], port=manifest["server_a_port"], use_ssl=use_ssl,                       connect_timeout=5))            ldap_server_list.append(                Server(manifest["server_b_address"], port=manifest["server_b_port"], use_ssl=use_ssl,                       connect_timeout=5))            server_pool = ServerPool(ldap_server_list, FIRST, active=3, exhaust=True)            ldap_connection = Connection(server_pool, user=manifest["username"], password=manifest["password"],                                         version=manifest["ldap_version"], auto_range=True, receive_timeout=20)            if not ldap_connection.bind():                abort(400, {"error": "LDAP bind error " + str(ldap_connection.result)})                # print('error in bind', ldap_connection.result)  # return non 200 response w/result            group_dn_list = ldap_connection.extend.standard.paged_search(search_base=manifest["groups_search_base"],                                                                         search_filter='(&(objectCategory=group)(cn=' +                                                                                       manifest["group_name"] + '))',                                                                         search_scope=SUBTREE,                                                                         attributes=manifest["attributes"],                                                                         paged_size=100000,                                                                         generator=False)            for group_entry in group_dn_list:                if 'dn' in group_entry.keys():                    return group_entry['dn']            ldap_connection.unbind()            abort(400, {"error": "group dn not found"})        except:            abort(400, {"error": {"exception: ": str(error_handling())}})
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement