Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import io
- import gzip
- import struct
- import urllib.parse
- import json
- from time import *
- from google.protobuf import json_format
- from jsondiff import diff
- from testlib.main_test_case import *
- from testlib.tracker_api.tracker_data_generator import *
- from testlib.tracker_api.tracker_db import *
- import testlib.tracker_api.protobuf_common_structures.CoreStructure_pb2 as CoreStructure
- import testlib.tracker_api.protobuf_common_structures.ExtraFileStructure_pb2 as ExtraFileStructure
- import testlib.tracker_api.protobuf_common_structures.MetadataStructure_pb2 as MetadataStructure
- import testlib.tracker_api.protobuf_common_structures.RulesStructure_pb2 as RulesStructure
- class TrackerApiTestCase(MainTestCase):
- request_adconfig_get = "/adconfig/get"
- request_corefile_get = "/corefile/get"
- request_corefile_ios = "/corefile/ios"
- request_corepatch_get = "/corepatch/get"
- request_extrafile_get = "/extrafile/get"
- request_extrafile_ios = "/extrafile/ios"
- request_extrafile_ios_mapping = '/e/ios?q='
- request_netwfile_get = "/netwfile/get"
- request_add_device_in_smooth = "/admin/add_device"
- request_encode_params = "/admin/q_encode"
- request_decode_params = "/admin/q_decode"
- request_snapshot_sync = "/admin/snapshot/sync"
- request_snapshot_info = "/admin/snapshot/info"
- request_optout_add = "/optout/add"
- request_optout_exists = "/optout_admin/exists"
- request_optout_remove = "/optout_admin/remove"
- request_adconfig_mapping = "/a/?q="
- request_corefile_mapping = '/c/?q='
- request_corefile_ios_mapping = '/c/ios/?q='
- request_corepatch_mapping = '/p/?q='
- request_extrafile_mapping = '/e/?q='
- request_netwfile_mapping = '/n/?q='
- cheat = "&qHme3rK4bg=1"
- provider_cheat = lambda: (
- (True, 'cheat'),
- (False, 'cheat'),
- )
- default_smooth_id_7 = 9
- default_smooth_id_8 = 20
- files_to_deactivate = []
- json_diff_arr = []
- db = None # type: TrackerDb
- cfg_injector_ver = ''
- db_host = ''
- db_port = ''
- db_user = ''
- db_pass = ''
- sdk_test_method = "/sdk/test"
- sdk_method = "/sdk"
- block_type_version_request = '_VERSION_REQUEST'
- block_type_rules_primitives = 'RULES_PRIMITIVES'
- block_type_cache_info = 'CACHE_INFO'
- block_type_sdk_info = 'SDK_INFO'
- block_type_app_info = 'APP_INFO'
- block_type_rule = 'RULE'
- block_type_network = 'NETWORK_BLOCK'
- block_type_decor = 'DECOR'
- block_type_crypto_mining_data = 'CRYPTO_MINING_DATA'
- block_type_extra_file = 'EXTRA_FILE_BLOCK'
- block_type_core_file = 'CORE_FILE_TO'
- block_type_core_patch = 'CORE_PATCH_TO'
- block_class_by_type = {
- block_type_version_request: MetadataStructure.StreamMetadata.VersionRequest(),
- block_type_rules_primitives: RulesStructure.RulesPrimitives(),
- block_type_cache_info: RulesStructure.CacheInfo(),
- block_type_sdk_info: RulesStructure.SdkInfo(),
- block_type_app_info: RulesStructure.AppInfo(),
- block_type_rule: RulesStructure.Rule(),
- block_type_network: RulesStructure.NetworkBlock(),
- block_type_decor: RulesStructure.Decor(),
- block_type_crypto_mining_data: RulesStructure.CryptoMiningData(),
- block_type_extra_file: ExtraFileStructure.ExtraFileBlock(),
- block_type_core_file: CoreStructure.CoreFileTo(),
- block_type_core_patch: CoreStructure.CorePatchTo(),
- }
- block_type_by_class = {
- 'VersionRequest': [block_type_version_request, 0],
- 'RulesPrimitives': [block_type_rules_primitives, 10],
- 'CacheInfo': [block_type_cache_info, 11],
- 'SdkInfo': [block_type_sdk_info, 12],
- 'AppInfo': [block_type_app_info, 13],
- 'Rule': [block_type_rule, 14],
- 'NetworkBlock': [block_type_network, 15],
- 'Decor': [block_type_decor, 16],
- 'CryptoMiningData': [block_type_crypto_mining_data, 17],
- 'ExtraFileBlock': [block_type_extra_file, 18],
- 'CoreFileTo': [block_type_core_file, 19],
- 'CorePatchTo': [block_type_core_patch, 20],
- }
- block_exception = {
- "block - new - user": 'false',
- "date - block - new - user": 0,
- "owner": 197,
- "title": "Too many items",
- "geo_filter_list": 0,
- "analytics_url": "",
- "exp_update": 30,
- "protcore_silent": 180,
- }
- block_exception_in_dict = {
- "mediator_url": "http:\ / \ / api.mediator.tpc.re",
- "mediator": 12,
- "version": 5,
- "link": "test",
- "id": 1404
- }
- def tearDown(self):
- super().tearDown()
- self.deactivate_files()
- @classmethod
- def preparations_before_tests(cls):
- super().preparations_before_tests()
- cls.generator = TrackerDataGenerator()
- cls.injector_ver = cls.define_injector_ver()
- cls.fill_tracker_db = cls.define_fill_tracker_db()
- if cls.fill_tracker_db:
- cls.db = TrackerDb(db=cls.db, db_host=cls.db_host, db_port=cls.db_port, db_user=cls.db_user,
- db_pass=cls.db_pass)
- cls.prepare_tables()
- @classmethod
- def read_tests_config(cls):
- super().read_tests_config()
- cls.cfg_injector_ver = cls.config['tracker']['injector_version']
- cls.cfg_host = cls.config['tracker']['host']
- cls.ios_ver = cls.config['tracker']['ios_version']
- cls.db = cls.define_db_name()
- cls.db_host = cls.define_db_host()
- cls.db_port = cls.define_db_port()
- cls.db_user = cls.define_db_user()
- cls.db_pass = cls.define_db_password()
- cls.files_dir = cls.config['tracker']['files_path']
- @classmethod
- def define_injector_ver(cls):
- (options, args) = cls.parser.parse_args()
- inj_ver = options.injector_ver
- if not inj_ver:
- inj_ver = cls.cfg_injector_ver
- return inj_ver
- @classmethod
- def define_available_options(cls):
- super().define_available_options()
- cls.parser.add_option("-i", "--injector-ver", dest="injector_ver", help="set injector version")
- cls.parser.add_option("--fill-tracker-db", dest="fill_tracker_db", default="False",
- help="fill tracker DB",
- action='callback', callback=cls.optional_arg('True'))
- cls.parser.add_option("--db-name", dest="db_name", help="set tracker db name")
- cls.parser.add_option("--db-host", dest="db_host", help="set tracker db host")
- cls.parser.add_option("--db-port", dest="db_port", help="set tracker db port")
- cls.parser.add_option("--db-user", dest="db_user", help="set tracker db user")
- cls.parser.add_option("--db-password", dest="db_password", help="set tracker db password")
- @classmethod
- def define_fill_tracker_db(cls):
- (options, args) = cls.parser.parse_args()
- fill_tracker_db = options.fill_tracker_db
- return fill_tracker_db == 'True'
- @classmethod
- def define_db_name(cls):
- (options, args) = cls.parser.parse_args()
- db_name = options.db_name
- if not db_name:
- db_name = cls.config['tracker']['db_name']
- return db_name
- @classmethod
- def define_db_host(cls):
- (options, args) = cls.parser.parse_args()
- db_host = options.db_host
- if not db_host:
- db_host = cls.config['tracker']['db_host']
- return db_host
- @classmethod
- def define_db_port(cls):
- (options, args) = cls.parser.parse_args()
- db_port = options.db_port
- if not db_port:
- db_port = cls.config['tracker']['db_port']
- return db_port
- @classmethod
- def define_db_user(cls):
- (options, args) = cls.parser.parse_args()
- db_user = options.db_user
- if not db_user:
- db_user = cls.config['tracker']['db_user']
- return db_user
- @classmethod
- def define_db_password(cls):
- (options, args) = cls.parser.parse_args()
- db_password = options.db_password
- if not db_password:
- db_password = cls.config['tracker']['db_password']
- return db_password
- @classmethod
- def prepare_tables(cls):
- if not cls.db.check_table_exists(cls.db.core_files_table_name):
- cls.db.create_table_corefile()
- if not cls.db.check_table_exists(cls.db.core_files_ios_table_name):
- cls.db.create_table_corefile_ios()
- if not cls.db.check_table_exists(cls.db.core_patches_table_name):
- cls.db.create_table_corepatch()
- if not cls.db.check_table_exists(cls.db.extra_files_table_name):
- cls.db.create_table_extrafile()
- if not cls.db.check_table_exists(cls.db.extra_files_ios_table_name):
- cls.db.create_table_extrafile_ios()
- if not cls.db.check_table_exists(cls.db.netw_files_table_name):
- cls.db.create_table_netwfile()
- if not cls.db.check_table_exists(cls.db.smooth_files_table_name):
- cls.db.create_table_smoothcore()
- if not cls.db.check_table_exists(cls.db.decors_table_name):
- cls.db.create_table_decorations()
- if not cls.db.check_table_exists(cls.db.publishers_table_name):
- cls.db.create_table_publishers()
- if not cls.db.check_table_exists(cls.db.apps_table_name):
- cls.db.create_table_applications()
- if not cls.db.check_table_exists(cls.db.config_table_name):
- cls.db.create_table_config()
- if not cls.db.check_table_exists(cls.db.network_key_table_name):
- cls.db.create_table_network_key()
- if not cls.db.check_table_exists(cls.db.appgroup_table_name):
- cls.db.create_table_appgroup()
- if not cls.db.check_table_exists(cls.db.adnetwork_table_name):
- cls.db.create_table_adnetwork()
- if not cls.db.check_table_exists(cls.db.campaign_table_name):
- cls.db.create_table_campaign()
- def encode_params(self, params, cheat=False):
- params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
- response = urllib.parse.quote(
- self.make_request(self.request_encode_params + "?" + params)['response']['qUrlEncoded'])
- if response.endswith("="):
- response = re.sub("=+$", "", response)
- if cheat:
- response += self.cheat
- return urllib.parse.unquote(response)
- def assert_response_ok(self, response):
- self.assertEqual("ok", response["response"]["result"])
- self.assertEqual("", response["response"]["error"])
- def assert_response_fail(self, response):
- self.assertEqual("fail", response["response"]["result"])
- def get_core_version(self, ios=False, u=None):
- if ios:
- request = self.request_corefile_ios
- else:
- request = self.request_corefile_get
- params = {'i': self.injector_ver}
- if u:
- params["u"] = u
- r = self.make_request(request=request, params=params)
- return str(r['response']['vCore'])
- def add_cheat_to_params(self, params):
- params['qHme3rK4bg'] = 1
- return params
- def assert_good_extra_file_response(self, response, core_version):
- for resp in response['response']:
- self.assertLessEqual(int(resp['core']), int(core_version))
- self.assertGreater(len(resp['name']), 0)
- self.assertGreater(len(resp['title']), 0)
- self.assertGreater(len(resp['url']), 0)
- def decode_params(self, params):
- return self.make_request(self.request_decode_params + '?value=' + params, decode_resp=False)
- def is_decoded(self, params):
- return self.decode_params(params)['status'] == self.status_code_ok
- def assert_core_without_smooth(self, response, i, v_core=None):
- self.assertNotEqual("", response['response']["vCore"])
- if v_core:
- self.assertEqual(v_core, response['response']["vCore"])
- self.assertEqual(int(i), response['response']["vInject"])
- self.assertEqual([], response['response']['countries'])
- self.assertNotEqual("", response['response']["url"])
- def assert_core_in_smooth(self, response, i, v_core=None):
- self.assertNotEqual("", response['response']["vCore"])
- if v_core:
- self.assertEqual(v_core, response['response']["vCore"])
- self.assertEqual(int(i), response['response']["vInject"])
- self.assertNotEqual([], response['response']['countries'])
- self.assertNotEqual("", response['response']["url"])
- def add_new_core_file(self, params=None, smooth=False, ios=False):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "vInject" not in data.keys():
- data["vInject"] = self.injector_ver
- if "file" not in data.keys():
- data["file"] = "%s/autotest-file-%s.zip" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["file"])
- if ios:
- snapshot_type = "IosCoreFiles"
- table = self.db.core_files_ios_table_name
- else:
- snapshot_type = "CoreFiles"
- table = self.db.core_files_table_name
- params = self.generator.generate_core_file(def_params=data, smooth=smooth, ios=ios)
- self.db.insert_core_file(params=params, ios=ios)
- self.sync_snapshots_and_wait([snapshot_type])
- self.files_to_deactivate.append({"title": params["title"], "table": table, "snapshot_type": snapshot_type})
- return params
- def add_new_corepatch(self, params=None, core_file_title=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "corefileId" not in data.keys() and core_file_title:
- data["corefileId"] = self.db.get_corefile_id(core_file_title)
- if "file" not in data.keys():
- data["file"] = "%s/autotest-file-%s.zip" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["file"])
- snapshot_type = "CorePatches"
- table = self.db.core_patches_table_name
- params = self.generator.generate_corepatch_file(def_params=data)
- self.db.insert_corepatch_file(params)
- self.sync_snapshots_and_wait([snapshot_type])
- self.files_to_deactivate.append({"title": params["title"], "table": table, "snapshot_type": snapshot_type})
- def add_new_extra_file(self, params=None, ios=False):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "file" not in data.keys():
- data["file"] = "%s/autotest-file-%s.zip" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["file"])
- if ios:
- snapshot_type = "IosExtraFiles"
- table = self.db.extra_files_ios_table_name
- else:
- snapshot_type = "ExtraFiles"
- table = self.db.extra_files_table_name
- params = self.generator.generate_extra_file(def_params=data, ios=ios)
- self.db.insert_extra_file(params=params, ios=ios)
- self.sync_snapshots_and_wait([snapshot_type])
- self.files_to_deactivate.append({"title": params["title"], "table": table, "snapshot_type": snapshot_type})
- return params
- def add_new_netw_file(self, params=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "file" not in data.keys():
- data["file"] = "%s/autotest-file-%s.zip" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["file"])
- snapshot_type = "NetworkFiles"
- table = self.db.netw_files_table_name
- params = self.generator.generate_netw_file(def_params=data)
- self.db.insert_netw_file(params)
- self.sync_snapshots_and_wait([snapshot_type])
- self.files_to_deactivate.append({"title": params["title"], "table": table, "snapshot_type": snapshot_type})
- def add_new_decoration(self, params=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "image_mdpi" not in data.keys():
- data["image_mdpi"] = "%s/autotest-file-mdpi-%s.png" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["image_mdpi"])
- if "image_hdpi" not in data.keys():
- data["image_hdpi"] = "%s/autotest-file-hdpi-%s.png" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["image_hdpi"])
- if "image_xhdpi" not in data.keys():
- data["image_xhdpi"] = "%s/autotest-file-xhdpi-%s.png" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["image_xhdpi"])
- if "image_xxhdpi" not in data.keys():
- data["image_xxhdpi"] = "%s/autotest-file-xxhdpi-%s.png" % (self.files_dir, uuid.uuid4().hex)
- self.touch_file(data["image_xxhdpi"])
- snapshot_type = "Decorations"
- table = self.db.decors_table_name
- params = self.generator.generate_decoration(def_params=data)
- self.db.insert_decoration(params)
- self.sync_snapshots_and_wait([snapshot_type])
- self.files_to_deactivate.append({"title": params["title"], "table": table, "snapshot_type": snapshot_type})
- def add_new_app(self, params=None, pub_login=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "owner" not in data.keys() and pub_login:
- data["owner"] = self.db.get_publisher_id(pub_login)
- params = self.generator.generate_app(def_params=data)
- self.db.insert_app(params)
- self.sync_snapshots_and_wait(["Applications"])
- return params
- def add_new_publisher(self, params=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- params = self.generator.generate_publisher(def_params=data)
- self.db.insert_publisher(params)
- return params
- def add_new_config(self, params=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- params = self.generator.generate_config(def_params=data)
- self.db.insert_config(params)
- self.sync_snapshots_and_wait(["ConfigProperties"])
- def add_new_network_key(self, params=None, pub_login=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "adnetwork" not in data.keys():
- adnetwork = self.add_new_adnetwork()
- data["adnetwork"] = "%s" % self.db.get_adnetwork_id(title=adnetwork["title"])
- if pub_login:
- data["owner"] = self.db.get_publisher_id(pub_login)
- params = self.generator.generate_network_key(def_params=data)
- self.db.insert_network_key(params)
- self.sync_snapshots_and_wait(["AdNetworkKeys"])
- return params
- def add_new_appgroup(self, params=None, keys_titles=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "adKeys" not in data.keys() and keys_titles:
- keys = []
- for key_title in keys_titles:
- keys.append(self.db.get_network_key_id(title=key_title))
- data["adKeys"] = keys
- params = self.generator.generate_appgroup(def_params=data)
- self.db.insert_appgroup(params)
- self.sync_snapshots_and_wait(["AppGroups"])
- return params
- def add_new_adnetwork(self, params=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- params = self.generator.generate_adnetwork(def_params=data)
- self.db.insert_adnetwork(params)
- self.sync_snapshots_and_wait(["AdNetworks"])
- return params
- def add_new_campaign(self, params=None, pub_login=None):
- if not self.fill_tracker_db:
- return
- data = {}
- if params:
- data = params
- if "owner" not in data.keys() and pub_login:
- data["owner"] = self.db.get_publisher_id(pub_login)
- params = self.generator.generate_campaign(def_params=data)
- self.db.insert_campaign(params)
- self.sync_snapshots_and_wait(["Campaigns"])
- return params
- def update_app(self, title, params):
- app_id = self.db.get_app_id(title)
- self.db.update_app(app_id, params)
- self.sync_snapshots_and_wait(["Applications"])
- def update_campaign(self, title, params):
- app_id = self.db.get_campaign_id(title)
- self.db.update_campaign(app_id, params)
- self.sync_snapshots_and_wait(["Campaigns"])
- def deactivate_files(self):
- if not self.fill_tracker_db or not self.files_to_deactivate:
- return
- snapshot_types = []
- for file_to_deactivate in self.files_to_deactivate:
- snapshot_types.append(file_to_deactivate["snapshot_type"])
- self.db.make_file_inactive(title=file_to_deactivate["title"], table=file_to_deactivate["table"])
- self.sync_snapshots_and_wait(snapshot_types)
- self.files_to_deactivate = []
- def sync_snapshots_and_wait(self, snapshot_types, timeout=20000):
- info_before = self.make_request(request=self.request_snapshot_info)['response']
- self.make_request(request=self.request_snapshot_sync)
- for snapshot_type in snapshot_types:
- i = 0
- while True:
- info_after = self.make_request(request=self.request_snapshot_info)['response'][snapshot_type]
- if (int(info_before[snapshot_type]["entitiesCount"]) > int(info_after["entitiesCount"])) or (
- int(info_before[snapshot_type]["lastSyncTimestamp"]) < int(
- info_after["lastSyncTimestamp"])):
- break
- if i >= timeout:
- raise TimeoutError('Timeout has been reached while waiting for snapshots sync')
- sleep(3.0 / 1000.0)
- i += 1
- def touch_file(self, path):
- if not os.path.isdir(os.path.dirname(path)):
- os.mkdir(os.path.dirname(path))
- cmd = "touch %s" % path
- os.system(cmd)
- def get_smooth_id(self, corefile=None, core_id=None):
- if not self.fill_tracker_db:
- smooth_id = {
- '7': self.default_smooth_id_7,
- '8': self.default_smooth_id_8,
- }
- return smooth_id[str(self.injector_ver)]
- if not core_id and not corefile:
- raise Exception("Please set core file title or core file id")
- if not core_id and corefile:
- core_id = self.db.get_corefile_id(title=corefile["title"])
- return self.db.get_smooth_id(core_id)
- def add_device_in_smooth(self, device_id, smooth_id=None, corefile=None):
- if not smooth_id:
- smooth_id = self.get_smooth_id(corefile=corefile)
- params = {"device": device_id, "smoothId": smooth_id}
- self.make_request(request=self.request_add_device_in_smooth, params=params)
- self.sync_snapshots_and_wait(snapshot_types=["SmoothTests"])
- def assert_file_correct(self, file, min_size=10000, expected_type="core"):
- self.assertEqual(self.status_code_ok, file['status'])
- self.assertGreater(int(file['headers'].get('Content-Length')), min_size)
- actual_type = file['headers'].get('Content-Type')
- self.assert_file_content_type(actual_type, expected_type)
- def assert_file_content_type(self, content_type, expected_type="core"):
- types = []
- if expected_type == "core":
- types = self.content_types_corefile
- if expected_type == "decors":
- types = self.content_types_decors
- self.assertIn(content_type, types)
- @staticmethod
- def xor(buffer, mask='testKeyForXor'):
- mask = mask.encode()
- result = []
- j = 0
- mask_len = len(mask)
- for i in range(len(buffer)):
- result.append(buffer[i] ^ mask[j])
- j += 1
- if j == mask_len:
- j = 0
- return bytearray(result)
- @staticmethod
- def write_blocks_to_stream(blocks):
- data_output_stream = io.BytesIO()
- stream_metadata = MetadataStructure.StreamMetadata()
- blocks_metadata = []
- for block in blocks:
- serialized_message = block.SerializeToString()
- block_metadata = stream_metadata.BlockMetadata()
- block_metadata.block_size = len(serialized_message)
- block_metadata.block_type = 0
- blocks_metadata.append(block_metadata)
- stream_metadata.blocks_metadata.extend(blocks_metadata)
- serialized_metadata = stream_metadata.SerializeToString()
- # write Stream Metadata size
- data_output_stream.write(struct.pack(">i", len(serialized_metadata)))
- # write Stream Metadata
- data_output_stream.write(serialized_metadata)
- # write Stream Body
- for block in blocks:
- data_output_stream.write(block.SerializeToString())
- return data_output_stream.getvalue()
- def read_blocks_from_stream(self, input_stream):
- result = []
- input_stream = io.BytesIO(input_stream)
- header_size, = struct.unpack('>i', input_stream.read(4))
- header_bytes = input_stream.read(header_size)
- stream_metadata = MetadataStructure.StreamMetadata()
- stream_metadata.ParseFromString(header_bytes)
- blocks_metadata_list = json_format.MessageToDict(stream_metadata)['blocksMetadata']
- if blocks_metadata_list:
- for block_metadata in blocks_metadata_list:
- body_buffer = input_stream.read(block_metadata.get('blockSize', 0))
- parsed_message = self.block_class_by_type[block_metadata['blockType']]
- parsed_message_default_values = {}
- for parsed_message_key in list(parsed_message.DESCRIPTOR.fields):
- try:
- parsed_message_default_values[
- self.snake_to_camel_case(parsed_message_key.name)] = parsed_message_key.default_value
- except NotImplementedError:
- parsed_message_default_values[
- self.snake_to_camel_case(parsed_message_key.name)] = 'NotImplemented!' # todo
- parsed_message.ParseFromString(body_buffer)
- # todo add fields with default values
- parsed_message = json_format.MessageToDict(parsed_message)
- parsed_message = {**parsed_message_default_values, **parsed_message}
- result.append({block_metadata['blockType']: [parsed_message, block_metadata['hash']]})
- return result
- def make_protobuf_request(self, request, params):
- blocks = self.get_blocks_by_request_type(request)
- version_request = MetadataStructure.StreamMetadata.VersionRequest()
- block_hashes = []
- for block in blocks:
- block_hash = version_request.BlockHash()
- block_hash.block_type = self.block_type_by_class[block.DESCRIPTOR.name][1]
- block_hash.hash = ""
- block_hashes.append(block_hash)
- version_request.block_hashes.extend(block_hashes)
- # todo request encryption
- if 'id' not in params:
- params['id'] = '2154-2963-3801'
- for k, v in params.items():
- version_request.request_params[k] = str(v)
- request = self.write_blocks_to_stream([version_request])
- request_gz = gzip.compress(request, compresslevel=1)
- request_gz_xor = self.xor(request_gz)
- response_raw = self.make_request(request=self.sdk_method, data=request_gz_xor, req_type='post',
- decode_resp=False)
- response_xor = self.xor(response_raw['response'].content, mask=params['id'])
- response_unzip = gzip.decompress(response_xor)
- if response_raw['status'] == self.status_code_ok:
- response = self.read_blocks_from_stream(response_unzip)
- return {'response': response, 'status': response_raw['status']}
- return {'response': response_raw['response'].content, 'status': response_raw['status']}
- def get_blocks_by_request_type(self, request):
- blocks_rules = [RulesStructure.RulesPrimitives(),
- RulesStructure.CacheInfo(),
- RulesStructure.AppInfo(),
- RulesStructure.CryptoMiningData(),
- RulesStructure.NetworkBlock(),
- RulesStructure.Decor(),
- RulesStructure.Rule(),
- RulesStructure.SdkInfo(),
- ]
- blocks_corefile = [CoreStructure.CoreFileTo(),
- ]
- blocks_corepatch = [CoreStructure.CorePatchTo(),
- ]
- blocks_extrafile = [ExtraFileStructure.ExtraFileBlock(),
- ]
- blocks = {
- self.request_adconfig_get: blocks_rules,
- self.request_adconfig_mapping: blocks_rules,
- self.request_corefile_get: blocks_corefile,
- self.request_corefile_mapping: blocks_corefile,
- self.request_corefile_ios: blocks_corefile,
- self.request_corefile_ios_mapping: blocks_corefile,
- self.request_corepatch_get: blocks_corepatch,
- self.request_corepatch_mapping: blocks_corepatch,
- self.request_extrafile_get: blocks_extrafile,
- self.request_extrafile_mapping: blocks_extrafile,
- self.request_extrafile_ios: blocks_extrafile,
- self.request_extrafile_ios_mapping: blocks_extrafile,
- self.request_netwfile_get: blocks_extrafile,
- self.request_netwfile_mapping: blocks_extrafile,
- }[request]
- return blocks
- @staticmethod
- def get_block(protobuf_response, block_type):
- result = []
- for block in protobuf_response:
- if list(block.keys())[0] == block_type:
- result.append(block)
- if len(result) != 1:
- raise Exception("Found {} blocks of type {}.".format(len(result), block_type))
- return result[0][block_type]
- @staticmethod
- def snake_to_camel_case(value):
- value = value.replace('-', '_')
- parts = value.split('_')
- return {
- 'startdelay': 'startDelay',
- }.get(value, parts[0] + ''.join(x.title() for x in parts[1:]))
- @staticmethod
- def camel_to_snake_case(value):
- if value in ['ruleId', # case converting rule exceptions: don't convert if both json and protobuf have 'ruleId'
- ]:
- return value
- return {
- 'startDelay': 'startdelay',
- }.get(value, re.sub('([a-z0-9])([A-Z])', r'\1_\2', re.sub('(.)([A-Z][a-z]+)', r'\1_\2', value)).lower())
- blocks_mapping = {
- 'sdk': block_type_sdk_info,
- 'app': block_type_app_info,
- 'rule': block_type_rule,
- 'cache': block_type_cache_info,
- 'networks': block_type_network,
- 'decors': block_type_decor,
- }
- # todo
- blocks_mapping2 = {
- block_type_sdk_info: 'sdk',
- block_type_app_info: 'app',
- block_type_rule: 'rule',
- block_type_cache_info: 'cache',
- block_type_network: 'networks',
- block_type_decor: 'decors',
- block_type_crypto_mining_data: 'crypto_mining_data',
- }
- rules_primitives = [
- 'result',
- 'error',
- 'silent',
- 'optout',
- 'country',
- ]
- def assert_protobuf_response2(self, json_response, protobuf_response, params):
- json_response = json_response['response']
- protobuf_response = protobuf_response['response']
- json_keys = [x for x in json_response.keys() if x not in self.rules_primitives]
- json_keys_primitives = [x for x in json_response.keys() if x in self.rules_primitives]
- diff = []
- # primitives
- protobuf_block = self.get_block(protobuf_response, self.block_type_rules_primitives)
- diff = self.assert_block(diff=diff,
- json_keys=json_keys_primitives,
- json_response=json_response,
- protobuf_block=protobuf_block,
- block_type=self.block_type_rules_primitives)
- # other keys
- for key in json_keys:
- json_block_keys = json_response[key].keys()
- protobuf_block = self.get_block(protobuf_response, self.blocks_mapping[key])
- diff = self.assert_block(diff=diff,
- json_keys=json_block_keys,
- json_response=json_response[key],
- protobuf_block=protobuf_block,
- block_type=self.blocks_mapping[key])
- # custom keys
- diff = self.assert_block(diff=diff,
- json_keys=json_response['rule']['decors'].keys(),
- json_response=json_response['rule']['decors'],
- protobuf_block=self.get_block(protobuf_response, self.blocks_mapping['decors']),
- block_type=self.blocks_mapping['decors'])
- # networks
- diff = self.assert_networks(json_response['rule']['networks'],
- self.get_block(protobuf_response, self.blocks_mapping['networks'])[0]['networks'],
- diff)
- # todo compare way 1
- # self.assertEqual(diff, [])
- print(params)
- def assert_block(self, diff, json_keys, json_response, protobuf_block, block_type):
- json_keys = [x.replace('-', '_') for x in json_keys]
- protobuf_keys = protobuf_block[0].keys()
- protobuf_keys_snake_case = [self.camel_to_snake_case(x) for x in protobuf_keys]
- keys_diff_json = set(json_keys) - set(protobuf_keys_snake_case)
- if block_type == self.block_type_rule and 'decors' in keys_diff_json:
- keys_diff_json.remove('decors')
- if block_type == self.block_type_rule and 'networks' in keys_diff_json:
- keys_diff_json.remove('networks')
- if keys_diff_json:
- diff.append([block_type, 'Keys that present in json but absent in protobuf.', keys_diff_json])
- json_keys_camel_case = [self.snake_to_camel_case(x) for x in json_keys]
- keys_diff_protobuf = set(protobuf_keys) - set(json_keys_camel_case)
- if keys_diff_protobuf:
- diff.append([block_type, 'Keys that present in protobuf but absent in json.', keys_diff_protobuf])
- block_diff = []
- if block_type == self.block_type_rules_primitives:
- for key in self.rules_primitives:
- block_diff = self.get_block_diff(block_diff, json_response, protobuf_block, key)
- else:
- for key in json_response:
- block_diff = self.get_block_diff(block_diff, json_response, protobuf_block, key)
- if block_diff:
- diff.append([block_type, 'Values diff.', block_diff])
- return diff
- def get_block_diff(self, diff, json_response, block, key):
- try:
- json_value = json_response[key]
- protobuf_value = block[0][self.snake_to_camel_case(key)]
- if type(json_value) is bool or type(protobuf_value) is bool:
- if json_value in ['true', 'false']:
- json_value = self.bool_from_string(json_value)
- if type(json_value) is int and type(self.try_int_from_string(protobuf_value)) is int:
- protobuf_value = self.try_int_from_string(protobuf_value)
- if json_value is None and protobuf_value == '':
- json_value = ''
- self.assertEqual(json_value, protobuf_value)
- except KeyError:
- # diff.append(["KeyError", key])
- pass
- except AssertionError:
- diff.append([
- 'json: {} = {} ({})'.format(key, json_response[key], type(json_response[key])),
- 'protobuf: {} = {} ({})'.format(self.snake_to_camel_case(key), block[0][self.snake_to_camel_case(key)],
- type(block[0][self.snake_to_camel_case(key)]))])
- return diff
- @staticmethod
- def bool_from_string(value):
- return {
- 'true': True,
- 'false': False,
- }[value]
- def build_json_from_protobuf(self, protobuf_response):
- json_rules = {'cache': {},
- 'sdk': {},
- 'app': {},
- 'rule': {},
- 'crypto_mining_data': {},
- }
- # RULES_PRIMITIVES
- block_primitives = self.get_block(protobuf_response, self.block_type_rules_primitives)[0]
- for k, v in block_primitives.items():
- json_rules[self.camel_to_snake_case(k)] = v
- # OTHER BLOCKS
- for block in protobuf_response:
- block_type = list(block.keys())[0]
- if block_type not in [self.block_type_rules_primitives,
- self.block_type_network,
- self.block_type_decor,
- ]:
- block_values = {}
- for k, v in list(block.values())[0][0].items():
- block_values[self.camel_to_snake_case(k)] = v
- json_rules[self.blocks_mapping2[block_type]].update(block_values)
- continue
- # NETWORKS
- if block_type in [self.block_type_network,
- ]:
- json_rules['rule'].update(list(block.values())[0][0])
- continue
- # DECORS
- if block_type in [self.block_type_decor,
- ]:
- json_rules['rule']['decors'] = list(block.values())[0][0]['imgs']
- continue
- return json_rules
- def assert_networks(self, json_networks, protobuf_networks, diff):
- diff_networks = []
- if len(json_networks) != len(protobuf_networks):
- diff_networks.append([self.block_type_network,
- f'Number of networks in json: {len(json_networks)} '
- f'!= number of networks in protobuf: {len(protobuf_networks)}'])
- for json_network in json_networks:
- protobuf_network = [x for x in protobuf_networks if x['id'] == json_network['id']][0]
- if 'params' in protobuf_network:
- for k, v in protobuf_network['params'].items():
- protobuf_network[k] = v
- del (protobuf_network['params'])
- diff_keys = json_network.keys() - protobuf_network.keys()
- if diff_keys:
- diff_networks.append([json_network['id'], diff_keys])
- for diff_key in diff_keys:
- del json_network[diff_key]
- for k, v in json_network.items():
- try:
- self.assertEqual(v, protobuf_network[k])
- except AssertionError:
- diff_networks.append(
- [json_network['id'], f'Values diff. json: {v}. protobuf: {protobuf_network[k]}'])
- diff.append(diff_networks)
- return diff
- decor_types = [
- 'mdpi',
- 'hdpi',
- 'xhdpi',
- 'xxhdpi',
- ]
- def remove_decors_from_json(self, json_response, decor='hdpi'):
- decors = self.flatten_decors(json_response['rule']['decors'][decor])
- json_response['rule']['decors'] = decors
- return json_response
- @staticmethod
- def flatten_decors(d):
- result = {}
- for k, v in d.items():
- if type(v) is dict:
- for kk, vv in v.items():
- result[f'{k}.{kk}'] = vv
- else:
- result[f'default.{k}'] = v
- return result
- def build_json_from_json(self, json_response):
- json_from_json = deepcopy(json_response)
- json_from_json = self.remove_decors_from_json(json_from_json)
- return json_from_json
- def assert_protobuf_response(self, json_response, protobuf_response, params):
- # todo: Compare json and protobuf responses.
- # Simplify both, convert them to similar json objects, then compare.
- json_from_json2 = json.dumps(self.build_json_from_json(json_response['response']), sort_keys=True)
- json_from_protobuf2 = json.dumps(self.build_json_from_protobuf(protobuf_response['response']), sort_keys=True)
- json_from_json = self.build_json_from_json(json_response['response'])
- json_from_protobuf = self.build_json_from_protobuf(protobuf_response['response'])
- log_file = open('json_logs.json', 'w')
- log_file.write(str(json_from_json))
- log_file.close()
- log_file = open('protobuf_logs.json', 'w')
- log_file.write(str(json_from_protobuf))
- log_file.close()
- # print(json_from_json == json_from_protobuf)
- # dict_diff = {k: json_from_protobuf[k] for k in set(json_from_protobuf) - set(json_from_json)}
- # print(dict_diff)
- #
- # diff = []
- # diff = self.get_rules_diff(json_from_json, json_from_protobuf, diff)
- # print(diff)
- diff = self.check_diff(json_from_json, json_from_protobuf)
- print("finish")
- # print(diff)
- for i in diff:
- print(i)
- print("finish")
- # for i in diff:
- # print(i)
- def get_rules_diff(self, json_from_json, json_from_protobuf, diff, key=''):
- for k, v in json_from_json.items():
- if (type(v) not in (dict, list, set)) or (type(v) is list and type(v[0]) not in (dict, list, set)):
- try:
- self.assertEqual(v, json_from_protobuf[k])
- except KeyError:
- diff.append(['KeyError', f'{key}.{k}'])
- except AssertionError:
- diff.append(['AssertionError',
- f'Json: {k} = {v} ({type(v)})',
- f'Protobuf {k} = {json_from_protobuf[k]} ({type(json_from_protobuf[k])})'])
- elif type(v) is dict:
- diff = self.get_rules_diff(v, json_from_protobuf[k], diff, k)
- elif type(v) is list and type(v[0]) is dict:
- diff = self.get_rules_diff(v, json_from_protobuf[k], diff, k)
- return diff
- @staticmethod
- def check_ext(json_key, json_ext):
- temp = True
- # print("here in check ext",json_key)
- for key in json_ext:
- if key == json_key:
- print('исключение', key)
- temp = False
- return temp
- return temp
- def check_diff(self, origin_json, protobuf_json):
- res = True
- for key in origin_json:
- res = self.check_ext(key, self.block_exception)
- if not res:
- continue
- try:
- if key == "networks" or key == "app" or key == "sdk":
- print('*****************{}*******************'.format(key))
- print(diff(origin_json[key], protobuf_json[key]))
- print('*****************{}*******************'.format(key))
- continue
- if (type(origin_json[key]) and type(protobuf_json[key])) is dict:
- self.check_diff(origin_json[key], protobuf_json[key])
- # elif (type(origin_json[key]) is dict and type(protobuf_json[key]) is not dict) or (type(origin_json[key]) not dict and type(protobuf_json[key]) not dict):
- # print('diffetent types')
- elif (type(origin_json[key]) and type(protobuf_json[key])) is list:
- pass
- # origin_sorted = sorted(origin_json[key], key=dict)
- # protobuf_sorted = sorted(protobuf_json[key], key=dict)
- # for origin_key in range(0, len(origin_sorted)):
- # for proto_key in range(0, len(protobuf_sorted)):
- # if isinstance(origin_sorted[origin_key], dict) and isinstance(
- # protobuf_sorted[proto_key],
- # dict):
- # self.check_diff(origin_sorted[origin_key], protobuf_sorted[proto_key])
- # # print('*****************json*******************')
- # # print(diff(origin_json[key][origin_key], protobuf_json[key][proto_key]))
- # # print('*****************json*******************')
- # elif str(origin_sorted[origin_key]).lower() != str(
- # protobuf_sorted[proto_key]).lower():
- # self.json_diff_arr.append({
- # key: (origin_sorted[origin_key], protobuf_sorted[proto_key])
- # })
- elif str(origin_json[key]).lower() != str(protobuf_json[key]).lower():
- self.json_diff_arr.append({
- key: (origin_json[key], protobuf_json[key])
- })
- except KeyError:
- print('************************************')
- print("error is here. key not found")
- print('key value', key)
- print("json", origin_json[key])
- print("prot", protobuf_json[key])
- print('************************************')
- return self.json_diff_arr
- def check_difference(self, orig, new):
- diff = {}
- result = False
- if type(orig) != type(new):
- print("Type difference")
- return True
- else:
- if type(orig) is dict and type(new) is dict:
- print("Types are both dicts")
- # Check each of these dicts from the key level
- diff_test = False
- for key in orig:
- try:
- result = self.check_difference(orig[key], new[key])
- except KeyError:
- print("error is here")
- print('result var', result)
- print('key value', key)
- print("json", orig[key])
- print("prot", new[key])
- # raise
- if result: # Means a difference was found and returned
- diff_test = True
- print("key/Values different: " + str(key))
- diff[key] = result
- else:
- result = False
- # And check for keys in second dataset that aren't in first
- for key in new:
- if key not in orig:
- diff[key] = ("KeyNotFound", new[key])
- diff_test = True
- if diff_test:
- return diff
- else:
- return False
- else:
- print("Types were not dicts, likely strings")
- if str(orig) == str(new):
- return False
- else:
- return [str(orig), str(new)]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement