Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/fastavro/__init__.py b/fastavro/__init__.py
- index 0ae079d..5fa3e93 100644
- --- a/fastavro/__init__.py
- +++ b/fastavro/__init__.py
- @@ -79,6 +79,9 @@ acquaint_schema = _acquaint_schema
- _schema.acquaint_schema = _acquaint_schema
- is_avro = _reader.is_avro
- +UnknownType = _schema.UnknownType
- +load_schema = _schema.load_schema
- +
- __all__ = [
- n for n in locals().keys() if not n.startswith('_')
- ] + ['__version__']
- diff --git a/fastavro/_reader_py.py b/fastavro/_reader_py.py
- index 3b41f3c..2b94890 100644
- --- a/fastavro/_reader_py.py
- +++ b/fastavro/_reader_py.py
- @@ -16,7 +16,7 @@ from uuid import UUID
- from .six import (
- MemoryIO, xrange, btou, utob, iteritems, is_str, str2ints, fstint
- )
- -from ._schema_py import (
- +from .schema import (
- extract_record_type, acquaint_schema, populate_schema_defs,
- extract_logical_type
- )
- diff --git a/fastavro/_schema_py.py b/fastavro/_schema_py.py
- deleted file mode 100644
- index 783389e..0000000
- --- a/fastavro/_schema_py.py
- +++ /dev/null
- @@ -1,163 +0,0 @@
- -# cython: auto_cpdef=True
- -
- -from os import path
- -import json
- -
- -from ._schema_public import PRIMITIVES, SCHEMA_DEFS, UnknownType
- -
- -
- -def extract_record_type(schema):
- - if isinstance(schema, dict):
- - return schema['type']
- -
- - if isinstance(schema, list):
- - return 'union'
- -
- - return schema
- -
- -
- -def extract_logical_type(schema):
- - if not isinstance(schema, dict):
- - return None
- - d_schema = schema
- - rt = d_schema['type']
- - lt = d_schema.get('logicalType')
- - if lt:
- - # TODO: Building this string every time is going to be relatively slow.
- - return '{}-{}'.format(rt, lt)
- - return None
- -
- -
- -def schema_name(schema, parent_ns):
- - name = schema.get('name')
- - if not name:
- - return parent_ns, None
- -
- - namespace = schema.get('namespace', parent_ns)
- - if not namespace:
- - return namespace, name
- -
- - return namespace, '%s.%s' % (namespace, name)
- -
- -
- -def extract_named_schemas_into_repo(schema, repo, transformer, parent_ns=None):
- - if type(schema) == list:
- - for index, enum_schema in enumerate(schema):
- - namespaced_name = extract_named_schemas_into_repo(
- - enum_schema,
- - repo,
- - transformer,
- - parent_ns,
- - )
- - if namespaced_name:
- - schema[index] = namespaced_name
- - return
- -
- - if type(schema) != dict:
- - # If a reference to another schema is an unqualified name, but not one
- - # of the primitive types, then we should add the current enclosing
- - # namespace to reference name.
- - if schema not in PRIMITIVES and '.' not in schema and parent_ns:
- - schema = parent_ns + '.' + schema
- -
- - if schema not in repo:
- - raise UnknownType(schema)
- - return schema
- -
- - namespace, name = schema_name(schema, parent_ns)
- -
- - if name:
- - repo[name] = transformer(schema)
- -
- - schema_type = schema.get('type')
- - if schema_type == 'array':
- - namespaced_name = extract_named_schemas_into_repo(
- - schema['items'],
- - repo,
- - transformer,
- - namespace,
- - )
- - if namespaced_name:
- - schema['items'] = namespaced_name
- - return
- - if schema_type == 'map':
- - namespaced_name = extract_named_schemas_into_repo(
- - schema['values'],
- - repo,
- - transformer,
- - namespace,
- - )
- - if namespaced_name:
- - schema['values'] = namespaced_name
- - return
- - # Normal record.
- - for field in schema.get('fields', []):
- - namespaced_name = extract_named_schemas_into_repo(
- - field['type'],
- - repo,
- - transformer,
- - namespace,
- - )
- - if namespaced_name:
- - field['type'] = namespaced_name
- -
- -
- -def load_schema(schema_path):
- - '''
- - Returns a schema loaded from the file at `schema_path`.
- -
- - Will recursively load referenced schemas assuming they can be found in
- - files in the same directory and named with the convention
- - `<type_name>.avsc`.
- - '''
- - with open(schema_path) as fd:
- - schema = json.load(fd)
- - schema_dir, schema_file = path.split(schema_path)
- - return _load_schema(schema, schema_dir)
- -
- -
- -def _reader():
- - # FIXME: This is due to circular depedency, find a better way
- - try:
- - from . import _reader as reader
- - except ImportError:
- - from . import reader
- -
- - return reader
- -
- -
- -def _load_schema(schema, schema_dir):
- - try:
- - _reader().acquaint_schema(schema)
- - except UnknownType as e:
- - try:
- - avsc = path.join(schema_dir, '%s.avsc' % e.name)
- - load_schema(avsc)
- - except IOError:
- - raise e
- - _load_schema(schema, schema_dir)
- - return schema
- -
- -
- -def populate_schema_defs(schema, repo=None):
- - repo = SCHEMA_DEFS if repo is None else repo
- - extract_named_schemas_into_repo(
- - schema,
- - repo,
- - lambda schema: schema,
- - )
- -
- -
- -def acquaint_schema(schema,
- - repo=None,
- - reader_schema_defs=None):
- - """Extract schema in repo (default READERS)"""
- - repo = _reader().READERS if repo is None else repo
- - reader_schema_defs = \
- - SCHEMA_DEFS if reader_schema_defs is None else reader_schema_defs
- - extract_named_schemas_into_repo(
- - schema,
- - repo,
- - lambda schema: lambda fo, _, r_schema: _reader().read_data(
- - fo, schema, reader_schema_defs.get(r_schema)),
- - )
- diff --git a/fastavro/schema.py b/fastavro/schema.py
- index 00b72bb..783389e 100644
- --- a/fastavro/schema.py
- +++ b/fastavro/schema.py
- @@ -1,6 +1,163 @@
- -from ._schema_public import UnknownType # noqa: F401
- +# cython: auto_cpdef=True
- -try:
- - from ._schema import acquaint_schema, load_schema # noqa: F401
- -except ImportError:
- - from ._schema_py import acquaint_schema, load_schema # noqa: F401
- +from os import path
- +import json
- +
- +from ._schema_public import PRIMITIVES, SCHEMA_DEFS, UnknownType
- +
- +
- +def extract_record_type(schema):
- + if isinstance(schema, dict):
- + return schema['type']
- +
- + if isinstance(schema, list):
- + return 'union'
- +
- + return schema
- +
- +
- +def extract_logical_type(schema):
- + if not isinstance(schema, dict):
- + return None
- + d_schema = schema
- + rt = d_schema['type']
- + lt = d_schema.get('logicalType')
- + if lt:
- + # TODO: Building this string every time is going to be relatively slow.
- + return '{}-{}'.format(rt, lt)
- + return None
- +
- +
- +def schema_name(schema, parent_ns):
- + name = schema.get('name')
- + if not name:
- + return parent_ns, None
- +
- + namespace = schema.get('namespace', parent_ns)
- + if not namespace:
- + return namespace, name
- +
- + return namespace, '%s.%s' % (namespace, name)
- +
- +
- +def extract_named_schemas_into_repo(schema, repo, transformer, parent_ns=None):
- + if type(schema) == list:
- + for index, enum_schema in enumerate(schema):
- + namespaced_name = extract_named_schemas_into_repo(
- + enum_schema,
- + repo,
- + transformer,
- + parent_ns,
- + )
- + if namespaced_name:
- + schema[index] = namespaced_name
- + return
- +
- + if type(schema) != dict:
- + # If a reference to another schema is an unqualified name, but not one
- + # of the primitive types, then we should add the current enclosing
- + # namespace to reference name.
- + if schema not in PRIMITIVES and '.' not in schema and parent_ns:
- + schema = parent_ns + '.' + schema
- +
- + if schema not in repo:
- + raise UnknownType(schema)
- + return schema
- +
- + namespace, name = schema_name(schema, parent_ns)
- +
- + if name:
- + repo[name] = transformer(schema)
- +
- + schema_type = schema.get('type')
- + if schema_type == 'array':
- + namespaced_name = extract_named_schemas_into_repo(
- + schema['items'],
- + repo,
- + transformer,
- + namespace,
- + )
- + if namespaced_name:
- + schema['items'] = namespaced_name
- + return
- + if schema_type == 'map':
- + namespaced_name = extract_named_schemas_into_repo(
- + schema['values'],
- + repo,
- + transformer,
- + namespace,
- + )
- + if namespaced_name:
- + schema['values'] = namespaced_name
- + return
- + # Normal record.
- + for field in schema.get('fields', []):
- + namespaced_name = extract_named_schemas_into_repo(
- + field['type'],
- + repo,
- + transformer,
- + namespace,
- + )
- + if namespaced_name:
- + field['type'] = namespaced_name
- +
- +
- +def load_schema(schema_path):
- + '''
- + Returns a schema loaded from the file at `schema_path`.
- +
- + Will recursively load referenced schemas assuming they can be found in
- + files in the same directory and named with the convention
- + `<type_name>.avsc`.
- + '''
- + with open(schema_path) as fd:
- + schema = json.load(fd)
- + schema_dir, schema_file = path.split(schema_path)
- + return _load_schema(schema, schema_dir)
- +
- +
- +def _reader():
- + # FIXME: This is due to circular depedency, find a better way
- + try:
- + from . import _reader as reader
- + except ImportError:
- + from . import reader
- +
- + return reader
- +
- +
- +def _load_schema(schema, schema_dir):
- + try:
- + _reader().acquaint_schema(schema)
- + except UnknownType as e:
- + try:
- + avsc = path.join(schema_dir, '%s.avsc' % e.name)
- + load_schema(avsc)
- + except IOError:
- + raise e
- + _load_schema(schema, schema_dir)
- + return schema
- +
- +
- +def populate_schema_defs(schema, repo=None):
- + repo = SCHEMA_DEFS if repo is None else repo
- + extract_named_schemas_into_repo(
- + schema,
- + repo,
- + lambda schema: schema,
- + )
- +
- +
- +def acquaint_schema(schema,
- + repo=None,
- + reader_schema_defs=None):
- + """Extract schema in repo (default READERS)"""
- + repo = _reader().READERS if repo is None else repo
- + reader_schema_defs = \
- + SCHEMA_DEFS if reader_schema_defs is None else reader_schema_defs
- + extract_named_schemas_into_repo(
- + schema,
- + repo,
- + lambda schema: lambda fo, _, r_schema: _reader().read_data(
- + fo, schema, reader_schema_defs.get(r_schema)),
- + )
- diff --git a/fastavro/writer.py b/fastavro/writer.py
- index 40fe532..8da52e9 100644
- --- a/fastavro/writer.py
- +++ b/fastavro/writer.py
- @@ -9,7 +9,7 @@
- from .six import utob, MemoryIO, long, is_str, iterkeys, itervalues, \
- iteritems, mk_bits
- from ._reader_py import HEADER_SCHEMA, SYNC_SIZE, MAGIC
- -from ._schema_py import (
- +from .schema import (
- extract_named_schemas_into_repo, extract_record_type,
- extract_logical_type
- )
- diff --git a/tests/test_fastavro.py b/tests/test_fastavro.py
- index 4962f64..072cd36 100644
- --- a/tests/test_fastavro.py
- +++ b/tests/test_fastavro.py
- @@ -1,4 +1,5 @@
- -import fastavro.schema
- +import fastavro
- +from fastavro import UnknownType, acquaint_schema, load_schema
- from fastavro.six import MemoryIO
- import pytest
- @@ -84,7 +85,7 @@ def test_not_avro():
- def test_acquaint_schema_rejects_undleclared_name():
- try:
- - fastavro.schema.acquaint_schema({
- + acquaint_schema({
- "type": "record",
- "fields": [{
- "name": "left",
- @@ -92,13 +93,13 @@ def test_acquaint_schema_rejects_undleclared_name():
- }]
- })
- assert False, 'Never raised'
- - except fastavro.schema.UnknownType as e:
- + except UnknownType as e:
- assert 'Thinger' == e.name
- def test_acquaint_schema_rejects_unordered_references():
- try:
- - fastavro.schema.acquaint_schema({
- + acquaint_schema({
- "type": "record",
- "fields": [{
- "name": "left",
- @@ -116,12 +117,12 @@ def test_acquaint_schema_rejects_unordered_references():
- }]
- })
- assert False, 'Never raised'
- - except fastavro.schema.UnknownType as e:
- + except UnknownType as e:
- assert 'Thinger' == e.name
- def test_acquaint_schema_accepts_nested_namespaces():
- - fastavro.schema.acquaint_schema({
- + acquaint_schema({
- "namespace": "com.example",
- "name": "Outer",
- "type": "record",
- @@ -149,7 +150,7 @@ def test_acquaint_schema_accepts_nested_namespaces():
- def test_acquaint_schema_resolves_references_from_unions():
- - fastavro.schema.acquaint_schema({
- + acquaint_schema({
- "namespace": "com.other",
- "name": "Outer",
- "type": "record",
- @@ -175,7 +176,7 @@ def test_acquaint_schema_resolves_references_from_unions():
- def test_acquaint_schema_accepts_nested_records_from_arrays():
- - fastavro.schema.acquaint_schema({
- + acquaint_schema({
- "fields": [
- {
- "type": {
- @@ -202,15 +203,15 @@ def test_acquaint_schema_accepts_nested_records_from_arrays():
- def test_compose_schemas():
- schema_path = join(data_dir, 'Parent.avsc')
- - schema = fastavro.schema.load_schema(schema_path)
- + schema = load_schema(schema_path)
- assert isinstance(schema, dict)
- assert 'Child' in fastavro._reader.READERS
- def test_missing_schema():
- schema_path = join(data_dir, 'ParentMissingChild.avsc')
- - with pytest.raises(fastavro.schema.UnknownType):
- - fastavro.schema.load_schema(schema_path)
- + with pytest.raises(UnknownType):
- + load_schema(schema_path)
- def test_schemaless_writer_and_reader():
Add Comment
Please sign in to add a comment.