import hmac

from ldap3 import Server, Connection, SUBTREE, ServerPool, FIRST
from ldap3.utils.conv import escape_filter_chars
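# Shared settings for the two LDAP endpoints below.
_CONNECT_TIMEOUT = 5       # seconds per server in the pool
_RECEIVE_TIMEOUT = 20      # seconds to wait for a search response
_PAGE_SIZE = 100000        # requested page size; the directory may cap it lower
# Active Directory extensible-match rule LDAP_MATCHING_RULE_IN_CHAIN:
# matches transitive (nested) group membership.
_IN_CHAIN_RULE = '1.2.840.113556.1.4.1941'
# Body fields the connection/search code reads. 'search_base' and 'attributes'
# keep their original dedicated error messages and are checked separately.
_CONNECTION_FIELDS = (
    'protocol', 'server_a_address', 'server_a_port', 'server_b_address',
    'server_b_port', 'username', 'password', 'ldap_version',
    'groups_search_base', 'group_name',
)
# The earlier X-Forwarded-For private-range check, which survived only as an
# inert string literal, has been dropped: the X-Secret header is the sole
# access control on these endpoints.


def _require_api_key():
    """Abort unless the X-Secret header equals the module-level ``api_key``.

    Aborts with 400 when the header is absent or empty and 401 when it does
    not match. The comparison is constant-time so response timing does not
    reveal how much of the key matched.
    """
    secret = request.headers.get('X-Secret')
    if not secret:
        abort(400, {"error": "invalid request"})
    # A non-str api_key (e.g. unset -> None) can never match, as before.
    expected = api_key if isinstance(api_key, str) else None
    if expected is None or not hmac.compare_digest(
            secret.encode('utf-8'), expected.encode('utf-8')):
        abort(401, {"error": "not authorized"})


def _load_manifest():
    """Return the JSON request body after checking every field the handlers read.

    Aborts with 400 and a field-specific message when the body is not a JSON
    object or a required field is missing.
    """
    # silent=True yields None on a malformed body / wrong content type instead
    # of letting the framework answer, so the "invalid json" message is kept.
    manifest = request.get_json(silent=True)
    if not manifest or not isinstance(manifest, dict):
        abort(400, {"error": "invalid json"})
    if 'search_base' not in manifest:
        abort(400, {"error": "search base not provided"})
    if 'attributes' not in manifest:
        abort(400, {"error": "ldap attributes not provided"})
    # The original read these without checking; a missing key surfaced as a
    # generic exception response. Name the missing field instead.
    missing = [name for name in _CONNECTION_FIELDS if name not in manifest]
    if missing:
        abort(400, {"error": "missing required fields: " + ", ".join(missing)})
    return manifest


def _open_connection(manifest):
    """Build an (unbound) ldap3 Connection over a two-server failover pool.

    Any protocol value other than 'ldap'/'LDAP' selects SSL. The bind
    credentials come straight from the request body, so the X-Secret check in
    ``_require_api_key`` is the only gate on who may use them.
    """
    use_ssl = manifest["protocol"] not in ('ldap', 'LDAP')
    servers = [
        Server(manifest["server_a_address"], port=manifest["server_a_port"],
               use_ssl=use_ssl, connect_timeout=_CONNECT_TIMEOUT),
        Server(manifest["server_b_address"], port=manifest["server_b_port"],
               use_ssl=use_ssl, connect_timeout=_CONNECT_TIMEOUT),
    ]
    pool = ServerPool(servers, FIRST, active=3, exhaust=True)
    return Connection(pool, user=manifest["username"], password=manifest["password"],
                      version=manifest["ldap_version"], auto_range=True,
                      receive_timeout=_RECEIVE_TIMEOUT)


def _run_ldap(manifest, work):
    """Bind, call ``work(connection)``, always unbind, and map failures to 400s.

    ``work`` must return a value rather than call ``abort``: anything raised
    inside it is reported as the generic exception response. A failed bind is
    reported with the server's result from outside the generic handler, so it
    is no longer re-wrapped (the original's bare ``except:`` swallowed it).
    """
    bind_error = None
    try:
        connection = _open_connection(manifest)
        try:
            if connection.bind():
                return work(connection)
            bind_error = str(connection.result)
        finally:
            # The original returned before its unbind, leaking the session.
            connection.unbind()
    except Exception:
        abort(400, {"error": {"exception: ": str(error_handling())}})
    abort(400, {"error": "LDAP bind error " + bind_error})


def _paged_search(connection, base, search_filter, attributes):
    """Run a non-generator paged subtree search and return its entry list."""
    return connection.extend.standard.paged_search(
        search_base=base, search_filter=search_filter, search_scope=SUBTREE,
        attributes=attributes, paged_size=_PAGE_SIZE, generator=False)


def _find_groups(connection, manifest):
    """Return the search entries for groups whose cn equals the requested name.

    ``group_name`` is untrusted request input; it is escaped so RFC 4515
    special characters (``*``, ``(``, ``)``, ``\\``) cannot alter the filter.
    """
    group_filter = ('(&(objectCategory=group)(cn='
                    + escape_filter_chars(manifest["group_name"]) + '))')
    return _paged_search(connection, manifest["groups_search_base"],
                         group_filter, manifest["attributes"])


def _flatten_attributes(attributes):
    """Copy an entry's attribute dict, turning empty lists into None."""
    return {name: (value if value != [] else None)
            for name, value in attributes.items()}


def _collect_members(connection, manifest):
    """Return one attribute dict per user that is a transitive member of a matching group."""
    members = []
    for group in _find_groups(connection, manifest):
        if 'dn' not in group:   # skip entries without a dn, as the original did
            continue
        # The dn comes from the directory but is still a filter value: names
        # containing parentheses or backslashes break an unescaped filter.
        user_filter = ('(&(objectCategory=Person)(sAMAccountName=*)(memberOf:'
                       + _IN_CHAIN_RULE + ':=' + escape_filter_chars(group['dn']) + '))')
        users = _paged_search(connection, manifest["search_base"], user_filter,
                              manifest["attributes"])
        members.extend(_flatten_attributes(user['attributes'])
                       for user in users if 'dn' in user)
    return members


def _first_group_dn(connection, manifest):
    """Return the dn of the first matching group entry, or None when there is none."""
    for group in _find_groups(connection, manifest):
        if 'dn' in group:
            return group['dn']
    return None


@api.route('/ldap/groupmembers_recursive')
class groupmembers_recursive(Resource):
    """POST /ldap/groupmembers_recursive: users transitively in the named group.

    Body: a JSON object with the fields in ``_CONNECTION_FIELDS`` plus
    ``search_base`` (where users are searched) and ``attributes`` (which
    attributes to return). Responds with a list of attribute dicts; every
    error path is a 400/401 JSON error.
    """

    def post(self):
        _require_api_key()
        manifest = _load_manifest()
        # NOTE(review): the original's indentation was lost in the paste; its
        # return may have fired after the first matching group. This returns
        # the members of every group whose cn matches.
        return _run_ldap(manifest,
                         lambda connection: _collect_members(connection, manifest))


@api.route('/ldap/groupdn')
class groupdn(Resource):
    """POST /ldap/groupdn: the dn of the first group whose cn equals ``group_name``.

    Takes the same body as ``groupmembers_recursive``. Responds with the dn
    string, or 400 "group dn not found" when no matching entry has a dn.
    """

    def post(self):
        _require_api_key()
        manifest = _load_manifest()
        group_dn = _run_ldap(manifest,
                             lambda connection: _first_group_dn(connection, manifest))
        if group_dn is None:
            # Raised outside _run_ldap so it is not re-wrapped as a generic exception.
            abort(400, {"error": "group dn not found"})
        return group_dn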