Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from ldap3 import Server, Connection, SUBTREE, ServerPool, FIRST
def _load_authorized_manifest():
    """Authenticate the current request and return its JSON body.

    Aborts with 400 when the ``X-Secret`` header is missing, when the body is
    not a non-empty JSON object, or when ``search_base`` / ``attributes`` are
    absent; aborts with 401 when the header does not equal ``api_key``.
    """
    # NOTE: an X-Forwarded-For private-address check used to be sketched here
    # inside a disabled string literal; it was never executed and is omitted.
    provided_secret = request.headers.get('X-Secret')
    if not provided_secret:
        abort(400, {"error": "invalid request"})
    if provided_secret != api_key:
        abort(401, {"error": "not authorized"})

    manifest = request.get_json()
    # A JSON scalar or list cannot carry the required keys; reject it here
    # instead of failing later with a TypeError.
    if not manifest or not isinstance(manifest, dict):
        abort(400, {"error": "invalid json"})
    if 'search_base' not in manifest:
        abort(400, {"error": "search base not provided"})
    if 'attributes' not in manifest:
        abort(400, {"error": "ldap attributes not provided"})
    return manifest


def _build_ldap_connection(manifest):
    """Create an (unbound) ldap3 connection over the two servers in *manifest*.

    SSL is disabled only when ``manifest["protocol"]`` is ``'ldap'`` or
    ``'LDAP'``; any other value enables it. Raises ``KeyError`` when a
    required manifest key is missing.
    """
    use_ssl = manifest["protocol"] not in ('ldap', 'LDAP')
    servers = [
        Server(manifest["server_a_address"], port=manifest["server_a_port"],
               use_ssl=use_ssl, connect_timeout=5),
        Server(manifest["server_b_address"], port=manifest["server_b_port"],
               use_ssl=use_ssl, connect_timeout=5),
    ]
    server_pool = ServerPool(servers, FIRST, active=3, exhaust=True)
    return Connection(server_pool,
                      user=manifest["username"],
                      password=manifest["password"],
                      version=manifest["ldap_version"],
                      auto_range=True,
                      receive_timeout=20)


def _close_quietly(ldap_connection):
    """Unbind *ldap_connection*, ignoring errors raised during cleanup."""
    try:
        ldap_connection.unbind()
    except Exception:
        # Best-effort cleanup: a failure while closing must not replace the
        # result (or the original error) of the request being served.
        pass


def _find_group_dns(ldap_connection, manifest):
    """Return the DNs of all groups whose ``cn`` equals ``manifest["group_name"]``.

    The group name is escaped before being placed in the filter, so characters
    such as ``(``, ``)``, ``\\`` and ``*`` are matched literally.
    """
    from ldap3.utils.conv import escape_filter_chars

    group_filter = ('(&(objectCategory=group)(cn='
                    + escape_filter_chars(manifest["group_name"]) + '))')
    group_entries = ldap_connection.extend.standard.paged_search(
        search_base=manifest["groups_search_base"],
        search_filter=group_filter,
        search_scope=SUBTREE,
        attributes=manifest["attributes"],
        paged_size=100000,
        generator=False)
    # Only entries that carry a 'dn' key are usable; anything else is skipped.
    return [entry['dn'] for entry in group_entries if 'dn' in entry]


def _collect_group_members(ldap_connection, manifest):
    """Return one attribute dict per person that is a member of the group.

    Membership is resolved with the ``1.2.840.113556.1.4.1941`` matching rule
    on ``memberOf`` for every matching group DN. Attributes whose value is an
    empty list are reported as ``None``.
    """
    from ldap3.utils.conv import escape_filter_chars

    user_objects = []
    for group_dn in _find_group_dns(ldap_connection, manifest):
        # The DN comes from the directory and may contain filter
        # metacharacters (parentheses, backslashes), so escape it too.
        member_filter = ('(&(objectCategory=Person)(sAMAccountName=*)'
                         '(memberOf:1.2.840.113556.1.4.1941:='
                         + escape_filter_chars(group_dn) + '))')
        user_entries = ldap_connection.extend.standard.paged_search(
            search_base=manifest["search_base"],
            search_filter=member_filter,
            search_scope=SUBTREE,
            attributes=manifest["attributes"],
            paged_size=100000,
            generator=False)
        for user_entry in user_entries:
            if 'dn' not in user_entry:
                continue
            user_objects.append({
                attribute: (value if value != [] else None)
                for attribute, value in user_entry['attributes'].items()
            })
    return user_objects


def _run_bound(manifest, operation):
    """Bind to LDAP, run ``operation(connection, manifest)`` and always unbind.

    Returns whatever *operation* returns. Aborts with 400 and the bind result
    when the bind is refused, and with 400 and ``error_handling()`` output
    when anything in the LDAP exchange raises.
    """
    bind_failure = None
    result = None
    try:
        ldap_connection = _build_ldap_connection(manifest)
        try:
            if ldap_connection.bind():
                result = operation(ldap_connection, manifest)
            else:
                bind_failure = str(ldap_connection.result)
        finally:
            # Release the connection on every path, including early failures.
            _close_quietly(ldap_connection)
    except Exception:
        abort(400, {"error": {"exception: ": str(error_handling())}})

    # Raised outside the try block so the specific message is not swallowed
    # and replaced by the generic exception response.
    if bind_failure is not None:
        abort(400, {"error": "LDAP bind error " + bind_failure})
    return result


@api.route('/ldap/groupmembers_recursive')
class groupmembers_recursive(Resource):
    """Endpoint listing the members of a named group, nested groups included."""

    def post(self):
        """Return a list of attribute dicts for every person in the group.

        Expects the ``X-Secret`` header and a JSON manifest with the server,
        credential, search-base, attribute and ``group_name`` settings.
        """
        manifest = _load_authorized_manifest()
        return _run_bound(manifest, _collect_group_members)


@api.route('/ldap/groupdn')
class groupdn(Resource):
    """Endpoint resolving a group name to its distinguished name."""

    def post(self):
        """Return the DN of the first group matching ``group_name``.

        Aborts with 400 ``group dn not found`` when no group matches.
        """
        manifest = _load_authorized_manifest()
        group_dns = _run_bound(manifest, _find_group_dns)
        if not group_dns:
            abort(400, {"error": "group dn not found"})
        return group_dns[0]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement