Commit fc188a38 authored by Teake Nutma's avatar Teake Nutma
Browse files

Merge branch 'release/v0.4'

parents cc2c96ff be30795d
Pipeline #681 passed with stage
in 58 seconds
[bumpversion]
current_version = 0.3.1
current_version = 0.4.0
parse = (?P<major>\d+)(\.(?P<minor>\d+))?(\.(?P<patch>\d+))?(\.(?P<release>[a-z]+))?
serialize =
serialize =
{major}.{minor}.{patch}.{release}
{major}.{minor}.{release}
{major}.{release}
......@@ -12,9 +12,11 @@ message = Bump version from {current_version} to {new_version}
[bumpversion:file:setup.py]
[bumpversion:file:datamodelvalidator/__main__.py]
[bumpversion:part:release]
optional_value = production
values =
values =
dev
production
......@@ -26,6 +26,7 @@ and the Euclid-specific schemas for schemas.
- EUCLID-STD-E-HB-80-503_0450 Xsd.Mngt.DictionaryNamespace
- EUCLID-STD-E-HB-80-503_0452 Xsd.Mngt.Unqualified
- EUCLID-STD-E-HB-80-503_0453 Xsd.Mngt.TargetNamespaceNaming
- EUCLID-STD-E-HB-80-503_0530 Data.Namespace
- EUCLID-STD-E-500_0030 Dictionary tree.DataProduct
## Requirements
......
......@@ -7,6 +7,7 @@ from .validator import Validator
@click.command()
@click.argument('path', type=click.Path(exists=True))
@click.version_option(version='0.4.0')
def validate_and_echo(path: str) -> None:
"""Performs validation of the Euclid Data Model.
......@@ -22,7 +23,7 @@ def validate_and_echo(path: str) -> None:
for issue in validator.issue_report.issues:
click.secho(issue.level_name, fg=issue.level_color, nl=False)
click.echo("\t{path}:{line} {message}".format(
path=issue.file.relative_path,
path=issue.item.relative_path,
line=issue.line if issue.line else '',
message=issue.message
))
......
import abc
import functools
import os
import re
from typing import Iterable
class DataModel:
class DataModelError(Exception):
    """Error raised for a data model item that cannot be used.

    Raised within this module when an item does not exist on disk or does not
    belong to its parent data model.
    """

    def __init__(self, message: str, item: 'DataModelItem'):
        """DataModelError constructor.

        Args:
            message: Description of what is wrong with the item.
            item: The data model item the error refers to.
        """
        # Hand the message to Exception as well, so that str(error), repr(error)
        # and tracebacks show it instead of an empty string.
        super().__init__(message)
        # Description of the problem; read by callers that turn the error into an issue.
        self.message = message
        # The offending data model item.
        self.item = item
@functools.total_ordering
class DataModelItem(abc.ABC):
    """Abstract class for items of Euclid datamodels (e.g. subdirectories and files).

    Items are identified by their absolute path: equality, hashing and ordering
    are all based on ``path``.
    """

    def __init__(self, path: str, parent: 'DataModel' = None) -> None:
        """DataModelItem constructor.

        Args:
            path: Path to the item; stored as an absolute path.
            parent: The data model this item belongs to, or None for a top-level item.

        Raises:
            DataModelError: When the path does not exist.
        """
        self.path = os.path.abspath(path)
        self.parent = parent
        # A top-level item is expressed relative to itself, which yields '.'.
        base_path = self.path if parent is None else parent.path
        self.relative_path = os.path.relpath(self.path, base_path)
        # Checked last so that the error carries a fully initialised item.
        if not os.path.exists(self.path):
            raise DataModelError("File or directory does not exist.", self)

    def __eq__(self, other: object) -> bool:
        # Defer to the other operand for foreign types instead of failing on a
        # missing ``path`` attribute (e.g. ``item == None``).
        if not isinstance(other, DataModelItem):
            return NotImplemented
        return self.path == other.path

    def __hash__(self) -> int:
        return hash(self.path)

    def __gt__(self, other: object) -> bool:
        if not isinstance(other, DataModelItem):
            return NotImplemented
        return self.path > other.path
class DataModel(DataModelItem):
"""Represents a Euclid datamodel."""
def __init__(self, path: str) -> None:
......@@ -12,61 +45,47 @@ class DataModel:
Args:
path: Path to a directory containing a Euclid datamodel.
Raises:
DataModelError: When the given directory does not exist.
"""
self.path = os.path.abspath(path)
super().__init__(path)
def dictionary_files(self) -> Iterable['DataModelFile']:
"""Returns all dictionary files."""
return self.__filtered_files('dictionary', extension='xsd')
self.dictionary = DataModelDir(self.__subdir('dictionary'), self)
self.dpd = DataModelDir(self.__subdir('dpd'), self)
self.instances = DataModelDir(self.__subdir('instances'), self)
self.interfaces = DataModelDir(self.__subdir('interfaces'), self)
def dpd_files(self) -> Iterable['DataModelFile']:
"""Returns all data product definition files."""
return self.__filtered_files('dpd', extension='xsd')
def __subdir(self, relative_path: str) -> str:
return os.path.join(self.path, relative_path)
def __filtered_files(self, relative_path: str, extension: str = None) -> Iterable['DataModelFile']:
if extension is not None:
extension_pattern = r'^.*\.{}$'.format(extension)
extension_regex = re.compile(extension_pattern)
# Recursively walk the given path
path = os.path.join(self.path, relative_path)
for dirpath, _, filenames in os.walk(path):
for filename in filenames:
if extension is not None and not extension_regex.match(filename):
continue
abs_filename = os.path.join(dirpath, filename)
yield DataModelFile(abs_filename, self)
def __eq__(self, other: 'DataModel') -> bool:
return self.path == other.path
@functools.total_ordering
class DataModelFile:
"""Represents a file within a Euclid datamodel."""
class DataModelChild(DataModelItem):
def __init__(self, path: str, parent: DataModel):
"""DataModelFile constructor.
def __init__(self, path: str, parent: DataModel) -> None:
"""DataModelChild constructor.
Args:
path: Absoluate path to a file within a Euclid datamodel.
parent: The parent datamodel of this file.
path: Path to an item within a Euclid data model.
parent: The parent data model.
Raises:
ValueError: When the file path does not match the parent DM path.
DataModelError: When the item does not exist or does not belong to the given data model.
"""
super().__init__(path, parent=parent)
if os.path.commonpath([parent.path, path]) != parent.path:
raise ValueError("{file} does not belong to data model {dm}".format(file=self, dm=parent.path))
raise DataModelError("Item does not belong to parent data model.", self)
self.path = path
self.parent = parent
self.relative_path = os.path.relpath(path, parent.path)
def __eq__(self, other: 'DataModelFile') -> bool:
return self.path == other.path and self.parent == other.parent
class DataModelDir(DataModelChild):
"""Represents a directory within a Euclid datamodel."""
def __hash__(self) -> int:
return hash(self.path)
def files(self) -> Iterable['DataModelFile']:
    """Lazily yields every file found below this directory, at any depth.

    Each file is wrapped in a DataModelFile whose parent is this directory's
    parent data model.
    """
    for root, _subdirs, names in os.walk(self.path):
        yield from (
            DataModelFile(os.path.join(root, name), self.parent)
            for name in names
        )
def __gt__(self, other: 'DataModelFile') -> bool:
return self.relative_path > other.relative_path
class DataModelFile(DataModelChild):
"""Represents a file within a Euclid datamodel."""
pass
......@@ -2,7 +2,7 @@ import abc
import functools
import os
import re
from typing import Iterable
from typing import Iterable, Optional
from lxml import etree
......@@ -73,65 +73,69 @@ class FilenameValidator(FileValidator):
return True
class SchemaValidator(FileValidator):
"""Abstract generic file validator that validates against a specificed XSD schema."""
class _XmlSchemaValidator(FileValidator):
def __init__(self, xsd_schema_path: str = None) -> None:
super().__init__()
if xsd_schema_path is None:
path = os.path.dirname(os.path.abspath(__file__))
self.__xsd_schema_path = os.path.join(path, 'schemas')
else:
self.__xsd_schema_path = os.path.abspath(xsd_schema_path)
@abc.abstractmethod
def validate(self, file: DataModelFile) -> bool:
pass
def _validate_with_schema(self, file: DataModelFile, schema: str) -> True:
try:
doc = etree.parse(file.path)
except etree.XMLSyntaxError as exception:
self.issue_report.add_issue(str(exception), file)
return False
schema = self.__get_validating_schema(schema)
schema_location = self._get_schema_location(doc, file)
if schema_location is None:
return False
schema = self.__get_validating_schema(schema_location, file)
if schema is None:
return False
is_valid = schema.validate(doc)
for error in schema.error_log:
error_file = DataModelFile(error.filename, file.parent)
self.issue_report.add_issue(error.message, error_file, line=error.line)
return is_valid
@abc.abstractmethod
def _get_schema_location(self, doc, file: DataModelFile) -> Optional[str]:
pass
@functools.lru_cache()
def __get_validating_schema(self, filename: str) -> etree.XMLSchema:
schema_file = os.path.join(self.__xsd_schema_path, filename)
return etree.XMLSchema(file=schema_file)
def __get_validating_schema(self, schema_location: str, file: DataModelFile) -> Optional[etree.XMLSchema]:
try:
return etree.XMLSchema(file=schema_location)
except etree.XMLSchemaParseError:
self.issue_report.add_issue("Cannot validate due to invalid schema.", file)
return None
class DictionaryFileValidator(SchemaValidator):
class DictionaryFileValidator(_XmlSchemaValidator):
"""Validates dictionary files."""
def validate(self, file: DataModelFile) -> bool:
return self._validate_with_schema(file, 'dictionary.xsd')
def _get_schema_location(self, doc, file: DataModelFile):
return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'schemas', 'dictionary.xsd')
class DpdFileValidator(SchemaValidator):
class DpdFileValidator(_XmlSchemaValidator):
"""Validates dpd files."""
def validate(self, file: DataModelFile) -> bool:
return self._validate_with_schema(file, 'dpd.xsd')
def _get_schema_location(self, doc, file: DataModelFile):
return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'schemas', 'dpd.xsd')
class XsdFileValidator(SchemaValidator):
"""Checks if the XSD is parsable and validates against the unmodified W3C XML schema."""
class InterfacesFileValidator(_XmlSchemaValidator):
"""Validates interfaces files."""
def validate(self, file: DataModelFile) -> bool:
validities = [
self._validate_with_schema(file, 'XMLSchema.xsd'),
self._parse_xsd(file)
]
return all(validities)
def _get_schema_location(self, doc, file: DataModelFile):
return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'schemas', 'interfaces.xsd')
def _parse_xsd(self, file: DataModelFile) -> bool:
class XsdFileValidator(FileValidator):
"""Checks if the XSD is parsable."""
def validate(self, file: DataModelFile) -> bool:
try:
etree.XMLSchema(file=file.path)
except etree.XMLSchemaParseError as exception:
......@@ -140,3 +144,27 @@ class XsdFileValidator(SchemaValidator):
self.issue_report.add_issue(error.message, error_file, line=error.line)
return False
return True
class InstanceFileValidator(_XmlSchemaValidator):
    """Validates instance files."""

    def _get_schema_location(self, doc: etree.ElementBase, file: DataModelFile) -> Optional[str]:
        """Determines which schema the instance document has to be validated against.

        The schema is taken from the ``xsi:schemaLocation`` attribute of the root
        element, using the entry whose namespace equals the root element namespace.

        Args:
            doc: The parsed instance document.
            file: The data model file the document was parsed from.

        Returns:
            The schema location joined onto the directory of ``file``, or None
            (after adding an issue to the report) when the attribute is missing,
            malformed, or has no entry for the root element namespace.
        """
        # Implements EUCLID-STD-E-HB-80-503_0530 Data.Namespace.
        # Though ideally the dpd schemas should mandate the xsi:schemaLocation attribute of the root element.
        xsi = "http://www.w3.org/2001/XMLSchema-instance"
        element_namespace = doc.xpath('namespace-uri(/*)')
        locations_attrs = doc.xpath("/*/@xsi:schemaLocation", namespaces={'xsi': xsi})
        if not locations_attrs:
            self.issue_report.add_issue("Missing schemaLocation attribute.", file)
            return None

        # xsi:schemaLocation holds a whitespace-separated list of
        # "namespace location" pairs, so it may legitimately contain more than
        # one pair. An empty value or an odd number of tokens cannot be paired up.
        tokens = locations_attrs[0].split()
        if not tokens or len(tokens) % 2:
            self.issue_report.add_issue("Malformed schemaLocation attribute.", file)
            return None

        # Use the first pair that declares the namespace of the root element.
        pairs = zip(tokens[0::2], tokens[1::2])
        location = next((loc for namespace, loc in pairs if namespace == element_namespace), None)
        if location is None:
            self.issue_report.add_issue("Namespace in schemaLocation does not match root namespace.", file)
            return None

        # The location is interpreted relative to the directory of the instance file.
        dirname = os.path.dirname(file.path)
        return os.path.join(dirname, location)
import functools
from typing import Iterable
from .datamodel import DataModelFile
from .datamodel import DataModelItem
@functools.total_ordering
......@@ -25,17 +25,17 @@ class Issue:
NOTSET: 'NOTSET',
}
def __init__(self, message: str, file: DataModelFile, level: int = None, line: int = None):
def __init__(self, message: str, item: DataModelItem, level: int = None, line: int = None):
"""Issue constructor.
Args:
message: Description of the issue.
file: Datamodel file in which the issue occurs.
item: Datamodel item for which the issue occurs.
level: Severity of the issue.
line: Line number in which the issue occurs. None if not applicable.
"""
self.message = message
self.file = file
self.item = item
self.line = line
if level is None:
self.level = self.DEFAULT
......@@ -59,7 +59,7 @@ class Issue:
@property
def _key(self):
"""Returns a key that can be used for comparison and hashing."""
return self.file, self.line if self.line else 0, self.message
return self.item, self.line if self.line else 0, self.message
def __eq__(self, other: 'Issue') -> bool:
return self._key == other._key
......@@ -77,16 +77,16 @@ class IssueReport:
def __init__(self):
self.__issues = set()
def add_issue(self, message: str, file: DataModelFile, level: int = None, line: int = None):
def add_issue(self, message: str, item: DataModelItem, level: int = None, line: int = None):
"""Convenience method for adding an issue.
Args:
message: Description of the issue.
file: Datamodel file in which the issue occurs.
item: Datamodel item for which the issue occurs.
level: Severity of the issue.
line: Line number in which the issue occurs. None if not applicable.
"""
issue = Issue(message, file, level=level, line=line)
issue = Issue(message, item, level=level, line=line)
self.add(issue)
def add(self, issue: Issue):
......
<?xml version='1.0' encoding='UTF-8'?>
<!--
Restricted variant of the XML Schema language, built by redefining parts of
basedm.xsd. The content group of a schema document is narrowed to a list of
element declarations that each carry a name and a type.
-->
<xs:schema
targetNamespace="http://www.w3.org/2001/XMLSchema"
xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
elementFormDefault="qualified"
>
<xs:redefine schemaLocation="basedm.xsd">
<!-- The dmContent group may only hold one or more "element" declarations. -->
<xs:group name="dmContent">
<xs:sequence>
<xs:element name="element" type="interfaceElement" maxOccurs="unbounded"/>
</xs:sequence>
</xs:group>
</xs:redefine>
<!-- An element declaration: an optional annotation plus mandatory "name" and "type" attributes. -->
<xs:complexType name="interfaceElement">
<xs:sequence>
<xs:element ref="xs:annotation" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:interfaceName" use="required"/>
<xs:attribute name="type" type="xs:QName" use="required"/>
</xs:complexType>
<!-- Element names must start with an uppercase ASCII letter and contain only ASCII letters and digits. -->
<xs:simpleType name="interfaceName">
<xs:restriction base="xs:QName">
<xs:pattern value="[A-Z]+[a-zA-Z0-9]*"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>
import os
from typing import Iterable
from .datamodel import DataModel
from .issues import IssueReportAware
from .datamodel import DataModel, DataModelDir, DataModelError
from .filevalidators import (
DictionaryFileValidator, XsdFileValidator, DpdFileValidator, FilenameValidator, FileValidatorComposite
DictionaryFileValidator, XsdFileValidator, DpdFileValidator, FileValidator,
FilenameValidator, FileValidatorComposite, InstanceFileValidator, InterfacesFileValidator
)
from .issues import IssueReportAware
class Validator(IssueReportAware):
......@@ -24,7 +26,11 @@ class Validator(IssueReportAware):
Args:
path: Path to the datamodel to validate
"""
dm = DataModel(path)
try:
dm = DataModel(path)
except DataModelError as error:
self.issue_report.add_issue(error.message, error.item)
return False
return self.validate(dm)
def validate(self, dm: DataModel) -> bool:
......@@ -33,23 +39,31 @@ class Validator(IssueReportAware):
Args:
dm: The datamodel to validate.
"""
dict_validator = FileValidatorComposite()
dict_validator.issue_report = self.issue_report
dict_validator.add_all([
FilenameValidator(check_strict=True),
DictionaryFileValidator(),
XsdFileValidator()
])
dpd_validator = FileValidatorComposite()
dpd_validator.issue_report = self.issue_report
dpd_validator.add_all([
FilenameValidator(check_strict=False),
DpdFileValidator(),
XsdFileValidator()
])
dict_validity = dict_validator.validate_all(dm.dictionary_files())
dpd_validity = dpd_validator.validate_all(dm.dpd_files())
return dict_validity and dpd_validity
validators = {
dm.dictionary: [
FilenameValidator(check_strict=True),
DictionaryFileValidator(),
XsdFileValidator()
],
dm.dpd: [
FilenameValidator(check_strict=False),
DpdFileValidator(),
XsdFileValidator()
],
dm.instances: [
InstanceFileValidator()
],
dm.interfaces: [
FilenameValidator(check_strict=True),
InterfacesFileValidator(),
XsdFileValidator()
]
}
validities = [self.__validate_datamodeldir(d, v) for d, v in validators.items()]
return all(validities)
def __validate_datamodeldir(self, dm_dir: DataModelDir, validators: Iterable[FileValidator]) -> bool:
    """Runs the given validators over every file of one data model directory.

    Args:
        dm_dir: The data model directory whose files are validated.
        validators: The file validators to apply to each file.

    Returns:
        The result of validating all files with the combined validators.
    """
    runner = FileValidatorComposite()
    # Share this validator's report so that all issues end up in one place.
    runner.issue_report = self.issue_report
    runner.add_all(validators)
    files = dm_dir.files()
    return runner.validate_all(files)
......@@ -8,7 +8,7 @@ if sys.version_info < (3, 5):
setup(
name='datamodelvalidator',
version='0.3.1',
version='0.4.0',
description='Performs validation of the Euclid Data Model',
license='LGPL-3.0+',
packages=find_packages(exclude=('tests',)),
......
<?xml version="1.0" encoding="UTF-8"?>
<le1:DpdNirRawFrame
xmlns:le1="http://euclid.esa.org/schema/dpd/le1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://euclid.esa.org/schema/dpd/le1 ../dpd/euc-test-le1-NirRawFrame-ok.xsd">
<Header/>
</le1:DpdNirRawFrame>
<?xml version="1.0" encoding="UTF-8"?>
<DpdNirRawFrame>
<Header/>
<Data/>
</DpdNirRawFrame>
<?xml version="1.0" encoding="UTF-8"?>
<le1:DpdNirRawFrame
xmlns:le1="http://euclid.esa.org/schema/dpd/le1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://euclid.esa.org/schema/dpd/le1 ../dpd/euc-test-le1-NirRawFrame-ok.xsd">
<Header/>
<Data/>
</le1:DpdNirRawFrame>
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema targetNamespace="http://euclid.esa.org/schema/interfaces/bas/cat" xmlns="http://euclid.esa.org/schema/interfaces/bas/cat"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:cat="http://euclid.esa.org/schema/bas/cat"
elementFormDefault="unqualified" version="0.1">
<xs:import namespace="http://euclid.esa.org/schema/bas/cat"
schemaLocation="../dictionary/euc-test-cat-ok.xsd"/>
<xs:element name="catalogContainer" type="cat:catalogProperty"/>
</xs:schema>
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema targetNamespace="http://euclid.esa.org/schema/interfaces/bas/cat" xmlns="http://euclid.esa.org/schema/interfaces/bas/cat"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:cat="http://euclid.esa.org/schema/bas/cat"
elementFormDefault="unqualified" version="0.1">
<xs:import namespace="http://euclid.esa.org/schema/bas/cat"
schemaLocation="../dictionary/euc-test-cat-ok.xsd"/>
<xs:element name="CatalogContainer" type="cat:catalogProperty"/>
</xs:schema>
......@@ -15,6 +15,16 @@ def dpd_validator() -> fv.FileValidator:
return fv.DpdFileValidator()
@pytest.fixture()
def instances_validator() -> fv.FileValidator:
return fv.InstanceFileValidator()
@pytest.fixture()
def interfaces_validator() -> fv.FileValidator:
return fv.InterfacesFileValidator()
@pytest.mark.parametrize(
'relative_path, is_valid',
[
......@@ -52,6 +62,29 @@ def test_dpd_validity(relative_path: str, is_valid: bool, dpd_validator: fv.File
assert_file_validity(relative_path, is_valid, dpd_validator)
@pytest.mark.parametrize(
'relative_path, is_valid',
[
('instances/euc-test-le1-NirRawFrame-ok.xsd', True),
('instances/euc-test-le1-NirRawFrame-missingdata.xsd', False),
('instances/euc-test-le1-NirRawFrame-missingschemalocation.xsd', False)
]
)
def test_instances_validity(relative_path: str, is_valid: bool, instances_validator: fv.FileValidator):
assert_file_validity(relative_path, is_valid, instances_validator)
@pytest.mark.parametrize(
'relative_path, is_valid',
[
('interfaces/euc-test-cat-interface-ok.xsd', True),
('interfaces/euc-test-cat-interface-elementname.xsd', False)
]
)
def test_interfaces_validity(relative_path: str, is_valid: bool, interfaces_validator: fv.FileValidator):
assert_file_validity(relative_path, is_valid, interfaces_validator)