"""
Factory configuration, does nothing but can be used to instanciate a server.
"""
import importlib.util
import os
import pprint
from copy import deepcopy
from types import MappingProxyType
from typing import Self, Type
import orjson
from rodi import Container
from whistle import IAsyncEventDispatcher
from harp import get_logger
from harp.commandline.server import CommonServerOptions
from harp.config.application import Application
from harp.utils.identifiers import is_valid_dotted_identifier
logger = get_logger(__name__)
[docs]
def get_application_class_name(name):
return "".join(map(lambda x: x.title().replace("_", ""), name.rsplit(".", 1)[-1:])) + "Application"
def _resolve_application_aliases(spec):
if "." not in spec:
_candidate = ".".join(("harp_apps", spec))
if importlib.util.find_spec(_candidate):
return _candidate
return spec
[docs]
class Config:
DEFAULT_APPLICATIONS = [
"http_client",
"proxy",
"dashboard",
"sqlalchemy_storage",
"telemetry",
"janitor",
"harp_apps.contrib.sentry", # todo: allow to extend application list in config file without overriding all
]
[docs]
def __init__(self, settings=None, /, applications=None):
self._raw_settings = {"applications": applications or []} | (settings or {})
self._raw_settings["applications"] = [
_resolve_application_aliases(app) for app in self._raw_settings["applications"]
]
self._validated_settings = None
self._debug_applications = set()
self._application_types = {}
self._applications = []
self._disabled_applications = set()
def __eq__(self, other: Self):
return self.settings == other.settings
def __repr__(self):
return (
f"<{type(self).__name__}{'' if self._validated_settings else '*'} "
f"settings={orjson.dumps(self.settings.copy()).decode()}>"
)
@property
def settings(self):
if self._validated_settings is not None:
return self._validated_settings
return self._raw_settings
@property
def applications(self):
if not self._validated_settings:
raise RuntimeError("Configuration not validated.")
return self._validated_settings.get("applications", [])
[docs]
def reset(self):
self._validated_settings = None
[docs]
def add_defaults(self):
for application in type(self).DEFAULT_APPLICATIONS:
self.add_application(application)
[docs]
def set(self, key, value):
if not is_valid_dotted_identifier(key):
raise ValueError(f"Invalid settings key: {key}")
self.reset()
bits: list[str] = key.split(".")
current = self._raw_settings
for bit in bits[:-1]:
current = current.setdefault(bit, {})
current[bits[-1]] = value
[docs]
def add_application(self, name, /, *, debug=False):
if not is_valid_dotted_identifier(name):
raise ValueError(f"Invalid application name: {name}")
self.reset()
name = _resolve_application_aliases(name)
if name not in self._raw_settings["applications"]:
self._raw_settings["applications"].append(name)
if debug:
self._debug_applications.add(name)
[docs]
def remove_application(self, name):
if not is_valid_dotted_identifier(name):
raise ValueError(f"Invalid application name: {name}")
self.reset()
name = _resolve_application_aliases(name)
if name in self._raw_settings["applications"]:
self._raw_settings["applications"].remove(name)
self._debug_applications.discard(name)
[docs]
def read_env(self, options: CommonServerOptions, /):
"""
Parses sys.argv-like arguments.
:param args:
:param files: list of filenames to load in order. Will happen after defaults env and files.
:return: argparse.Namespace
"""
for _enabled_application in options.enable or ():
self.add_application(_enabled_application)
for _disabled_application in options.disable or ():
self.remove_application(_disabled_application)
from config.common import MapSource
from config.env import EnvVars
from config.yaml import YAMLFile
from .builder import ConfigurationBuilder
builder = ConfigurationBuilder(
MapSource({}),
EnvVars(prefix="DEFAULT__HARP_"),
MapSource(self._raw_settings),
)
# load default system config (if present)
if os.path.exists("/etc/harp.yaml"):
builder.add_source(YAMLFile("/etc/harp.yaml"))
elif os.path.exists("/etc/harp.yml"):
builder.add_source(YAMLFile("/etc/harp.yml"))
builder.add_examples(options.examples)
builder.add_files(options.files)
builder.add_source(EnvVars(prefix="HARP_"))
builder.add_values(options.options or {})
self._raw_settings = builder.build().values
self._raw_settings.setdefault("proxy", {})
self._raw_settings["proxy"].setdefault("endpoints", [])
for k, v in (options.endpoints or {}).items():
_port, _url = v.split(":", 1)
self._raw_settings["proxy"]["endpoints"].append(
{
"name": k,
"port": int(_port),
"url": _url,
}
)
[docs]
def validate(self, /, *, allow_extraneous_settings=False, secure=False):
if self._validated_settings is None:
to_be_validated = deepcopy(self._raw_settings)
application_names = to_be_validated.pop("applications", [])
validated = {"applications": []}
# round 1: import applications and set defaults before validation
application_types, to_be_validated = self._validate_round_1_import_applications(
application_names, to_be_validated
)
for application in application_types:
validated["applications"].append(application.name)
applications, newly_validated = self._validate_round_2_extract_and_validate_settings(
application_types, to_be_validated, secure=secure
)
newly_validated.pop("applications", None) # not allowed xxx todo raise
validated |= newly_validated
if to_be_validated != {} and not allow_extraneous_settings:
logger.warning(f"Unknown settings remaining: {to_be_validated}")
# propagate for the world to use
self._validated_settings = MappingProxyType(validated)
self._applications = applications
logger.debug(f"Configuration validated: {pprint.pformat(self._validated_settings)}")
return self._validated_settings
[docs]
def get_application_type(self, name: str) -> Type[Application]:
if name not in self._application_types:
application_spec = importlib.util.find_spec(name)
if not application_spec:
raise ValueError(f'Unable to find application "{name}".')
try:
application_module = __import__(".".join((application_spec.name, "__app__")), fromlist=["*"])
except ModuleNotFoundError as exc:
raise ModuleNotFoundError(
f'A python package for application "{name}" was found but it is not a valid harp application. '
'Did you forget to add an "__app__.py"?'
) from exc
application_class_name = get_application_class_name(name)
if not hasattr(application_module, application_class_name):
raise AttributeError(
f'Application module for {name} does not contain a "{application_class_name}" class.'
)
self._application_types[application_spec.name] = getattr(application_module, application_class_name)
self._application_types[application_spec.name].name = application_spec.name
if name not in self._application_types:
raise RuntimeError(f'Unable to load application "{name}", application class definition not found.')
return self._application_types[name]
def _validate_round_1_import_applications(self, names, to_be_validated):
# note: to_be_validated is modified in place (is this right ?), but still returned to avoid confusion
if len(self._debug_applications):
print("DEBUG: Configuration > Round 1 (import applications) > START")
types = []
for _name in names:
_type = self.get_application_type(_name)
if _type.settings_namespace:
_settings = to_be_validated.get(_type.settings_namespace, None)
if (_defaults := _type.defaults(deepcopy(_settings) if _settings else None)) is not None:
to_be_validated[_type.settings_namespace] = _defaults
else:
pass # todo assert application explicitely opted in for no defaults ?
if _type.name in self._debug_applications:
print(f"DEBUG: {_type}::set_default_settings(...)")
types.append(_type)
if len(self._debug_applications):
print("DEBUG: Configuration > Round 1 (import applications) > END")
print(f"DEBUG: to_be_validated={pprint.pformat(to_be_validated)}")
return types, to_be_validated
def _validate_round_2_extract_and_validate_settings(self, application_types, to_be_validated, /, *, secure=False):
applications = []
validated = {}
# round 2: validate and extract settings
if len(self._debug_applications):
print("DEBUG: Configuration > Round 2 > START")
for application_type in application_types:
if application_type.settings_namespace:
if not application_type.supports(to_be_validated.get(application_type.settings_namespace, {})):
raise ValueError(
f'Application "{application_type.name}" is not configurable with the current configuration.'
)
application_settings = to_be_validated.pop(application_type.settings_namespace, None)
if application_type.settings_type:
application_settings = application_type.settings_type(**application_settings)
else:
application_settings = None
if application_type.name in self._debug_applications:
print(f"DEBUG: {application_type}::extract_settings() -> {application_settings}")
if application_settings is None and application_type.settings_namespace is not None:
raise ValueError(
f'Application "{application_type.name}" is not configurable with the current configuration '
f"(expected some configuration, got none)."
)
application = application_type(application_settings)
application_validated_settings = application.validate(secure=secure)
if application_type.settings_namespace:
validated |= {application_type.settings_namespace: application_validated_settings}
applications.append(application)
if len(self._debug_applications):
print("DEBUG: Configuration > Round 2 > END")
return applications, validated
[docs]
def register_events(self, dispatcher: IAsyncEventDispatcher):
self.validate()
for application in self._applications:
application.register_events(dispatcher)
[docs]
def register_services(self, container: Container):
self.validate()
for application in self._applications:
application.register_services(container)
[docs]
def serialize(self):
self.validate()
return orjson.dumps(self.settings.copy())
[docs]
@classmethod
def deserialize(cls, settings):
return cls(orjson.loads(settings))