"""
CheckedDict
-----------
A dictionary based on a default prototype. A :class:`CheckedDict` can only define
``key:value`` pairs which are already present in the default. It is possible to
define a docstring for each key and different restrictions for the values
regarding possible values, ranges and type. A CheckedDict is useful for
configuration settings.
If no mutable values are used, a CheckedDict is hashable
ConfigDict
----------
Based on :class:`CheckedDict`, a :class:`ConfigDict` is a persistent, unique dictionary. It is
saved under the config folder determined by the OS and it is updated with each
modification. It is useful for implementing configuration of a module / library
/ app, where there is a default/initial state and the user needs to be able to
configure global settings which must be persisted between sessions (similar to
the settings in an application)
Example
~~~~~~~
.. code::
from configdict import ConfigDict
config = ConfigDict("myproj.subproj")
config.addKey("keyA", 10, doc="documentaion of keyA")
config.addKey("keyB", 0.5, range=(0, 1))
config.addKey("keyC", "blue", choices=("blue", "red"),
doc="documentation of keyC")
config.load()
Alternativaly, a :class:`ConfigDict` or a :class:`CheckedDict` can be built
via a context manager::
with ConfigDict("plotting") as cfg:
# While building a config, __call__ is equivalent to addKey
cfg('backend', 'matplotlib', choices={'matlotlib'})
cfg('spectrogram.figsize', (24, 8))
cfg('spectrogram.maxfreq', 12000,
doc="Highest frequency in a spectrogram")
cfg('spectrogram.window', 'hamming', choices={'hamming', 'hanning'})
# no need to call .load, it is called automatically
A :class:`ConfigDict` can be created all at once
.. code::
config = ConfigDict("myapp",
default = {
'font-size': 10.0,
'font-family': "Monospace",
'port' : 9100,
},
validator = {
'font-size::range' : (8, 24),
'port::range' : (9000, 65000),
'font-family::choices' : {'Roboto', 'Monospace'},
'port': lambda cfg, port: checkPortAvailable(port)
},
docs = {
'port': 'The port number to listen to',
'font-size': 'The size of the font, in pixels'
}
)
This will create the dictionary and load any persisted version. Any saved
modifications will override the default values. Whenever the user changes any
value (via ``config[key] = newvalue``) the dictionary will be saved.
In all other respects a :class:`ConfigDict` behaves like a normal dictionary.
"""
from __future__ import annotations
import appdirs
import os
import json
import yaml
import logging
import sys
import re
import textwrap
import tempfile
from functools import cache
from types import FunctionType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, Any, Union, Callable, TypeVar, Set
validatefunc_t = Callable[[dict, str, Any], bool]
_CheckedDictT = TypeVar("_CheckedDictT", bound="CheckedDict")
_ConfigDictT = TypeVar("_ConfigDictT", bound="ConfigDict")
__all__ = ("CheckedDict",
"ConfigDict",
"getConfig",
"activeConfigs",
"configPathFromName")
logger = logging.getLogger("configdict")
_UNKNOWN = object()
class ReadOnlyError(Exception):
"""Raised when a dict is marked as read-only and the user attempts to modify it"""
_editHeaderWatch = (r'''# ****************************************************
# * Edit this file to modify the configuration *
# * When you are finished editing, save the file *
# ****************************************************
''')
_editHeaderPopup = (r'''# **********************************************************
# * Edit this file to modify the configuration *
# * Click OK on the popup dialog to finish the opeartion *
# **********************************************************
''')
def sortNatural(seq: list, key: Callable[[Any], str] | None = None) -> list:
"""
Sort a string sequence naturally
Sorts the sequence so that 'item1' and 'item2' are before 'item10'
Args:
seq: the sequence to sort
key: a function to convert an item in seq to a string
Examples
~~~~~~~~
>>> seq = ["e10", "e2", "f", "e1"]
>>> sorted(seq)
['e1', 'e10', 'e2', 'f']
>>> sortNatural(seq)
['e1', 'e2', 'e10', 'f']
>>> seq = [(2, "e10"), (10, "e2")]
>>> sortNatural(seq, key=lambda tup:tup[1])
[(10, 'e2'), (2, 'e10')]
"""
def convert(text: str):
return int(text) if text.isdigit() else text.lower()
def alphanum_key(key: str):
return [convert(c) for c in re.split('([0-9]+)', key)]
if key is not None:
return sorted(seq, key=lambda x: alphanum_key(key(x)))
return sorted(seq, key=alphanum_key)
def _asChoiceStr(x) -> str:
return f"'{x}'" if isinstance(x, str) else str(x)
def _makeReplacer(conditions: dict) -> Callable:
"""
Create a function to replace many subtrings at once
Args:
conditions: a dictionary mapping a string to its replacement
Example::
>>> replacer = makeReplacer({"&":"&", " ":"_", "(":"\\(", ")":"\\)"})
>>> replacer("foo & (bar)")
"foo_&_\(bar\)"
"""
rep = {re.escape(k): v for k, v in conditions.items()}
pattern = re.compile("|".join(rep.keys()))
return lambda txt: pattern.sub(lambda m: rep[re.escape(m.group(0))], txt)
_keyNormalizer = _makeReplacer({'.': '', '_': '', '-': ''})
@cache
def normalizeKey(key: str) -> str:
return _keyNormalizer(key.lower())
def _yamlComment(doc: Optional[str],
default: Any,
choices: Optional[set],
valuerange: Optional[tuple[float, float]],
valuetype: Optional[str],
maxwidth=80) -> str:
"""
This generated the yaml comments used when saving the config to yaml
Args:
doc: documentation for this key
default: the default value
choices: choices possible to this value
valuerange: a tuplet indicating a valid range for this value
valuetype: the type as string
maxwidth: the max. width of one line
Returns:
the generated comment as a string. It might contain multiple lines
"""
if all(_ is None for _ in (doc, default, choices, valuerange, valuetype)):
return ""
"""
# this is the documentation for bla
# default: xxx, choices: 10, 20, 30, type: int, range: 0.0 - 1.0
"""
lines = []
infoparts = [f"default: {default}"]
if doc:
if len(doc) < maxwidth:
lines.append(f"# {doc}")
else:
lines.extend("# " + line for line in textwrap.wrap(doc, maxwidth))
if choices:
valuetype = None
if valuetype:
infoparts.append(f"type: {valuetype}")
if choices:
infoparts.append(f"choices: {', '.join(map(str, choices))}")
if valuerange:
infoparts.append(f"range: {valuerange[0]} - {valuerange[1]}")
if infoparts:
lines.append("# ** " + ", ".join(infoparts))
return "\n".join(lines)
def _yamlValue(value) -> str:
if isinstance(value, tuple):
value = list(value)
s = yaml.dump(value, default_flow_style=True)
return s.replace("\n...\n", "")
def _typeName(t: str | type | tuple[type, ...]) -> str:
if isinstance(t, str):
return t
elif isinstance(t, type):
return t.__name__
elif isinstance(t, tuple):
return " | ".join(v.__name__ for v in t)
else:
raise TypeError(f"Expected a str, type or tuple of types, got {t}")
def _asRstLinkKey(key: str) -> str:
return key.replace(".", "_").replace(" ", "").lower()
def _asYaml(d: dict[str, Any],
doc: dict[str, str],
default: dict[str, Any],
validator: dict[str, Any] | None = None,
keys: list[str] | None = None,
advancedPrefix: str = '.'
) -> str:
lines = []
# detect if keys have advanced keys and they are all at the end
if keys:
items = [(k, d[k]) for k in keys]
else:
items = list(d.items())
firstAdvanced = next((i for i, item in enumerate(items) if item[0].startswith(advancedPrefix)), None)
if firstAdvanced is not None and all(k.startswith(advancedPrefix) for k, v in items[firstAdvanced:]):
addAdvancedSeparator = True
else:
addAdvancedSeparator = False
for key, value in items:
if addAdvancedSeparator and key.startswith(advancedPrefix):
addAdvancedSeparator = False
lines.append("\n"
"#####################################################\n"
"# Advanced Keys #\n"
"#####################################################\n")
if validator is not None:
choices = validator.get(f"{key}::choices")
valuerange = validator.get(f"{key}::range")
valuetype = validator.get(f"{key}::type")
else:
choices, valuerange, valuetype = None, None, None
valuetypestr = type(value).__name__ if valuetype is None else _typeName(valuetype)
comment = _yamlComment(doc=doc.get(key), default=default.get(key),
choices=choices, valuerange=valuerange,
valuetype=valuetypestr)
lines.append(comment)
l = f"{key}: {_yamlValue(value)}"
lines.append(l)
if not l.endswith("\n"):
lines.append("")
return "\n".join(lines)
def _htmlTable(rows: list, headers, maxwidths=None, rowstyles=None) -> str:
parts = []
_ = parts.append
_("<table>")
_("<thead>")
_("<tr>")
if maxwidths is None:
maxwidths = [0] * len(headers)
if rowstyles is None:
rowstyles = [None] * len(headers)
for colname in headers:
_(f'<th style="text-align:left">{colname}</th>')
_("</tr></thead><tbody>")
for row in rows:
_("<tr>")
for cell, maxwidth, rowstyle in zip(row, maxwidths, rowstyles):
if rowstyle is not None:
cell = f'<{rowstyle}>{cell}</{rowstyle}>'
if maxwidth > 0:
_(f'<td style="text-align:left;max-width:{maxwidth}px;">{cell}</td>')
else:
_(f'<td style="text-align:left">{cell}</td>')
_("</tr>")
_("</tbody></table>")
return "".join(parts)
def _checkDocs(docs: dict[str, str], keys: set[str]) -> bool:
ok = True
keyslist = list(keys)
for key in docs.keys():
if key not in keys:
likely = _bestMatches(text=key, options=keyslist, limit=16, minpercent=60)
logger.warning(f"Key {key} not defined. Did you mean {likely}?. \nPossible keys: {keys}")
ok = False
return ok
def _checkValidator(validatordict: dict[str, Any], defaultdict: dict[str, Any]) -> dict[str, Any]:
"""
Checks the validity of the validator itself, and makes any needed
postprocessing on the validator
Args:
validatordict: the validator dict
defaultdict: the dict containing defaults
Returns:
a postprocessed validator dict
"""
stripped_keys = {key.split("::")[0] for key in validatordict.keys()}
not_present = stripped_keys-defaultdict.keys()
if any(not_present):
notpres = ", ".join(sorted(not_present))
raise KeyError(f"The validator dict has keys not present "
f"in the defaultdict ({notpres})")
v = {}
for key, value in validatordict.items():
if key.endswith('::choices') and isinstance(value, (list, tuple)):
value = set(value)
v[key] = value
return v
def _isfloaty(value) -> bool:
return isinstance(value, (int, float)) or hasattr(value, '__float__')
def _openInStandardApp(path: str) -> None:
"""
Open path with the app defined to handle it by the user
at the os level (xdg-open in linux, start in win, open in osx)
"""
import subprocess
platform = sys.platform
if not os.path.exists(path):
raise RuntimeError(f"Trying to open '{path}', but file does not exist")
if platform == 'linux':
subprocess.call(["xdg-open", path])
elif platform == "win32":
# This function is only present in windows
os.startfile(path)
elif platform == "darwin":
subprocess.call(["open", path])
else:
raise RuntimeError(f"platform {platform} not supported")
def _notify(title: str, msg: str) -> None:
import subprocess
if sys.platform == "linux":
print(f"**Notify** {title}: {msg}")
subprocess.call(['notify-send', title, msg])
def _waitOnFileModified(path: str, timeout: float | None = None, notification='') -> bool:
try:
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
except ImportError:
logger.warning("watchdog is needed to be able to wait on file events. "
"Install via `pip install watchdog`")
_waitForClick()
return False
directory, base = os.path.split(path)
if not directory:
directory = "."
handler = PatternMatchingEventHandler([base], ignore_patterns="",
ignore_directories=True, case_sensitive=True)
observer = Observer()
modified = False
def on_modified(event):
nonlocal modified
modified = True
observer.stop()
handler.on_modified = on_modified
observer.schedule(handler, path=directory, recursive=False)
observer.start()
if timeout is None:
timeout = 60 * 20 # 20 minutes
observer.join(timeout)
if notification:
if "::" in notification:
title, body = notification.split("::")
else:
title, body = "Edit", notification
_notify(title, body)
return modified
def _showInfoDialog(msg: str, title: str = None) -> None:
"""
Creates a simple confirmation dialog box
Args:
msg: the message to display
title: a title for the window
"""
import tkinter as tk
from tkinter import messagebox
window = tk.Tk()
window.wm_withdraw()
messagebox.showinfo(title, msg)
window.destroy()
def _waitForClick(title: str = None):
_showInfoDialog("Click OK when finished editing", title=title)
def _openInEditor(cfg: str) -> None:
_openInStandardApp(cfg)
def _bestMatches(text: str, options: list[str], limit: int, minpercent: int, lengthMatchPercent=0) -> list[str]:
from fuzzywuzzy import process
possibleChoices = process.extract(text, options, limit=limit)
possibleChoices.sort(key=lambda item: item[1], reverse=True)
if lengthMatchPercent:
lens = len(text)
lengthdiff = lens * (1 - lengthMatchPercent/100)
minlength = lens - lengthdiff
maxlength = lens + lengthdiff
return [choice for choice, percent in possibleChoices
if percent >= minpercent and minlength <= len(choice) <= maxlength]
else:
selected = [choice for choice, percent in possibleChoices
if percent >= minpercent]
return selected[:limit]
INVALID = object()
def _forceHash(values) -> int:
hashes = []
for value in values:
if isinstance(value, list):
hashes.append(hash(tuple(value)))
elif isinstance(value, dict):
raise ValueError(f"Dicts cannot be forced to have a hash ({value}")
else:
hashes.append(hash(value))
return hash(tuple(hashes))
[docs]
class CheckedDict(dict):
"""
A dictionary which checks that the keys and values are valid
according to a default dict and a validator. In a :class:`CheckedDict`,
only keys are allowed which are already present in the default given.
Args:
default: a dict will all default values. A config can accept only
keys which are already present in the default
validator: a dict containing choices and types for the keys in the
default. Given a default like: ``{'keyA': 'foo', 'keyB': 20, 'keyC': 0.5}``,
a validator could be::
{'keyA::choices': ['foo', 'bar'],
'keyB::type': float,
'keyB': lambda d, value: value > d['keyC'] * 10
'keyC::range': (0, 1)
}
choices can be defined lazyly by giving a lambda which returns a list
of possible choices
adaptor: a dict mapping keys to functions to convert the value before being set. An adaptor
callback has the form `(key: str, newvalue: Any, oldvalue: Any) -> Any`. The value returned
will be the value set for the given key
docs: a dict containing help lines for keys defined in default
callback: function ``(key, value) -> None``. This function is called **after**
the modification has been done.
precallback: function ``(key, value) -> newvalue``. If given, a precallback intercepts
any change and can modify the value or return INVALID to prevent the modification
strict: if False keys are case and punktuation insensitive, meaning that
a key like 'foo.barBaz' will also be matched by 'foo_bar_baz' or 'foo_barbaz'
Example
=======
.. code::
from configdict import *
default = {
'color': '#FF0000',
'size': 10,
'name': ''
}
validator = {
'size::range': (6, 30),
'color': lambda d, value: iscolor(value)
}
checked = CheckedDict(default, validator=validator)
"""
def __init__(self,
default: dict[str, Any] = None,
validator: dict[str, Any] = None,
docs: dict[str, str] = None,
callback: Callable[[str, Any], None] = None,
adaptor: dict[str, Callable[[str, Any, Any], Any]] = None,
precallback=None,
autoload=True,
strict=True,
readonly=False,
advancedPrefix='.') -> None:
self.default: dict[str, Any] = default if default is not None else {}
"""The default dict"""
self.readonly = False
"""True if this dict is read-only"""
self._validator: dict[str, Any] = validator if validator is not None else {}
self._docs = docs if docs is not None else {}
self._allowedkeys = set(default.keys()) if default is not None else set()
self._adaptor = adaptor if adaptor is not None else {}
self._precallback = precallback
self._callback = callback
self._building = False
self._normalizedKeys: dict[str, str] = {}
self._bypass = False
self._advancedPrefix = advancedPrefix
self._cache = {}
if docs:
_checkDocs(docs, self._allowedkeys)
if self.default:
if autoload:
self.load()
if not strict:
self._normalizedKeys = {normalizeKey(k): k for k in self.default.keys()}
self.readonly = readonly
self._strict = strict
if self._validator:
self._validator = _checkValidator(self._validator, self.default)
def __hash__(self) -> int:
keyshash = hash(tuple(self.keys()))
try:
valueshash = hash(tuple(self.values()))
except TypeError:
logger.debug(f"Some values are unhashable, using unsafe hash ({self.values()}")
valueshash = id(self)
return hash((len(self), keyshash, valueshash, hash(self._precallback), hash(self._callback)))
def _changed(self) -> None:
self._allowedkeys = set(self.default.keys())
[docs]
@staticmethod
def normalizeKey(key: str) -> str:
return normalizeKey(key)
[docs]
def copy(self: _CheckedDictT) -> _CheckedDictT:
"""
Create a copy of this dict
"""
out = self.__class__(default=self.default,
validator=self._validator,
docs=self._docs,
precallback=self._precallback,
callback=self._callback,
autoload=False,
adaptor=self._adaptor.copy())
out._bypass = True
out.update(self)
out._bypass = False
return out
[docs]
def clone(self: _CheckedDictT, updates: dict = None, **kws) -> _CheckedDictT:
"""
Clone self with modifications
Args:
updates: a dict with updated values for the clone dict
kws: any keyworg arg will be used to update the resulting dict
Returns:
the cloned dict
Examples
~~~~~~~~
>>> import configdict
>>> d = configdict.CheckedDict(default={'A': 10, 'B': 20, 'C':30})
>>> d2 = d.clone({'B':21}, C=31)
>>> d2
{'A': 10, 'B': 21, 'C': 31)
"""
out = self.copy()
if updates:
out.update(updates)
if kws:
out.update(kws)
return out
def _infoStr(self, k: str) -> str:
info = []
choices = self.getChoices(k)
if choices:
choices = sortNatural([str(choice) for choice in choices])
choicestr = "{" + ", ".join(str(ch) for ch in choices) + "}"
info.append(choicestr)
elif (keyrange := self.getRange(k)) is not None:
low, high = keyrange
info.append(f"between {low} - {high}")
else:
typestr = self.getTypestr(k)
info.append("type: " + typestr)
if self[k] != self.default[k]:
info.append(f'default: {self.default[k]}')
return" | ".join(info) if info else ""
[docs]
def makeDefault(self: _CheckedDictT) -> _CheckedDictT:
"""
Create a version of this class with all values set to the default
"""
return self.clone(updates=self.default)
[docs]
def diff(self, other: dict = None) -> dict:
"""
Get a dict containing keys:values which differ from the default or from another dict
Args:
other: if given, another dict which this is compared against. Otherwise
the diff is calculated to the default dict
Returns:
a dict containing key: value pairs where self differs from other
"""
if other is None:
other = self.default
assert other is not None
return {k: v for k, v in self.items()
if v != other.get(k, _UNKNOWN)}
[docs]
def __call__(self, key: str, value: Any, type=None, choices=None,
range: tuple[Any, Any] = None, doc: str = '',
validatefunc: validatefunc_t = None) -> None:
if not self._building:
raise RuntimeError("Not inside a context manager context")
self.addKey(key=key, value=value, type=type, choices=choices,
range=range, doc=doc, validatefunc=validatefunc)
[docs]
def addKey(self,
key: str,
value: Any,
type: Union[type, tuple[type, ...]] = None,
choices: Union[Set, tuple] = None,
range: tuple[Any, Any] = None,
validatefunc: validatefunc_t = None,
adaptor: Callable[[str, Any, Any], Any] = None,
doc: str = None) -> None:
"""
Add a ``key: value`` pair to the default settings.
This is used when building the default config item by item (see example).
After adding all new keys it is necessary to call :meth:`ConfigDict.load()`
Example
=======
.. code::
cfg = ConfigDict("foo", load=False)
# We define a default step by step
cfg.addKey("width", 100, range=(50, 150))
cfg.addKey("color", "red", choices=("read", "blue", "green"))
cfg.addKey("height",
doc="Height should be higher than width",
validatefunc=lambda cfg, key, height: height > cfg['width'])
# Now update the dict with the newly defined default and any
# saved version
cfg.load()
Args:
key: a string key
value: a default value
type: the type accepted, as passed to isinstance (can be a tuple)
choices: a set/tuple of possible values
range: a (min, max) tuple defining an allowed range for this value
validatefunc: a function ``(config: dict, key:str, value) -> bool``,
should return `True` if value is valid for `key` or False otherwise
doc: documentation for this key
"""
self.default[key] = value
self._allowedkeys.add(key)
validator = self._validator
if type:
validator[f"{key}::type"] = type
if choices:
validator[f"{key}::choices"] = choices
if range:
validator[f"{key}::range"] = range
if validatefunc:
assert callable(validatefunc), f"Validate function ({validatefunc}) is not callable for key: {key}"
validator[key] = validatefunc
if doc:
self._docs[key] = doc
if adaptor:
self._adaptor[key] = adaptor
def __getitem__(self, key: str):
if (value := dict.get(self, key, _UNKNOWN)) is not _UNKNOWN:
return value
if self._normalizedKeys and (key2 := self._normalizedKeys.get(normalizeKey(key))):
return dict.__getitem__(self, key2)
nearest = self._bestMatches(key, limit=8)
raise KeyError(f"key '{key}' not known. Did you mean {nearest}?\n"
f"Possible keys: {sorted(self.keys())}")
def __setitem__(self, key: str, value) -> None:
if self._bypass:
dict.__setitem__(self, key, value)
return
if self.readonly:
if isinstance(value, str):
value = "'{value}'"
raise ReadOnlyError(f"This dict is read-only. Tried to set '{key}'={value}")
if key not in self._allowedkeys:
if self._normalizedKeys and (normkey := self._normalizedKeys.get(normalizeKey(key))):
key = normkey
else:
mostlikely = self._bestMatches(key=key, limit=8)
msg = f"Unknown key {key}. Did you mean {', '.join(mostlikely)}?"
raise KeyError(msg)
oldvalue = self.get(key)
if oldvalue is not None and oldvalue == value:
return
if self._validator:
errormsg = self.checkValue(key, value)
if errormsg:
raise ValueError(errormsg)
if self._precallback:
newvalue = self._precallback(self, key, oldvalue, value)
if newvalue is not INVALID:
value = newvalue
super().__setitem__(key, value)
if self._callback is not None:
self._callback(key, value)
def _bestMatches(self, key: str, limit=16, minpercent=60):
return _bestMatches(key, list(self._allowedkeys), limit=limit, minpercent=minpercent)
[docs]
def load(self) -> None:
"""
Update any undefined key in self with the default value
Example
~~~~~~~
::
from configdict import *
config = CheckedConfig()
config.addKey(...)
config.addKey(...)
...
config.load()
# Now config is fully defined
"""
if not self.default:
raise ValueError("This dict has no default")
if len(self) == 0:
super().update(self.default)
else:
d = self.default.copy()
d.update(self)
self.update(d)
[docs]
def checkDict(self, d: dict) -> str:
"""
Check if dict `d` can be used to update self
Args:
d (dict): a dict which might update self
Returns:
An error message if `d` has any invalid `key` or `value`,
"" if everything is ok
"""
invalidkeys = [key for key in d if key not in self.default]
if invalidkeys:
return f"Some keys are not valid: {invalidkeys}"
if self._validator:
for k, v in d.items():
errormsg = self.checkValue(k, v)
if errormsg:
return errormsg
return ""
[docs]
def getValidateFunc(self, key: str) -> Optional[validatefunc_t]:
"""
Returns a function to validate a value for ``key``
A validate function has the form ``(config, value) -> bool``
Args:
key (str): the key to query for a validate function
Returns:
The validate function, or None
"""
func = self._validator.get(key, None)
assert func is None or callable(func), \
f"Validate func should be callable for key {key}, got {func}"
return func
[docs]
def getChoices(self, key: str) -> Optional[list]:
"""
Return a seq. of possible values for key ``k`` or ``None``
"""
if key not in self._allowedkeys:
raise KeyError(f"{key} is not a valid key")
if not self._validator:
logger.debug("getChoices: validator not set")
return None
key2 = key+"::choices"
choices = self._validator.get(key2, None)
if isinstance(choices, FunctionType):
realchoices = choices()
self._validator[key2] = set(realchoices)
return realchoices
return choices
[docs]
def getDoc(self, key: str) -> Optional[str]:
""" Get documentation for key (if present) """
if self._docs:
return self._docs.get(key)
[docs]
def checkValue(self, key: str, value) -> Optional[str]:
"""
Check if value is valid for key
This is only possible if a validator was set
Args:
key: the key to check
value: the value to check according to the contraints defined
for the key (range, type, etc)
Returns:
None if the value is acceptable for the key, an error message
otherwise
Example
=======
.. code::
error = config.checkType(key, value)
if error:
print(error)
"""
if not self._validator:
logger.debug(f"Validator not set, cannot check value {value} (key '{key}')")
return
for validatortype in self.validatorTypes(key):
if validatortype == 'choices':
choices = self.getChoices(key)
if choices is not None and value not in choices:
if isinstance(value, str):
value = f"'{value}'"
return f"key '{key}' should be one of {choices}, got {value}"
elif validatortype == 'func':
func = self.getValidateFunc(key)
assert func is not None
error = func(self, key, value)
if error is False:
return f"{value} is not valid for key '{key}'"
elif isinstance(error, str) and error:
return f"{value} is not valid for key '{key}': {error}"
elif validatortype == 'type':
t = self.getType(key)
if t == float:
if not _isfloaty(value):
return f"Expected floatlike for key '{key}', got {type(value).__name__}"
elif t == str:
if not isinstance(value, (bytes, str)):
return f"Expected str or bytes for key '{key}', got {type(value).__name__}"
elif not isinstance(value, t):
return f"Expected {t.__name__} for key '{key}', got {type(value).__name__}"
elif validatortype == 'range':
if (r := self.getRange(key)) and not (r[0] <= value <= r[1]):
return f"Value for key '{key}' should be within range {r}, got {value}"
return None
[docs]
def validatorTypes(self, key: str) -> list[str]:
"""
Return the validator types for a given key
A validator type for a given key can be a choices validator, where a set of
possible values is given for a given key; it can be a range, where the
value must be within a given range; a type, where a value must be of
a certain type; or a function, which must return True if the value
is valid, or False or an error message as string if the value is invalid
Args:
key: the key to query
Returns:
a list of validator types, where each item is one of 'choices',
'range', 'type', 'func'
"""
validatorTypesCache = self._cache.get('validatortypes')
if validatorTypesCache is None:
validatorTypesCache = {}
self._cache['validatortypes'] = validatorTypesCache
elif key in validatorTypesCache:
return validatorTypesCache[key]
validators = []
if f"{key}::choices" in self._validator:
validators.append('choices')
if f"{key}::range" in self._validator:
validators.append('range')
if key in self._validator:
validators.append('func')
if f"{key}::type" in self._validator:
validators.append('type')
validatorTypesCache[key] = validators
return validators
[docs]
def getRange(self, key: str) -> Optional[tuple]:
"""
Returns the valid range for this key's value, if specified.
Args:
key: the key to get the range from.
Returns:
the range of values allowed for this key, or None if there is no
range defined for this key.
Raises KeyError if the key is not present
"""
if key not in self._allowedkeys:
raise KeyError(f"{key} is not a valid key")
if not self._validator:
logger.debug("getRange: validator not set")
return None
return self._validator.get(key+"::range", None)
[docs]
def getType(self, key: str) -> Union[type, tuple[type, ...]]:
"""
Returns the expected type for key's value
Args:
key: the key to query
.. note::
All numbers are reduced to type float, all strings are of type str,
otherwise the type of the default value, which can be a collection
like a list or a dict
See Also: :meth:`checkValue`
"""
if self._validator is not None:
definedtype = self._validator.get(key+"::type")
if definedtype:
return definedtype
choices = self.getChoices(key)
if choices:
types = set(type(choice) for choice in choices)
if len(types) == 1:
return type(next(iter(choices)))
return tuple(types)
defaultval = self.default.get(key, _UNKNOWN)
if defaultval is _UNKNOWN:
raise KeyError(f"Key {key} is not present in default config. "
f"Possible keys: {list(self.default.keys())}")
return str if isinstance(defaultval,
(bytes, str)) else type(defaultval)
[docs]
def getTypestr(self, key: str) -> str:
"""
The same as `.getType` but returns a string representation of the type
Args:
key: the key to query
"""
t = self.getType(key)
if isinstance(t, tuple):
return "("+", ".join(x.__name__ for x in t)+")"
else:
return t.__name__
[docs]
def reset(self) -> None:
"""
Resets the config to its default (inplace)
"""
self.clear()
self.update(self.default)
def _normalizeDict(self, d: dict) -> dict:
out = {}
keys = self.keys()
for k, v in d.items():
if k in keys:
out[k] = v
elif k2 := self._normalizedKeys.get(normalizeKey(k)):
out[k2] = v
else:
raise KeyError(f"Unsupported key: {k}")
return out
[docs]
def update(self, d: dict = None, **kws) -> None:
"""
Update ths dict with `d` or any key:value pair passed as keyword
"""
if d:
errormsg = self.checkDict(d)
if errormsg:
raise ValueError(f"dict is invalid: {errormsg}")
super().update(d)
if kws:
for k, v in kws.items():
if k not in self._allowedkeys and self._normalizedKeys:
k2 = self._normalizedKeys.get(normalizeKey(k))
if k2:
del kws[k]
kws[k2] = v
errormsg = self.checkDict(kws)
if errormsg:
raise ValueError(f"invalid keywords: {errormsg}")
super().update(kws)
[docs]
def updated(self: _CheckedDictT, d: dict = None, **kws) -> _CheckedDictT:
"""
The same as :meth:`~CheckedDict.update`, but returns self
"""
self.update(d, **kws)
return self
[docs]
def override(self, key: str, value, default=None) -> None:
"""
The same as `value if value is not None else config.get(key, default)`
"""
return value if value is not None else self.get(key, default)
[docs]
def asYaml(self, sortKeys=False) -> str:
"""
Returns this dict as yaml str, with comments, defaults, etc.
"""
if sortKeys:
keys = self._sortedKeys()
else:
keys = list(self.keys())
keys.sort(key=lambda key: int(key.startswith(self._advancedPrefix)))
return _asYaml(self, doc=self._docs, validator=self._validator,
default=self.default, keys=keys)
def __enter__(self):
self._building = True
return self
def __exit__(self, *args, **kws):
self._building = False
self.load()
def _saveAsYaml(self, path: str, header: str = '', sortKeys=False,
separateAdvancedKeys=True) -> None:
yamlstr = self.asYaml(sortKeys=sortKeys)
folder = os.path.split(path)[0]
os.makedirs(folder, exist_ok=True)
with open(path, "w") as f:
if header:
f.write(header)
f.write("\n")
f.write(yamlstr)
if not os.path.exists(path):
raise RuntimeError(f"Could not save config to file '{path}', file not found")
def _sortedKeys(self) -> list[str]:
if (out := self._cache.get('sortedkeys')) is not None:
return out
keys = list(self.keys())
keys.sort()
keys.sort(key=lambda k: int(k.startswith(self._advancedPrefix)))
self._cache['sortedkeys'] = keys
return keys
def _repr_html_(self) -> str:
parts = [f'<div><h4>{type(self).__name__}</h4>']
parts.append("<br>")
rows = []
keys = self._sortedKeys()
for k in keys:
v = self[k]
rows.append((k, str(v), self._infoStr(k), self.getDoc(k)))
table = _htmlTable(rows,
headers=('Key', 'Value', 'Type', 'Descr'),
maxwidths=[0, 0, 150, 400],
rowstyles=('strong', 'code', None, None))
parts.append(table)
parts.append("</div>")
return "".join(parts)
def _loadJson(path: str) -> Optional[dict]:
try:
return json.load(open(path))
except json.JSONDecodeError:
error = sys.exc_info()[0]
logger.error(f"Could not read config {path}: {error}")
logger.debug("Using default as fallback")
def _loadYaml(path: str, fail=False) -> Optional[dict]:
try:
with open(path) as f:
return yaml.load(f, Loader=yaml.SafeLoader)
except Exception as e:
err = sys.exc_info()[0]
logger.error(f"Could not read config {path}: {err}")
if fail:
raise e
def _loadDict(path: str) -> Optional[dict]:
fmt = os.path.splitext(path)[1]
if fmt == ".json":
return _loadJson(path)
elif fmt == ".yaml":
return _loadYaml(path, fail=False)
else:
raise ValueError(f"format {fmt} unknown, supported formats: json, yaml")
[docs]
class ConfigDict(CheckedDict):
"""
This is an optionally persistent dictionary used for configuration.
It is saved under the config folder determined by
the OS (and is thus OS dependent). In persistent mode no two instances of the same
config can coexist.
Args:
name: a str of the form ``prefix.name`` or ``prefix/name``
(these are the same) or simply ``name`` if this is an
isolated configuration. The
data will be saved at ``$USERCONFIGDIR/{prefix}/{name}.{fmt}`` if
prefix is given, or ``$USERCONFIGDIR/{name}.{fmt}``.
For instance, in Linux a config with a name "myproj.myconfig" and
a yaml format will be saved to "~/.config/mydir/myconfig.yaml"
default: a dict with all default values. A config can accept only
keys which are already present in the default. This argument can be
None if the config is built successively via :meth:`ConfigDict.addKey`
(see example below) but the dict is not usable until all the keys have
been added and the user calls :meth:`ConfigDict.load` explicitely
validator: a dict containing choices, types and/or ranges for the keys in the
default. Given a default like: ``{'keyA': 'foo', 'keyB': 20}``,
a validator could be::
{
'keyA::choices': ['foo', 'bar'],
'keyB::type': float,
'keyB::range': (10, 30)
}
Choices can be defined lazyly by giving a lambda
docs: a dict containing documentation for each key
persistent: if True, any change to the dict will be automatically saved.
Otherwise a dict can be saved manually via :meth:`ConfigDict.save`
load: if True, the saved version will be loaded after creation.
This is disabled if no default dict is given. This is the case when building
the default after creation - :meth:`ConfigDict.load` should be called
manually in this case (see example).
precallback: function `(dict, key, oldvalue, newvalue) -> None|newvalue`,
If given, it is called *before* the modification is done. This function
should return **None** to allow modification, **any value** to modify the
value, or **raise ValueError** to stop the transaction
sortKeys: if True, keys are sorted whenever the dict is saved/edited.
advancedPrefix: keys with this prefix are marked as advanced. Whenever the dict
is displayed or edited, these keys appear after all the other keys
Example
=======
.. code::
# No default given. The default is built by adding keys subsequently.
# load needs to be called to end the declaration
# This method is somewhat similar to ArgParse
config = ConfigDict("myproj.subproj")
config.addKey("keyA", 10, doc="documentaion of keyA")
config.addKey("keyB", 0.5, range=(0, 1))
config.addKey("keyC", "blue", choices=("blue", "red"),
doc="documentation of keyC")
config.load()
# Alternatively, a config can be built within a context manager. 'load'
is called when exiting the context:
with ConfigDict("maelzel.snd.plotting") as conf
conf.addKey('backend', 'matplotlib', choices={'matlotlib'})
conf.addKey('spectrogram.colormap', 'inferno', choices=_cmaps)
conf.addKey('samplesplot.figsize', (24, 4))
conf.addKey('spectrogram.figsize', (24, 8))
conf.addKey('spectrogram.maxfreq', 12000,
doc="Highest frequency in a spectrogram")
# The same effect can be achieved by passing the default/validator/doc
default = {
"keyA": 10,
"keyB": 0.5,
"keyC": "blue
}
validator = {
"keyB::range": (0, 1),
"keyC::choices": ("blue", "red")
}
docs = {
"keyA": "documentation of keyA"
"keyC": "documentation of keyC"
}
cfg = ConfigDict("myproj.subproj",
default=default,
validator=validator,
docs=docs)
# no need to call .load in this case
# Using inheritance
class MyConfig(ConfigDict):
def __init__(self):
super().__init__(name="myconfig",
default=default,
validator=validator,
docs=docs)
cfg = MyConfig()
"""
_registry: dict[str, ConfigDict] = {}
_helpwidth: int = 58
_infowidth: int = 58
_valuewidth: int = 36
def __init__(self,
name: str,
default: dict[str, Any] = None,
validator: dict[str, Any] = None,
docs: dict[str, str] = None,
adaptor: dict[str, Callable[[str, Any, Any], Any]] = None,
precallback: Callable[[ConfigDict, str, Any, Any], Any] = None,
persistent=False,
load=True,
fmt='yaml',
sortKeys=False,
description='',
strict=True,
advancedPrefix='.') -> None:
self._name = ''
self._base = ''
self._persistent = persistent
self._configPath = None
self._callbacks = []
self._loaded = False
self.bypassCallbacks = False
self.description = description
if name:
name = _normalizeName(name)
if not _isValidName(name):
raise ValueError(f"name {name} is invalid for a config")
previous = self._registry.get(name)
if previous:
if persistent and previous.persistent:
raise ValueError(f"A persistent ConfigDict with the name {name} already exists!")
elif default != previous.default:
logger.warning(f"ConfigDict: instance with name {name} already created"
"with different defaults. It will be overwritten")
self._registry[name] = self
self._name = name
else:
assert not persistent, "A persistent dict needs a name"
load = False
self.fmt = fmt
super().__init__(default=default,
validator=validator,
adaptor=adaptor,
docs=docs,
callback=self._mycallback,
precallback=precallback,
autoload=False,
strict=strict,
advancedPrefix=advancedPrefix)
self.sortKeys = sortKeys
if default is not None:
self._updateWithDefault()
if load:
self.load()
@property
def name(self) -> Optional[str]:
"""
The name of this ConfigDict. The name determines where it is saved
"""
return self._name
@property
def persistent(self) -> bool:
"""Is this a persistent ConfigDict?"""
return self._persistent
@persistent.setter
def persistent(self, value) -> None:
"""Make this dict persistent. There can only be one persistent dict per name"""
if self._persistent == value:
return
self._persistent = value
if value:
if self._name in self._registry:
raise ValueError(f"A persistent ConfigDict with the name {self._name} already exists")
if not self._name:
raise ValueError("A ConfigDict without namecannot be set to persistent")
self._ensureWritable()
self._registry[self._name] = self
else:
assert self._name in self._registry
del self._registry[self._name]
def _mycallback(self, key, value):
"""
own callback used to dispatch to any registered callbacks and save
self after any change
"""
if self.bypassCallbacks:
return
for pattern, func in self._callbacks:
if re.match(pattern, key):
func(self, key, value)
if self._persistent:
self.save()
[docs]
def update(self, d: dict = None, **kws) -> None:
"""
Update this dict with the values in d.
.. note::
keywords have priority over d (similar to builtin dict)
Args:
d: values in this dictionary will overwrite values in self.
Keys not present in self will raise an exception
**kws: any key:value here will also be used to update self
"""
if not d and not kws:
return
self._persistent, persistent = False, self._persistent
CheckedDict.update(self, d, **kws)
self._persistent = persistent
if persistent:
self.save()
[docs]
def copy(self: _CheckedDictT) -> _CheckedDictT:
"""
Create a copy if this dict.
The copy will be unnamed and not persistent. Use :meth:`ConfigDict.clone`
to create a named/persistent clone of this dict.
Returns:
the copy of this dict
"""
return self.clone()
[docs]
def isCongruentWith(self, other: ConfigDict) -> bool:
"""
Returns True if self and other share same default
"""
return self.default == other.default
[docs]
def clone(self: _ConfigDictT, updates: dict = None, name: str = None, persistent=False,
cloneCallbacks=True, **kws
) -> _ConfigDictT:
"""
Create a clone of this dict
Args:
name: the name of the clone. If not given, the name of this dict is used.
persistent: Should the clone be made persitent?
cloneCallbacks: should the registered callbacks of the original (if any) be
cloned?
updates: a dict with updates
**kws: same as updates but only for keys which are valid keywords
Returns:
the cloned dict
"""
if name is None:
name = self._name
out = self.__class__(default=self.default, validator=self._validator, docs=self._docs,
persistent=persistent, load=False, name=name)
out._bypass = True
dict.update(out, self)
out._bypass = False
if updates:
out.update(updates)
if kws:
out.update(**kws)
if cloneCallbacks and self._callbacks:
for pattern, func in self._callbacks:
out.registerCallback(func, pattern)
return out
[docs]
def registerCallback(self,
func: Callable[[ConfigDict, str, Any], None],
pattern=r".*"
) -> None:
"""
Register a callback to be fired when a key matching the given pattern is changed.
If no pattern is given, the function will be called for every key.
Args:
func: a function of the form ``(dict, key, value) -> None``, where *dict* is
this ConfigDict itself, *key* is the key which was just changed and *value*
is the new value.
pattern: a regex pattern. The function will be called if the pattern matches
the key being modified.
"""
self._callbacks.append((pattern, func))
def _ensureWritable(self) -> None:
""" Make sure that we can serialize this dict to disk """
folder, _ = os.path.split(self.getPath())
if not os.path.exists(folder):
os.makedirs(folder)
[docs]
def reset(self, save=True) -> None:
""" Reset this dict to its default """
super().reset()
if save:
self.save()
[docs]
def resetKey(self, key: str) -> None:
"""Reset the given key to its default value"""
self[key] = self.default[key]
[docs]
def save(self, path: str = None, header='') -> None:
"""
Save this to its persistent path (or a custom path)
If this config was created with the `persistent` flag on,
it does not need to be saved manually, it is saved whenever it
is modified. However if it was created with ``persistent=False``
then this method can be used to write this dict so it will be
loaded in a future session
Args:
path: the path to save the config. If None and this
is a named config, it is saved to the path returned by
:meth:`~ConfigDict.getPath`
header: if given, this string is written prior to the dict, as
a comment. This is only supported when saving to yaml
"""
if not path:
path = self.getPath()
fmt = self.fmt
else:
fmt = os.path.splitext(path)[1][1:]
assert fmt in {'json', 'yaml', 'csv'}, f"Invalid format {fmt}, expected one of 'yaml', 'json', 'csv'"
logger.debug(f"Saving config to {path}")
if fmt is None:
fmt = self.fmt
if fmt == 'json':
with open(path, "w") as f:
json.dump(self, f, indent=True, sort_keys=True)
elif fmt == 'yaml' or fmt == 'yml':
self._saveAsYaml(path, header=header, sortKeys=self.sortKeys)
elif fmt == 'csv':
csvstr = self.asCsv()
open(path, "w").write(csvstr)
else:
raise ValueError(f"Extention '{fmt}' not suported. It should be one of .yaml, .yml, .json, .csv")
assert os.path.exists(path), f"Saved file to '{path}', but file does not exist"
[docs]
def dump(self):
""" Dump this config to stdout """
print(str(self))
def _asRows(self):
rows = []
for key, value in self.items():
infostr = self._infoStr(key)
doc = self.getDoc(key)
rows.append((key, str(value), infostr, doc if doc else ""))
return rows
[docs]
def generateRstDocumentation(self, maxWidth=80, withName=True, withDescription=True,
withLink=True, linkPrefix=''
) -> str:
"""
Generate ReST documentation for this dictionary
The generated string can then be dumped to a file and included
in documentation
Args:
maxWidth: the max. width of a line
withName: if True, add the name of the config (if it has a name)
withDescription: if True, add this dict's description (if it has any)
withLink: if True, for each key:value pair generate a RST link using the given linkPrefix
For example, for a key 'foo' and a linkPrefix='config' the generated link will be
``.. _configfoo``. This link can be used within the documentation to link to this key
linkPrefix: a prefix to use for all links
Returns:
the generated rst documentation, as str.
"""
lines = []
_ = lines.append
if withName and self.name:
_(self.name)
_("-" * len(self.name))
_('')
if withDescription and self.description:
_(textwrap.wrap(self.description, width=maxWidth))
_('\n------------------------\n')
for key, value in self.default.items():
if withLink:
linkkey = _asRstLinkKey(key)
if linkPrefix:
linkkey = linkPrefix + linkkey
_(f".. _{linkkey}:\n")
_(f"{key}:")
if isinstance(value, str) and not value:
value = "''"
_(f" | Default: **{value}** -- ``{self.getTypestr(key)}``")
if choices := self.getChoices(key):
choices = sortNatural([str(_) for _ in choices])
choicestr = ', '.join(choices)
_(f" | Choices: ``{choicestr}``")
if valuerange := self.getRange(key):
a, b = valuerange
_(f" | Between {a} - {b}")
if doc := self.getDoc(key):
_(f" | *{doc}*")
_("")
return "\n".join(lines)
[docs]
def asCsv(self) -> str:
"""
Returns this dict as a csv str, with columns: key, value, spec, doc
"""
rows = [("# key", "value", "spec", "doc")]
rows.extend(self._asRows())
from io import StringIO
import csv
s = StringIO()
writer = csv.writer(s)
writer.writerows(rows)
return s.getvalue()
# def _infoStr(self, k: str) -> str:
# info = []
# choices = self.getChoices(k)
# if choices:
# choices = sortNatural([str(choice) for choice in choices])
# choicestr = "{" + ", ".join(str(ch) for ch in choices) + "}"
# info.append(choicestr)
# elif (keyrange := self.getRange(k)) is not None:
# low, high = keyrange
# info.append(f"between {low} - {high}")
# else:
# typestr = self.getTypestr(k)
# info.append("type: " + typestr)
#
# if self[k] != self.default[k]:
# info.append(f'default: {self.default[k]}')
# return" | ".join(info) if info else ""
def _repr_html_(self) -> str:
parts = [f'<div><h4>{type(self).__name__}: <strong>{self.name}</strong></h4>']
if self.persistent:
parts.append(f'persistent (<code>"{self.getPath()}"</code>)')
parts.append("<br>")
rows = []
keys = self._sortedKeys()
for k in keys:
v = self[k]
descr = self.getDoc(k)
if v == self.default[k]:
strv = str(v)
else:
strv = f'<i><b>{v}</b></i>'
rows.append((k, strv, self._infoStr(k), descr))
table = _htmlTable(rows, headers=('Key', 'Value', 'Type', 'Descr'), maxwidths=[0, 0, 150, 400],
rowstyles=('strong', 'code', None, None))
parts.append(table)
parts.append("</div>")
return "".join(parts)
def _repr_pretty_(self, printer, cycle) -> str:
return printer.text(str(self))
def _repr_rows(self) -> list[str]:
try:
termwidth = os.get_terminal_size()[0] - 6
except OSError:
termwidth = 80
maxwidth = self._infowidth + self._valuewidth + max(len(k) for k in self.keys())
infowidth = int(self._infowidth / maxwidth * termwidth)
valuewidth = int(self._valuewidth / maxwidth * termwidth)
rows = []
keys = sorted(self.keys())
for k in keys:
v = self[k]
infostr = self._infoStr(k)
if len(infostr) > infowidth:
infolines = textwrap.wrap(infostr, infowidth)
infostr = "\n".join(infolines)
valuestr = str(v)
if len(valuestr) > valuewidth:
valuestr = "\n".join(textwrap.wrap(valuestr, valuewidth))
rows.append((k, valuestr, infostr))
doc = self.getDoc(k)
if doc:
if len(doc) > infowidth:
doclines = textwrap.wrap(doc, infowidth)
doc = "\n".join(doclines)
rows.append(("", "", doc))
return rows
def __str__(self) -> str:
import tabulate
header = f"Config: {self._name}\n"
rows = self._repr_rows()
return header + tabulate.tabulate(rows) + '\n'
[docs]
def getPath(self) -> str:
""" Return the path this dict will be saved to
If the dict has no name, an empty string is returned
"""
if not self._name:
return ''
if not self._configPath:
self._configPath = configPathFromName(self._name, self.fmt)
return self._configPath
[docs]
def edit(self, waitOnModified=True, sortKeys=False) -> None:
"""
Edit this config by opening it in an external application.
The format used is *yaml*. This is independent of the format used for
persistence. The application used is the user's default application
for the .yaml format and can be configured at the os level. In macos
we use ``open``, in linux ``xdg-open`` and in windows ``start``, which
all respond to the user's own configuration regarding default applications.
.. note::
A temporary file is created for editing. The persisted file is only
modified if the editing is accepted.
Args:
waitOnModified: if True, the transaction is accepted whenever the
file being edited is saved. Otherwise a message box is created
which needs to be clicked in order to confirm the transaction.
Just exiting the application will not cancel the edit job since
many applications which have a server mode or unique instance
mode might in fact exit right away from the perspective of the
subprocess which launched them
sortKeys: if True, keys appear in sorted order
"""
header = _editHeaderWatch if waitOnModified else _editHeaderPopup
configfile = tempfile.mktemp(suffix=".yaml")
self._saveAsYaml(configfile, header=header, sortKeys=sortKeys)
assert os.path.exists(configfile)
_openInEditor(configfile)
if waitOnModified:
try:
_notify(f"Config Edit: {self.name}", "Modify the values as needed. Save the file to accept the changes "
f"or press ctrl-c at the python prompt to cancel (path: {configfile})")
_waitOnFileModified(configfile)
_notify("Edit", "Editing finished, any further modifications will have no effect")
except KeyboardInterrupt:
logger.debug("Editing aborted")
_notify(f"Config Edit: {self.name}", "Editing aborted")
return
else:
_waitForClick(title=self.name)
self.load(configfile)
if self.persistent:
self.save()
def _updateWithDefault(self, bypass=True) -> None:
try:
self._bypass = True
dict.update(self, self.default)
self._bypass = False
except ValueError as e:
errmsg = textwrap.indent(str(e), prefix=" ")
raise ValueError(f"Could not load default dict, error:\n{errmsg}")
def _fill(self, other: dict) -> None:
for key in other:
if key not in self:
self[key] = other[key]
[docs]
def load(self, configpath: str = None) -> None:
"""
Read the saved config, update self.
If there is no saved version or the dict has no name, then
the dict is set to the default defined at construction.
When defining the default iteratively (via addKey), calling
load marks the end of the definition: after calling load no other
keys can be added to this dict.
Args:
configpath: an custom path to load a saved version from. Otherwise
it is loaded from :meth:`ConfigDict.getPath` (this is only
possible if the dict has a name, since the resolved path
is determined from the name)
Example
-------
.. code::
from configdict import ConfigDict
conf = ConfigDict('foo.bar')
conf.addKey('key1', 'value1', ...)
conf.addKey('key2', 'value2', ...)
...
# When finished defining keys, call .load
conf.load()
# Now the dict can be used
When
"""
assert self.default
if len(self) == 0:
# load after defining the default
super().update(self.default)
if configpath is None:
configpath = self.getPath()
if not configpath or not os.path.exists(configpath):
logger.debug(f"No saved version found for dict '{self.name}', using default")
super().update(self.default)
return
logger.debug(f"Reading config from disk: {configpath}")
confdict = _loadDict(configpath)
if confdict is None:
logger.error("Could not load saved config, skipping")
return
# only keys in default should be accepted, but keys in the read
# config should be discarded with a warning
keysNotInDefault = confdict.keys() - self.default.keys()
needsSave = False
if keysNotInDefault:
logger.info(f"ConfigDict {self._name}, saved at {configpath}\n"
"There are keys defined in the saved config which are not"
" present in the default config, they will be skipped: \n"
f" {keysNotInDefault}\n ")
for k in keysNotInDefault:
del confdict[k]
needsSave = True
# merge strategy:
# * if a key is shared between default and read dict, read dict has priority
# * if a key is present only in default, it is added
# check invalid values
if self._validator:
keysWithInvalidValues = []
for k, v in confdict.items():
errormsg = self.checkValue(k, v)
if errormsg:
logger.error(f"Error while loading config {self.name} (path: {configpath})")
logger.error(errormsg)
logger.error(f" Using default: {self.default[k]}")
keysWithInvalidValues.append(k)
for k in keysWithInvalidValues:
del confdict[k]
super().update(confdict)
self._loaded = True
if needsSave and self.persistent:
self.save()
def _makeName(configname: str, base: str = None) -> str:
if base is not None:
return f"{base}.{configname}"
else:
return f".{configname}"
def _mergeDicts(readdict: dict[str, Any], default: dict[str, Any]) -> dict[str, Any]:
"""
Merge readdict into default
Args:
readdict:
default:
Returns:
the merged dict
"""
out = {}
sharedkeys = readdict.keys() & default.keys()
for key in sharedkeys:
out[key] = readdict[key]
onlyInDefault = default.keys() - readdict.keys()
for key in onlyInDefault:
out[key] = default[key]
return out
def _parseName(name: str) -> tuple[str | None, str]:
"""
Returns (base, configname)
base can be none
"""
if ":" not in name:
base = None
configname = name
else:
base, *rest = name.split(":")
configname = ".".join(rest)
if not base:
base = None
return base, configname
def _isValidName(name: str) -> bool:
return re.fullmatch(r"[a-zA-Z0-9\.\:_]+", name) is not None
def _normalizeName(name: str) -> str:
"""
Originally a name would be of the form project:name,
later on we enabled / and . to act as path separator
"""
if "/" in name:
return name.replace("/", ":")
elif "." in name:
return name.replace(".", ":")
return name
def _checkName(name):
"""
check if name is a valid name for a config
"""
if not _isValidName(name):
raise ValueError(
f"{name} is not a valid name for a config."
" It should contain letters, numbers and any of '.', '_', ':'")
[docs]
def getConfig(name: str) -> Optional[ConfigDict]:
"""
Retrieve a previously created ConfigDict.
This will NOT load a saved config since for a ConfigDict to be properly
defined a default config must accompany the saved version. In order to
load a saved config as default just load it as a normal .yaml or .json
file and use that dict as the default.
Args:
name: the unique id of the configuration, as passed to ConfigDict
Returns:
the ConfigDict, if found. None otherwise.
"""
assert name, "name is empty"
name = _normalizeName(name)
_checkName(name)
return ConfigDict._registry.get(name)
[docs]
def activeConfigs() -> dict[str, ConfigDict]:
"""
Returns a dict of active configs
"""
return ConfigDict._registry.copy()
def _removeConfigFromDisk(name: str) -> bool:
"""
Remove the given config from disc, returns True if it was found and removed,
False otherwise
"""
configpath = configPathFromName(name)
if os.path.exists(configpath):
os.remove(configpath)
return True
return False
[docs]
def configPathFromName(name: str, fmt='yaml') -> str:
"""
Given a config name, return the path where it should be saved
Args:
name: the name of this config, with the format [prefix.]name
fmt: the format of the config (valid options: json, yaml)
Returns:
the path corresponding to this config name
"""
name = _normalizeName(name)
userconfigdir = appdirs.user_config_dir()
base, configname = _parseName(name)
if fmt == 'json':
configfile = configname + ".json"
elif fmt == 'yaml':
configfile = configname + '.yaml'
else:
raise ValueError("Formats supported: json, yaml")
if base is not None:
configdir = os.path.join(userconfigdir, base)
else:
configdir = userconfigdir
return os.path.join(configdir, configfile)