Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Copyright (c) 2019 Textkernel BV
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in all
- copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- SOFTWARE.
- """
- import functools
- from collections import namedtuple
- from timeit import default_timer as timer
- import py2neo
- from neo4j.v1 import GraphDatabase
- Test = namedtuple('Test', ['name', 'kwargs'])
- class TestRunner:
- def __init__(self):
- username = ''
- password = ''
- self.connectors = [
- py2neoConnector(username, password),
- py2neoConnector(username, password, scheme='http'),
- Neo4jDriverConnector(username, password)
- ]
- self.max_name_len = max(len(connector.name) for connector in self.connectors)
- self.lexical_term_names = [line.strip() for line in open('lexical_term_names.txt')]
- self.tests = [
- Test('count_all', {}),
- Test('retrieve_nodes_by_limit', {'limit': 10}),
- Test('retrieve_nodes_by_limit', {'limit': 100}),
- Test('retrieve_nodes_by_limit', {'limit': 1000}),
- # Test('retrieve_nodes_by_limit', {'limit': 10000}),
- # Test('retrieve_nodes_by_limit', {'limit': 100000}),
- # Test('retrieve_nodes_by_limit', {'limit': 1000000}),
- Test('retrieve_nodes_by_name', {'names': self.lexical_term_names[:10]}),
- Test('retrieve_nodes_by_name', {'names': self.lexical_term_names[:100]}),
- Test('retrieve_nodes_by_name', {'names': self.lexical_term_names[:1000]}),
- Test('retrieve_nodes_by_name', {'names': self.lexical_term_names[:10000]}),
- # Test('retrieve_nodes_by_name', {'names': self.lexical_term_names[:50000]}),
- ]
- def run_tests(self, repetitions=10):
- print(f"Testing with {repetitions} repetitions", end='\n\n')
- for test in self.tests:
- print(str(test)[:250])
- for connector in self.connectors:
- raw_method = getattr(connector, test.name)
- if self.is_method_of_super_class(raw_method, connector):
- continue
- prepared_method = functools.partial(raw_method, **test.kwargs)
- times = [self.timeit_ms(prepared_method) for _ in range(repetitions)]
- display_times = ', '.join([f'{time:.2f}' for time in times])
- print(
- f"\t{connector.name:<{self.max_name_len}} took {self.average(times):.1f}ms ({display_times})")
- @staticmethod
- def timeit_ms(method):
- start = timer()
- method()
- end = timer()
- return (end - start) * 1000
- @staticmethod
- def average(values):
- return sum(values) / len(values)
- @staticmethod
- def is_method_of_super_class(method, connector):
- # connector
- class_name_string = str(type(connector))
- cleaned_class_name_string = class_name_string[class_name_string.find('.') + 1:-2]
- # method
- method_string = str(method)
- method_parent_string = method_string[14:method_string.find('.')]
- return cleaned_class_name_string != method_parent_string
- class AbstractConnector:
- def count_all(self):
- raise NotImplementedError()
- def retrieve_nodes_by_limit(self, limit):
- raise NotImplementedError
- def retrieve_nodes_by_name(self, names):
- raise NotImplementedError
- class py2neoConnector(AbstractConnector):
- def __init__(self, username, password, scheme='bolt', **kwargs):
- self.name = f"py2neo-{scheme}"
- self.connection = py2neo.Graph(auth=(username, password), scheme=scheme, **kwargs)
- def count_all(self):
- return self.connection.run("MATCH (n) RETURN COUNT(*)").data()
- def retrieve_nodes_by_limit(self, limit):
- return self.connection.run("MATCH (n:lexical_term) RETURN n LIMIT {limit}", {'limit': limit}).data()
- def retrieve_nodes_by_name(self, names):
- transaction = self.connection.begin()
- cursors = [
- transaction.run("MATCH (n:lexical_term {name: {name}}) RETURN n", {'name': name})
- for name in names
- ]
- # transaction.process()
- transaction.commit()
- return [cursor.data() for cursor in cursors]
- class Neo4jDriverConnector(AbstractConnector):
- def __init__(self, username, password, **kwargs):
- self.name = f"neo4j-driver"
- self.connection = GraphDatabase.driver("bolt://localhost:7687", auth=(username, password))
- def count_all(self):
- with self.connection.session() as session:
- return session.run("MATCH (n) RETURN COUNT(*)").data()
- def retrieve_nodes_by_limit(self, limit):
- with self.connection.session() as session:
- return session.run("MATCH (n:lexical_term) RETURN n LIMIT {limit}", {'limit': limit}).data()
- def retrieve_nodes_by_name(self, names):
- with self.connection.session() as session:
- transaction = session.begin_transaction()
- cursors = [
- transaction.run("MATCH (n:lexical_term {name: {name}}) RETURN n", {'name': name})
- for name in names
- ]
- transaction.sync()
- transaction.close()
- return [cursor.data() for cursor in cursors]
- if __name__ == '__main__':
- TestRunner().run_tests()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement