serde.core
pyserde core module.
1""" 2pyserde core module. 3""" 4 5from __future__ import annotations 6import dataclasses 7import enum 8import functools 9import logging 10import sys 11import re 12import casefy 13from dataclasses import dataclass 14from beartype.door import is_bearable 15from collections.abc import Mapping, Sequence, Callable 16from typing import ( 17 overload, 18 TypeVar, 19 Generic, 20 Optional, 21 Any, 22 Protocol, 23 get_type_hints, 24 Union, 25) 26 27from .compat import ( 28 T, 29 SerdeError, 30 dataclass_fields, 31 get_origin, 32 is_bare_dict, 33 is_bare_list, 34 is_bare_set, 35 is_bare_tuple, 36 is_class_var, 37 is_dict, 38 is_generic, 39 is_list, 40 is_literal, 41 is_new_type_primitive, 42 is_any, 43 is_opt, 44 is_opt_dataclass, 45 is_set, 46 is_tuple, 47 is_union, 48 is_variable_tuple, 49 type_args, 50 typename, 51 _WithTagging, 52) 53 54__all__ = [ 55 "Scope", 56 "gen", 57 "add_func", 58 "Func", 59 "Field", 60 "fields", 61 "FlattenOpts", 62 "conv", 63 "union_func_name", 64] 65 66logger = logging.getLogger("serde") 67 68 69# name of the serde context key 70SERDE_SCOPE = "__serde__" 71 72# main function keys 73FROM_ITER = "from_iter" 74FROM_DICT = "from_dict" 75TO_ITER = "to_iter" 76TO_DICT = "to_dict" 77TYPE_CHECK = "typecheck" 78 79# prefixes used to distinguish the direction of a union function 80UNION_SE_PREFIX = "union_se" 81UNION_DE_PREFIX = "union_de" 82 83LITERAL_DE_PREFIX = "literal_de" 84 85SETTINGS = {"debug": False} 86 87 88def init(debug: bool = False) -> None: 89 SETTINGS["debug"] = debug 90 91 92@dataclass 93class Cache: 94 """ 95 Cache the generated code for non-dataclass classes. 96 97 for example, a type not bound in a dataclass is passed in from_json 98 99 ``` 100 from_json(Union[Foo, Bar], ...) 101 ``` 102 103 It creates the following wrapper dataclass on the fly, 104 105 ``` 106 @serde 107 @dataclass 108 class Union_Foo_bar: 109 v: Union[Foo, Bar] 110 ``` 111 112 Then store this class in this cache. Whenever the same type is passed, 113 the class is retrieved from this cache. So the overhead of the codegen 114 should be only once. 115 """ 116 117 classes: dict[str, type[Any]] = dataclasses.field(default_factory=dict) 118 119 def _get_class(self, cls: type[Any]) -> type[Any]: 120 """ 121 Get a wrapper class from the the cache. If not found, it will generate 122 the class and store it in the cache. 123 """ 124 class_name = f"Wrapper{typename(cls)}" 125 wrapper = self.classes.get(class_name) 126 return wrapper or self._generate_class(cls) 127 128 def _generate_class(self, cls: type[Any]) -> type[Any]: 129 """ 130 Generate a wrapper dataclass then make the it (de)serializable using 131 @serde decorator. 132 """ 133 from . import serde 134 135 class_name = f"Wrapper{typename(cls)}" 136 logger.debug(f"Generating a wrapper class code for {class_name}") 137 138 wrapper = dataclasses.make_dataclass(class_name, [("v", cls)]) 139 140 serde(wrapper) 141 self.classes[class_name] = wrapper 142 143 logger.debug(f"(de)serializing code for {class_name} was generated") 144 return wrapper 145 146 def serialize(self, cls: type[Any], obj: Any, **kwargs: Any) -> Any: 147 """ 148 Serialize the specified type of object into dict or tuple. 149 """ 150 wrapper = self._get_class(cls) 151 scope: Scope = getattr(wrapper, SERDE_SCOPE) 152 data = scope.funcs[TO_DICT](wrapper(obj), **kwargs) 153 154 logging.debug(f"Intermediate value: {data}") 155 156 return data["v"] 157 158 def deserialize(self, cls: type[T], obj: Any) -> T: 159 """ 160 Deserialize from dict or tuple into the specified type. 161 """ 162 wrapper = self._get_class(cls) 163 scope: Scope = getattr(wrapper, SERDE_SCOPE) 164 return scope.funcs[FROM_DICT](data={"v": obj}).v # type: ignore 165 166 def _get_union_class(self, cls: type[Any]) -> Optional[type[Any]]: 167 """ 168 Get a wrapper class from the the cache. If not found, it will generate 169 the class and store it in the cache. 170 """ 171 union_cls, tagging = _extract_from_with_tagging(cls) 172 class_name = union_func_name( 173 f"{tagging.produce_unique_class_name()}Union", list(type_args(union_cls)) 174 ) 175 wrapper = self.classes.get(class_name) 176 return wrapper or self._generate_union_class(cls) 177 178 def _generate_union_class(self, cls: type[Any]) -> type[Any]: 179 """ 180 Generate a wrapper dataclass then make the it (de)serializable using 181 @serde decorator. 182 """ 183 import serde 184 185 union_cls, tagging = _extract_from_with_tagging(cls) 186 class_name = union_func_name( 187 f"{tagging.produce_unique_class_name()}Union", list(type_args(union_cls)) 188 ) 189 wrapper = dataclasses.make_dataclass(class_name, [("v", union_cls)]) 190 serde.serde(wrapper, tagging=tagging) 191 self.classes[class_name] = wrapper 192 return wrapper 193 194 def serialize_union(self, cls: type[Any], obj: Any) -> Any: 195 """ 196 Serialize the specified Union into dict or tuple. 197 """ 198 union_cls, _ = _extract_from_with_tagging(cls) 199 wrapper = self._get_union_class(cls) 200 scope: Scope = getattr(wrapper, SERDE_SCOPE) 201 func_name = union_func_name(UNION_SE_PREFIX, list(type_args(union_cls))) 202 return scope.funcs[func_name](obj, False, False) 203 204 def deserialize_union(self, cls: type[T], data: Any) -> T: 205 """ 206 Deserialize from dict or tuple into the specified Union. 207 """ 208 union_cls, _ = _extract_from_with_tagging(cls) 209 wrapper = self._get_union_class(cls) 210 scope: Scope = getattr(wrapper, SERDE_SCOPE) 211 func_name = union_func_name(UNION_DE_PREFIX, list(type_args(union_cls))) 212 return scope.funcs[func_name](cls=union_cls, data=data) # type: ignore 213 214 215def _extract_from_with_tagging(maybe_with_tagging: Any) -> tuple[Any, Tagging]: 216 try: 217 if isinstance(maybe_with_tagging, _WithTagging): 218 union_cls = maybe_with_tagging.inner 219 tagging = maybe_with_tagging.tagging 220 else: 221 raise Exception() 222 except Exception: 223 union_cls = maybe_with_tagging 224 tagging = ExternalTagging 225 226 return (union_cls, tagging) 227 228 229CACHE = Cache() 230""" Global cache variable for non-dataclass classes """ 231 232 233@dataclass 234class Scope: 235 """ 236 Container to store types and functions used in code generation context. 237 """ 238 239 cls: type[Any] 240 """ The exact class this scope is for 241 (needed to distinguish scopes between inherited classes) """ 242 243 funcs: dict[str, Callable[..., Any]] = dataclasses.field(default_factory=dict) 244 """ Generated serialize and deserialize functions """ 245 246 defaults: dict[str, Union[Callable[..., Any], Any]] = dataclasses.field(default_factory=dict) 247 """ Default values of the dataclass fields (factories & normal values) """ 248 249 code: dict[str, str] = dataclasses.field(default_factory=dict) 250 """ Generated source code (only filled when debug is True) """ 251 252 union_se_args: dict[str, list[type[Any]]] = dataclasses.field(default_factory=dict) 253 """ The union serializing functions need references to their types """ 254 255 reuse_instances_default: bool = True 256 """ Default values for to_dict & from_dict arguments """ 257 258 convert_sets_default: bool = False 259 260 def __repr__(self) -> str: 261 res: list[str] = [] 262 263 res.append("==================================================") 264 res.append(self._justify(self.cls.__name__)) 265 res.append("==================================================") 266 res.append("") 267 268 if self.code: 269 res.append("--------------------------------------------------") 270 res.append(self._justify("Functions generated by pyserde")) 271 res.append("--------------------------------------------------") 272 res.extend(list(self.code.values())) 273 res.append("") 274 275 if self.funcs: 276 res.append("--------------------------------------------------") 277 res.append(self._justify("Function references in scope")) 278 res.append("--------------------------------------------------") 279 for k, v in self.funcs.items(): 280 res.append(f"{k}: {v}") 281 res.append("") 282 283 if self.defaults: 284 res.append("--------------------------------------------------") 285 res.append(self._justify("Default values for the dataclass fields")) 286 res.append("--------------------------------------------------") 287 for k, v in self.defaults.items(): 288 res.append(f"{k}: {v}") 289 res.append("") 290 291 if self.union_se_args: 292 res.append("--------------------------------------------------") 293 res.append(self._justify("Type list by used for union serialize functions")) 294 res.append("--------------------------------------------------") 295 for k, lst in self.union_se_args.items(): 296 res.append(f"{k}: {list(lst)}") 297 res.append("") 298 299 return "\n".join(res) 300 301 def _justify(self, s: str, length: int = 50) -> str: 302 white_spaces = int((50 - len(s)) / 2) 303 return " " * (white_spaces if white_spaces > 0 else 0) + s 304 305 306def raise_unsupported_type(obj: Any) -> None: 307 # needed because we can not render a raise statement everywhere, e.g. as argument 308 raise SerdeError(f"Unsupported type: {typename(type(obj))}") 309 310 311def gen( 312 code: str, globals: Optional[dict[str, Any]] = None, locals: Optional[dict[str, Any]] = None 313) -> str: 314 """ 315 A wrapper of builtin `exec` function. 316 """ 317 if SETTINGS["debug"]: 318 # black formatting is only important when debugging 319 try: 320 from black import FileMode, format_str 321 322 code = format_str(code, mode=FileMode(line_length=100)) 323 except Exception: 324 pass 325 exec(code, globals, locals) 326 return code 327 328 329def add_func(serde_scope: Scope, func_name: str, func_code: str, globals: dict[str, Any]) -> None: 330 """ 331 Generate a function and add it to a Scope's `funcs` dictionary. 332 333 * `serde_scope`: the Scope instance to modify 334 * `func_name`: the name of the function 335 * `func_code`: the source code of the function 336 * `globals`: global variables that should be accessible to the generated function 337 """ 338 339 code = gen(func_code, globals) 340 serde_scope.funcs[func_name] = globals[func_name] 341 342 if SETTINGS["debug"]: 343 serde_scope.code[func_name] = code 344 345 346def is_instance(obj: Any, typ: Any) -> bool: 347 """ 348 pyserde's own `isinstance` helper. It accepts subscripted generics e.g. `list[int]` and 349 deeply check object against declared type. 350 """ 351 if dataclasses.is_dataclass(typ): 352 if not isinstance(typ, type): 353 raise SerdeError("expect dataclass class but dataclass instance received") 354 return isinstance(obj, typ) 355 elif is_opt(typ): 356 return is_opt_instance(obj, typ) 357 elif is_union(typ): 358 return is_union_instance(obj, typ) 359 elif is_list(typ): 360 return is_list_instance(obj, typ) 361 elif is_set(typ): 362 return is_set_instance(obj, typ) 363 elif is_tuple(typ): 364 return is_tuple_instance(obj, typ) 365 elif is_dict(typ): 366 return is_dict_instance(obj, typ) 367 elif is_generic(typ): 368 return is_generic_instance(obj, typ) 369 elif is_literal(typ): 370 return True 371 elif is_new_type_primitive(typ): 372 inner = getattr(typ, "__supertype__", None) 373 if type(inner) is type: 374 return isinstance(obj, inner) 375 else: 376 return False 377 elif is_any(typ): 378 return True 379 elif typ is Ellipsis: 380 return True 381 else: 382 return is_bearable(obj, typ) 383 384 385def is_opt_instance(obj: Any, typ: type[Any]) -> bool: 386 if obj is None: 387 return True 388 opt_arg = type_args(typ)[0] 389 return is_instance(obj, opt_arg) 390 391 392def is_union_instance(obj: Any, typ: type[Any]) -> bool: 393 for arg in type_args(typ): 394 if is_instance(obj, arg): 395 return True 396 return False 397 398 399def is_list_instance(obj: Any, typ: type[Any]) -> bool: 400 if not isinstance(obj, list): 401 return False 402 if len(obj) == 0 or is_bare_list(typ): 403 return True 404 list_arg = type_args(typ)[0] 405 # for speed reasons we just check the type of the 1st element 406 return is_instance(obj[0], list_arg) 407 408 409def is_set_instance(obj: Any, typ: type[Any]) -> bool: 410 if not isinstance(obj, (set, frozenset)): 411 return False 412 if len(obj) == 0 or is_bare_set(typ): 413 return True 414 set_arg = type_args(typ)[0] 415 # for speed reasons we just check the type of the 1st element 416 return is_instance(next(iter(obj)), set_arg) 417 418 419def is_tuple_instance(obj: Any, typ: type[Any]) -> bool: 420 args = type_args(typ) 421 422 if not isinstance(obj, tuple): 423 return False 424 425 # empty tuple 426 if len(args) == 0 and len(obj) == 0: 427 return True 428 429 # In the form of tuple[T, ...] 430 elif is_variable_tuple(typ): 431 # Get the first type arg. Since tuple[T, ...] is homogeneous tuple, 432 # all the elements should be of this type. 433 arg = type_args(typ)[0] 434 for v in obj: 435 if not is_instance(v, arg): 436 return False 437 return True 438 439 # bare tuple "tuple" is equivalent to tuple[Any, ...] 440 if is_bare_tuple(typ) and isinstance(obj, tuple): 441 return True 442 443 # All the other tuples e.g. tuple[int, str] 444 if len(obj) == len(args): 445 for element, arg in zip(obj, args): 446 if not is_instance(element, arg): 447 return False 448 else: 449 return False 450 451 return True 452 453 454def is_dict_instance(obj: Any, typ: type[Any]) -> bool: 455 if not isinstance(obj, dict): 456 return False 457 if len(obj) == 0 or is_bare_dict(typ): 458 return True 459 ktyp = type_args(typ)[0] 460 vtyp = type_args(typ)[1] 461 for k, v in obj.items(): 462 # for speed reasons we just check the type of the 1st element 463 return is_instance(k, ktyp) and is_instance(v, vtyp) 464 return False 465 466 467def is_generic_instance(obj: Any, typ: type[Any]) -> bool: 468 return is_instance(obj, get_origin(typ)) 469 470 471@dataclass 472class Func: 473 """ 474 Function wrapper that provides `mangled` optional field. 475 476 pyserde copies every function reference into global scope 477 for code generation. Mangling function names is needed in 478 order to avoid name conflict in the global scope when 479 multiple fields receives `skip_if` attribute. 480 """ 481 482 inner: Callable[..., Any] 483 """ Function to wrap in """ 484 485 mangeld: str = "" 486 """ Mangled function name """ 487 488 def __call__(self, v: Any) -> None: 489 return self.inner(v) # type: ignore 490 491 @property 492 def name(self) -> str: 493 """ 494 Mangled function name 495 """ 496 return self.mangeld 497 498 499def skip_if_false(v: Any) -> Any: 500 return not bool(v) 501 502 503def skip_if_default(v: Any, default: Optional[Any] = None) -> Any: 504 return v == default # Why return type is deduced to be Any? 505 506 507@dataclass 508class FlattenOpts: 509 """ 510 Flatten options. Currently not used. 511 """ 512 513 514def field( 515 *args: Any, 516 rename: Optional[str] = None, 517 alias: Optional[list[str]] = None, 518 skip: Optional[bool] = None, 519 skip_if: Optional[Callable[[Any], Any]] = None, 520 skip_if_false: Optional[bool] = None, 521 skip_if_default: Optional[bool] = None, 522 serializer: Optional[Callable[..., Any]] = None, 523 deserializer: Optional[Callable[..., Any]] = None, 524 flatten: Optional[Union[FlattenOpts, bool]] = None, 525 metadata: Optional[dict[str, Any]] = None, 526 **kwargs: Any, 527) -> Any: 528 """ 529 Declare a field with parameters. 530 """ 531 if not metadata: 532 metadata = {} 533 534 if rename is not None: 535 metadata["serde_rename"] = rename 536 if alias is not None: 537 metadata["serde_alias"] = alias 538 if skip is not None: 539 metadata["serde_skip"] = skip 540 if skip_if is not None: 541 metadata["serde_skip_if"] = skip_if 542 if skip_if_false is not None: 543 metadata["serde_skip_if_false"] = skip_if_false 544 if skip_if_default is not None: 545 metadata["serde_skip_if_default"] = skip_if_default 546 if serializer: 547 metadata["serde_serializer"] = serializer 548 if deserializer: 549 metadata["serde_deserializer"] = deserializer 550 if flatten is True: 551 metadata["serde_flatten"] = FlattenOpts() 552 elif flatten: 553 metadata["serde_flatten"] = flatten 554 555 return dataclasses.field(*args, metadata=metadata, **kwargs) 556 557 558@dataclass 559class Field(Generic[T]): 560 """ 561 Field class is similar to `dataclasses.Field`. It provides pyserde specific options. 562 563 `type`, `name`, `default` and `default_factory` are the same members as `dataclasses.Field`. 564 """ 565 566 type: type[T] 567 """ Type of Field """ 568 name: Optional[str] 569 """ Name of Field """ 570 default: Any = field(default_factory=dataclasses._MISSING_TYPE) 571 """ Default value of Field """ 572 default_factory: Any = field(default_factory=dataclasses._MISSING_TYPE) 573 """ Default factory method of Field """ 574 init: bool = field(default_factory=dataclasses._MISSING_TYPE) 575 repr: Any = field(default_factory=dataclasses._MISSING_TYPE) 576 hash: Any = field(default_factory=dataclasses._MISSING_TYPE) 577 compare: Any = field(default_factory=dataclasses._MISSING_TYPE) 578 metadata: Mapping[str, Any] = field(default_factory=dict) 579 kw_only: bool = False 580 case: Optional[str] = None 581 alias: list[str] = field(default_factory=list) 582 rename: Optional[str] = None 583 skip: Optional[bool] = None 584 skip_if: Optional[Func] = None 585 skip_if_false: Optional[bool] = None 586 skip_if_default: Optional[bool] = None 587 serializer: Optional[Func] = None # Custom field serializer. 588 deserializer: Optional[Func] = None # Custom field deserializer. 589 flatten: Optional[FlattenOpts] = None 590 parent: Optional[Any] = None 591 type_args: Optional[list[str]] = None 592 593 @classmethod 594 def from_dataclass(cls, f: dataclasses.Field[T], parent: Optional[Any] = None) -> Field[T]: 595 """ 596 Create `Field` object from `dataclasses.Field`. 597 """ 598 skip_if_false_func: Optional[Func] = None 599 if f.metadata.get("serde_skip_if_false"): 600 skip_if_false_func = Func(skip_if_false, cls.mangle(f, "skip_if_false")) 601 602 skip_if_default_func: Optional[Func] = None 603 if f.metadata.get("serde_skip_if_default"): 604 skip_if_def = functools.partial(skip_if_default, default=f.default) 605 skip_if_default_func = Func(skip_if_def, cls.mangle(f, "skip_if_default")) 606 607 skip_if: Optional[Func] = None 608 if f.metadata.get("serde_skip_if"): 609 func = f.metadata.get("serde_skip_if") 610 if callable(func): 611 skip_if = Func(func, cls.mangle(f, "skip_if")) 612 613 serializer: Optional[Func] = None 614 func = f.metadata.get("serde_serializer") 615 if func: 616 serializer = Func(func, cls.mangle(f, "serializer")) 617 618 deserializer: Optional[Func] = None 619 func = f.metadata.get("serde_deserializer") 620 if func: 621 deserializer = Func(func, cls.mangle(f, "deserializer")) 622 623 flatten = f.metadata.get("serde_flatten") 624 if flatten is True: 625 flatten = FlattenOpts() 626 if flatten and not (dataclasses.is_dataclass(f.type) or is_opt_dataclass(f.type)): 627 raise SerdeError(f"pyserde does not support flatten attribute for {typename(f.type)}") 628 629 kw_only = bool(f.kw_only) if sys.version_info >= (3, 10) else False 630 631 return cls( 632 f.type, # type: ignore 633 f.name, 634 default=f.default, 635 default_factory=f.default_factory, 636 init=f.init, 637 repr=f.repr, 638 hash=f.hash, 639 compare=f.compare, 640 metadata=f.metadata, 641 rename=f.metadata.get("serde_rename"), 642 alias=f.metadata.get("serde_alias", []), 643 skip=f.metadata.get("serde_skip"), 644 skip_if=skip_if or skip_if_false_func or skip_if_default_func, 645 serializer=serializer, 646 deserializer=deserializer, 647 flatten=flatten, 648 parent=parent, 649 kw_only=kw_only, 650 ) 651 652 def to_dataclass(self) -> dataclasses.Field[T]: 653 f = dataclasses.Field( 654 default=self.default, 655 default_factory=self.default_factory, 656 init=self.init, 657 repr=self.repr, 658 hash=self.hash, 659 compare=self.compare, 660 metadata=self.metadata, 661 kw_only=self.kw_only, 662 ) 663 assert self.name 664 f.name = self.name 665 f.type = self.type 666 return f 667 668 def is_self_referencing(self) -> bool: 669 if self.type is None: 670 return False 671 if self.parent is None: 672 return False 673 return self.type == self.parent # type: ignore 674 675 @staticmethod 676 def mangle(field: dataclasses.Field[Any], name: str) -> str: 677 """ 678 Get mangled name based on field name. 679 """ 680 return f"{field.name}_{name}" 681 682 def conv_name(self, case: Optional[str] = None) -> str: 683 """ 684 Get an actual field name which `rename` and `rename_all` conversions 685 are made. Use `name` property to get a field name before conversion. 686 """ 687 return conv(self, case or self.case) 688 689 def supports_default(self) -> bool: 690 return not getattr(self, "iterbased", False) and ( 691 has_default(self) or has_default_factory(self) 692 ) 693 694 695F = TypeVar("F", bound=Field[Any]) 696 697 698def fields(field_cls: type[F], cls: type[Any], serialize_class_var: bool = False) -> list[F]: 699 """ 700 Iterate fields of the dataclass and returns `serde.core.Field`. 701 """ 702 fields = [field_cls.from_dataclass(f, parent=cls) for f in dataclass_fields(cls)] 703 704 if serialize_class_var: 705 for name, typ in get_type_hints(cls).items(): 706 if is_class_var(typ): 707 fields.append(field_cls(typ, name, default=getattr(cls, name))) 708 709 return fields # type: ignore 710 711 712def conv(f: Field[Any], case: Optional[str] = None) -> str: 713 """ 714 Convert field name. 715 """ 716 name = f.name 717 if case: 718 casef = getattr(casefy, case, None) 719 if not casef: 720 raise SerdeError( 721 f"Unkown case type: {f.case}. Pass the name of case supported by 'casefy' package." 722 ) 723 name = casef(name) 724 if f.rename: 725 name = f.rename 726 if name is None: 727 raise SerdeError("Field name is None.") 728 return name 729 730 731def union_func_name(prefix: str, union_args: Sequence[Any]) -> str: 732 """ 733 Generate a function name that contains all union types 734 735 * `prefix` prefix to distinguish between serializing and deserializing 736 * `union_args`: type arguments of a Union 737 738 >>> from ipaddress import IPv4Address 739 >>> union_func_name("union_se", [int, list[str], IPv4Address]) 740 'union_se_int_list_str__IPv4Address' 741 """ 742 return re.sub(r"[^A-Za-z0-9]", "_", f"{prefix}_{'_'.join([typename(e) for e in union_args])}") 743 744 745def literal_func_name(literal_args: Sequence[Any]) -> str: 746 """ 747 Generate a function name with all literals and corresponding types specified with Literal[...] 748 749 750 * `literal_args`: arguments of a Literal 751 752 >>> literal_func_name(["r", "w", "a", "x", "r+", "w+", "a+", "x+"]) 753 'literal_de_r_str_w_str_a_str_x_str_r__str_w__str_a__str_x__str' 754 """ 755 return re.sub( 756 r"[^A-Za-z0-9]", 757 "_", 758 f"{LITERAL_DE_PREFIX}_{'_'.join(f'{a}_{typename(type(a))}' for a in literal_args)}", 759 ) 760 761 762@dataclass(unsafe_hash=True) 763class Tagging: 764 """ 765 Controls how union is (de)serialized. This is the same concept as in 766 https://serde.rs/enum-representations.html 767 """ 768 769 class Kind(enum.Enum): 770 External = enum.auto() 771 Internal = enum.auto() 772 Adjacent = enum.auto() 773 Untagged = enum.auto() 774 775 tag: Optional[str] = None 776 content: Optional[str] = None 777 kind: Kind = Kind.External 778 779 def is_external(self) -> bool: 780 return self.kind == self.Kind.External 781 782 def is_internal(self) -> bool: 783 return self.kind == self.Kind.Internal 784 785 def is_adjacent(self) -> bool: 786 return self.kind == self.Kind.Adjacent 787 788 def is_untagged(self) -> bool: 789 return self.kind == self.Kind.Untagged 790 791 @classmethod 792 def is_taggable(cls, typ: type[Any]) -> bool: 793 return dataclasses.is_dataclass(typ) 794 795 def check(self) -> None: 796 if self.is_internal() and self.tag is None: 797 raise SerdeError('"tag" must be specified in InternalTagging') 798 if self.is_adjacent() and (self.tag is None or self.content is None): 799 raise SerdeError('"tag" and "content" must be specified in AdjacentTagging') 800 801 def produce_unique_class_name(self) -> str: 802 """ 803 Produce a unique class name for this tagging. The name is used for generated 804 wrapper dataclass and stored in `Cache`. 805 """ 806 if self.is_internal(): 807 tag = casefy.pascalcase(self.tag) # type: ignore 808 if not tag: 809 raise SerdeError('"tag" must be specified in InternalTagging') 810 return f"Internal{tag}" 811 elif self.is_adjacent(): 812 tag = casefy.pascalcase(self.tag) # type: ignore 813 content = casefy.pascalcase(self.content) # type: ignore 814 if not tag: 815 raise SerdeError('"tag" must be specified in AdjacentTagging') 816 if not content: 817 raise SerdeError('"content" must be specified in AdjacentTagging') 818 return f"Adjacent{tag}{content}" 819 else: 820 return self.kind.name 821 822 def __call__(self, cls: T) -> _WithTagging[T]: 823 return _WithTagging(cls, self) 824 825 826@overload 827def InternalTagging(tag: str) -> Tagging: ... 828 829 830@overload 831def InternalTagging(tag: str, cls: T) -> _WithTagging[T]: ... 832 833 834def InternalTagging(tag: str, cls: Optional[T] = None) -> Union[Tagging, _WithTagging[T]]: 835 tagging = Tagging(tag, kind=Tagging.Kind.Internal) 836 if cls: 837 return tagging(cls) 838 else: 839 return tagging 840 841 842@overload 843def AdjacentTagging(tag: str, content: str) -> Tagging: ... 844 845 846@overload 847def AdjacentTagging(tag: str, content: str, cls: T) -> _WithTagging[T]: ... 848 849 850def AdjacentTagging( 851 tag: str, content: str, cls: Optional[T] = None 852) -> Union[Tagging, _WithTagging[T]]: 853 tagging = Tagging(tag, content, kind=Tagging.Kind.Adjacent) 854 if cls: 855 return tagging(cls) 856 else: 857 return tagging 858 859 860ExternalTagging = Tagging() 861 862Untagged = Tagging(kind=Tagging.Kind.Untagged) 863 864 865DefaultTagging = ExternalTagging 866 867 868def ensure(expr: Any, description: str) -> None: 869 if not expr: 870 raise Exception(description) 871 872 873def should_impl_dataclass(cls: type[Any]) -> bool: 874 """ 875 Test if class doesn't have @dataclass. 876 877 `dataclasses.is_dataclass` returns True even Derived class doesn't actually @dataclass. 878 >>> @dataclasses.dataclass 879 ... class Base: 880 ... a: int 881 >>> class Derived(Base): 882 ... b: int 883 >>> dataclasses.is_dataclass(Derived) 884 True 885 886 This function tells the class actually have @dataclass or not. 887 >>> should_impl_dataclass(Base) 888 False 889 >>> should_impl_dataclass(Derived) 890 True 891 """ 892 if not dataclasses.is_dataclass(cls): 893 return True 894 895 # Checking is_dataclass is not enough in such a case that the class is inherited 896 # from another dataclass. To do it correctly, check if all fields in __annotations__ 897 # are present as dataclass fields. 898 annotations = getattr(cls, "__annotations__", {}) 899 if not annotations: 900 return False 901 902 field_names = [field.name for field in dataclass_fields(cls)] 903 for field_name, annotation in annotations.items(): 904 # Omit InitVar field because it doesn't appear in dataclass fields. 905 if is_instance(annotation, dataclasses.InitVar): 906 continue 907 # This field in __annotations__ is not a part of dataclass fields. 908 # This means this class does not implement dataclass directly. 909 if field_name not in field_names: 910 return True 911 912 # If all of the fields in __annotation__ are present as dataclass fields, 913 # the class already implemented dataclass, thus returns False. 914 return False 915 916 917@dataclass 918class TypeCheck: 919 """ 920 Specify type check flavors. 921 """ 922 923 class Kind(enum.Enum): 924 Disabled = enum.auto() 925 """ No check performed """ 926 927 Coerce = enum.auto() 928 """ Value is coerced into the declared type """ 929 Strict = enum.auto() 930 """ Value are strictly checked against the declared type """ 931 932 kind: Kind 933 934 def is_strict(self) -> bool: 935 return self.kind == self.Kind.Strict 936 937 def is_coerce(self) -> bool: 938 return self.kind == self.Kind.Coerce 939 940 def __call__(self, **kwargs: Any) -> TypeCheck: 941 # TODO 942 return self 943 944 945disabled = TypeCheck(kind=TypeCheck.Kind.Disabled) 946 947coerce = TypeCheck(kind=TypeCheck.Kind.Coerce) 948 949strict = TypeCheck(kind=TypeCheck.Kind.Strict) 950 951 952def coerce_object(cls: str, field: str, typ: type[Any], obj: Any) -> Any: 953 try: 954 return typ(obj) if is_coercible(typ, obj) else obj 955 except Exception as e: 956 raise SerdeError( 957 f"failed to coerce the field {cls}.{field} value {obj} into {typename(typ)}: {e}" 958 ) 959 960 961def is_coercible(typ: type[Any], obj: Any) -> bool: 962 if obj is None: 963 return False 964 return True 965 966 967def has_default(field: Field[Any]) -> bool: 968 """ 969 Test if the field has default value. 970 971 >>> @dataclasses.dataclass 972 ... class C: 973 ... a: int 974 ... d: int = 10 975 >>> has_default(dataclasses.fields(C)[0]) 976 False 977 >>> has_default(dataclasses.fields(C)[1]) 978 True 979 """ 980 return not isinstance(field.default, dataclasses._MISSING_TYPE) 981 982 983def has_default_factory(field: Field[Any]) -> bool: 984 """ 985 Test if the field has default factory. 986 987 >>> @dataclasses.dataclass 988 ... class C: 989 ... a: int 990 ... d: dict = dataclasses.field(default_factory=dict) 991 >>> has_default_factory(dataclasses.fields(C)[0]) 992 False 993 >>> has_default_factory(dataclasses.fields(C)[1]) 994 True 995 """ 996 return not isinstance(field.default_factory, dataclasses._MISSING_TYPE) 997 998 999class ClassSerializer(Protocol): 1000 """ 1001 Interface for custom class serializer. 1002 1003 This protocol is intended to be used for custom class serializer. 1004 1005 >>> from datetime import datetime 1006 >>> from serde import serde 1007 >>> from plum import dispatch 1008 >>> class MySerializer(ClassSerializer): 1009 ... @dispatch 1010 ... def serialize(self, value: datetime) -> str: 1011 ... return value.strftime("%d/%m/%y") 1012 """ 1013 1014 def serialize(self, value: Any) -> Any: 1015 pass 1016 1017 1018class ClassDeserializer(Protocol): 1019 """ 1020 Interface for custom class deserializer. 1021 1022 This protocol is intended to be used for custom class deserializer. 1023 1024 >>> from datetime import datetime 1025 >>> from serde import serde 1026 >>> from plum import dispatch 1027 >>> class MyDeserializer(ClassDeserializer): 1028 ... @dispatch 1029 ... def deserialize(self, cls: type[datetime], value: Any) -> datetime: 1030 ... return datetime.strptime(value, "%d/%m/%y") 1031 """ 1032 1033 def deserialize(self, cls: Any, value: Any) -> Any: 1034 pass 1035 1036 1037GLOBAL_CLASS_SERIALIZER: list[ClassSerializer] = [] 1038 1039GLOBAL_CLASS_DESERIALIZER: list[ClassDeserializer] = [] 1040 1041 1042def add_serializer(serializer: ClassSerializer) -> None: 1043 """ 1044 Register custom global serializer. 1045 """ 1046 GLOBAL_CLASS_SERIALIZER.append(serializer) 1047 1048 1049def add_deserializer(deserializer: ClassDeserializer) -> None: 1050 """ 1051 Register custom global deserializer. 1052 """ 1053 GLOBAL_CLASS_DESERIALIZER.append(deserializer)
234@dataclass 235class Scope: 236 """ 237 Container to store types and functions used in code generation context. 238 """ 239 240 cls: type[Any] 241 """ The exact class this scope is for 242 (needed to distinguish scopes between inherited classes) """ 243 244 funcs: dict[str, Callable[..., Any]] = dataclasses.field(default_factory=dict) 245 """ Generated serialize and deserialize functions """ 246 247 defaults: dict[str, Union[Callable[..., Any], Any]] = dataclasses.field(default_factory=dict) 248 """ Default values of the dataclass fields (factories & normal values) """ 249 250 code: dict[str, str] = dataclasses.field(default_factory=dict) 251 """ Generated source code (only filled when debug is True) """ 252 253 union_se_args: dict[str, list[type[Any]]] = dataclasses.field(default_factory=dict) 254 """ The union serializing functions need references to their types """ 255 256 reuse_instances_default: bool = True 257 """ Default values for to_dict & from_dict arguments """ 258 259 convert_sets_default: bool = False 260 261 def __repr__(self) -> str: 262 res: list[str] = [] 263 264 res.append("==================================================") 265 res.append(self._justify(self.cls.__name__)) 266 res.append("==================================================") 267 res.append("") 268 269 if self.code: 270 res.append("--------------------------------------------------") 271 res.append(self._justify("Functions generated by pyserde")) 272 res.append("--------------------------------------------------") 273 res.extend(list(self.code.values())) 274 res.append("") 275 276 if self.funcs: 277 res.append("--------------------------------------------------") 278 res.append(self._justify("Function references in scope")) 279 res.append("--------------------------------------------------") 280 for k, v in self.funcs.items(): 281 res.append(f"{k}: {v}") 282 res.append("") 283 284 if self.defaults: 285 res.append("--------------------------------------------------") 286 res.append(self._justify("Default values for the dataclass fields")) 287 res.append("--------------------------------------------------") 288 for k, v in self.defaults.items(): 289 res.append(f"{k}: {v}") 290 res.append("") 291 292 if self.union_se_args: 293 res.append("--------------------------------------------------") 294 res.append(self._justify("Type list by used for union serialize functions")) 295 res.append("--------------------------------------------------") 296 for k, lst in self.union_se_args.items(): 297 res.append(f"{k}: {list(lst)}") 298 res.append("") 299 300 return "\n".join(res) 301 302 def _justify(self, s: str, length: int = 50) -> str: 303 white_spaces = int((50 - len(s)) / 2) 304 return " " * (white_spaces if white_spaces > 0 else 0) + s
Container to store types and functions used in code generation context.
The exact class this scope is for (needed to distinguish scopes between inherited classes)
Default values of the dataclass fields (factories & normal values)
312def gen( 313 code: str, globals: Optional[dict[str, Any]] = None, locals: Optional[dict[str, Any]] = None 314) -> str: 315 """ 316 A wrapper of builtin `exec` function. 317 """ 318 if SETTINGS["debug"]: 319 # black formatting is only important when debugging 320 try: 321 from black import FileMode, format_str 322 323 code = format_str(code, mode=FileMode(line_length=100)) 324 except Exception: 325 pass 326 exec(code, globals, locals) 327 return code
A wrapper of builtin exec
function.
330def add_func(serde_scope: Scope, func_name: str, func_code: str, globals: dict[str, Any]) -> None: 331 """ 332 Generate a function and add it to a Scope's `funcs` dictionary. 333 334 * `serde_scope`: the Scope instance to modify 335 * `func_name`: the name of the function 336 * `func_code`: the source code of the function 337 * `globals`: global variables that should be accessible to the generated function 338 """ 339 340 code = gen(func_code, globals) 341 serde_scope.funcs[func_name] = globals[func_name] 342 343 if SETTINGS["debug"]: 344 serde_scope.code[func_name] = code
Generate a function and add it to a Scope's funcs
dictionary.
serde_scope
: the Scope instance to modifyfunc_name
: the name of the functionfunc_code
: the source code of the functionglobals
: global variables that should be accessible to the generated function
472@dataclass 473class Func: 474 """ 475 Function wrapper that provides `mangled` optional field. 476 477 pyserde copies every function reference into global scope 478 for code generation. Mangling function names is needed in 479 order to avoid name conflict in the global scope when 480 multiple fields receives `skip_if` attribute. 481 """ 482 483 inner: Callable[..., Any] 484 """ Function to wrap in """ 485 486 mangeld: str = "" 487 """ Mangled function name """ 488 489 def __call__(self, v: Any) -> None: 490 return self.inner(v) # type: ignore 491 492 @property 493 def name(self) -> str: 494 """ 495 Mangled function name 496 """ 497 return self.mangeld
Function wrapper that provides mangled
optional field.
pyserde copies every function reference into global scope
for code generation. Mangling function names is needed in
order to avoid name conflict in the global scope when
multiple fields receives skip_if
attribute.
559@dataclass 560class Field(Generic[T]): 561 """ 562 Field class is similar to `dataclasses.Field`. It provides pyserde specific options. 563 564 `type`, `name`, `default` and `default_factory` are the same members as `dataclasses.Field`. 565 """ 566 567 type: type[T] 568 """ Type of Field """ 569 name: Optional[str] 570 """ Name of Field """ 571 default: Any = field(default_factory=dataclasses._MISSING_TYPE) 572 """ Default value of Field """ 573 default_factory: Any = field(default_factory=dataclasses._MISSING_TYPE) 574 """ Default factory method of Field """ 575 init: bool = field(default_factory=dataclasses._MISSING_TYPE) 576 repr: Any = field(default_factory=dataclasses._MISSING_TYPE) 577 hash: Any = field(default_factory=dataclasses._MISSING_TYPE) 578 compare: Any = field(default_factory=dataclasses._MISSING_TYPE) 579 metadata: Mapping[str, Any] = field(default_factory=dict) 580 kw_only: bool = False 581 case: Optional[str] = None 582 alias: list[str] = field(default_factory=list) 583 rename: Optional[str] = None 584 skip: Optional[bool] = None 585 skip_if: Optional[Func] = None 586 skip_if_false: Optional[bool] = None 587 skip_if_default: Optional[bool] = None 588 serializer: Optional[Func] = None # Custom field serializer. 589 deserializer: Optional[Func] = None # Custom field deserializer. 590 flatten: Optional[FlattenOpts] = None 591 parent: Optional[Any] = None 592 type_args: Optional[list[str]] = None 593 594 @classmethod 595 def from_dataclass(cls, f: dataclasses.Field[T], parent: Optional[Any] = None) -> Field[T]: 596 """ 597 Create `Field` object from `dataclasses.Field`. 598 """ 599 skip_if_false_func: Optional[Func] = None 600 if f.metadata.get("serde_skip_if_false"): 601 skip_if_false_func = Func(skip_if_false, cls.mangle(f, "skip_if_false")) 602 603 skip_if_default_func: Optional[Func] = None 604 if f.metadata.get("serde_skip_if_default"): 605 skip_if_def = functools.partial(skip_if_default, default=f.default) 606 skip_if_default_func = Func(skip_if_def, cls.mangle(f, "skip_if_default")) 607 608 skip_if: Optional[Func] = None 609 if f.metadata.get("serde_skip_if"): 610 func = f.metadata.get("serde_skip_if") 611 if callable(func): 612 skip_if = Func(func, cls.mangle(f, "skip_if")) 613 614 serializer: Optional[Func] = None 615 func = f.metadata.get("serde_serializer") 616 if func: 617 serializer = Func(func, cls.mangle(f, "serializer")) 618 619 deserializer: Optional[Func] = None 620 func = f.metadata.get("serde_deserializer") 621 if func: 622 deserializer = Func(func, cls.mangle(f, "deserializer")) 623 624 flatten = f.metadata.get("serde_flatten") 625 if flatten is True: 626 flatten = FlattenOpts() 627 if flatten and not (dataclasses.is_dataclass(f.type) or is_opt_dataclass(f.type)): 628 raise SerdeError(f"pyserde does not support flatten attribute for {typename(f.type)}") 629 630 kw_only = bool(f.kw_only) if sys.version_info >= (3, 10) else False 631 632 return cls( 633 f.type, # type: ignore 634 f.name, 635 default=f.default, 636 default_factory=f.default_factory, 637 init=f.init, 638 repr=f.repr, 639 hash=f.hash, 640 compare=f.compare, 641 metadata=f.metadata, 642 rename=f.metadata.get("serde_rename"), 643 alias=f.metadata.get("serde_alias", []), 644 skip=f.metadata.get("serde_skip"), 645 skip_if=skip_if or skip_if_false_func or skip_if_default_func, 646 serializer=serializer, 647 deserializer=deserializer, 648 flatten=flatten, 649 parent=parent, 650 kw_only=kw_only, 651 ) 652 653 def to_dataclass(self) -> dataclasses.Field[T]: 654 f = dataclasses.Field( 655 default=self.default, 656 default_factory=self.default_factory, 657 init=self.init, 658 repr=self.repr, 659 hash=self.hash, 660 compare=self.compare, 661 metadata=self.metadata, 662 kw_only=self.kw_only, 663 ) 664 assert self.name 665 f.name = self.name 666 f.type = self.type 667 return f 668 669 def is_self_referencing(self) -> bool: 670 if self.type is None: 671 return False 672 if self.parent is None: 673 return False 674 return self.type == self.parent # type: ignore 675 676 @staticmethod 677 def mangle(field: dataclasses.Field[Any], name: str) -> str: 678 """ 679 Get mangled name based on field name. 680 """ 681 return f"{field.name}_{name}" 682 683 def conv_name(self, case: Optional[str] = None) -> str: 684 """ 685 Get an actual field name which `rename` and `rename_all` conversions 686 are made. Use `name` property to get a field name before conversion. 687 """ 688 return conv(self, case or self.case) 689 690 def supports_default(self) -> bool: 691 return not getattr(self, "iterbased", False) and ( 692 has_default(self) or has_default_factory(self) 693 )
Field class is similar to dataclasses.Field
. It provides pyserde specific options.
type
, name
, default
and default_factory
are the same members as dataclasses.Field
.
594 @classmethod 595 def from_dataclass(cls, f: dataclasses.Field[T], parent: Optional[Any] = None) -> Field[T]: 596 """ 597 Create `Field` object from `dataclasses.Field`. 598 """ 599 skip_if_false_func: Optional[Func] = None 600 if f.metadata.get("serde_skip_if_false"): 601 skip_if_false_func = Func(skip_if_false, cls.mangle(f, "skip_if_false")) 602 603 skip_if_default_func: Optional[Func] = None 604 if f.metadata.get("serde_skip_if_default"): 605 skip_if_def = functools.partial(skip_if_default, default=f.default) 606 skip_if_default_func = Func(skip_if_def, cls.mangle(f, "skip_if_default")) 607 608 skip_if: Optional[Func] = None 609 if f.metadata.get("serde_skip_if"): 610 func = f.metadata.get("serde_skip_if") 611 if callable(func): 612 skip_if = Func(func, cls.mangle(f, "skip_if")) 613 614 serializer: Optional[Func] = None 615 func = f.metadata.get("serde_serializer") 616 if func: 617 serializer = Func(func, cls.mangle(f, "serializer")) 618 619 deserializer: Optional[Func] = None 620 func = f.metadata.get("serde_deserializer") 621 if func: 622 deserializer = Func(func, cls.mangle(f, "deserializer")) 623 624 flatten = f.metadata.get("serde_flatten") 625 if flatten is True: 626 flatten = FlattenOpts() 627 if flatten and not (dataclasses.is_dataclass(f.type) or is_opt_dataclass(f.type)): 628 raise SerdeError(f"pyserde does not support flatten attribute for {typename(f.type)}") 629 630 kw_only = bool(f.kw_only) if sys.version_info >= (3, 10) else False 631 632 return cls( 633 f.type, # type: ignore 634 f.name, 635 default=f.default, 636 default_factory=f.default_factory, 637 init=f.init, 638 repr=f.repr, 639 hash=f.hash, 640 compare=f.compare, 641 metadata=f.metadata, 642 rename=f.metadata.get("serde_rename"), 643 alias=f.metadata.get("serde_alias", []), 644 skip=f.metadata.get("serde_skip"), 645 skip_if=skip_if or skip_if_false_func or skip_if_default_func, 646 serializer=serializer, 647 deserializer=deserializer, 648 flatten=flatten, 649 parent=parent, 650 kw_only=kw_only, 651 )
Create Field
object from dataclasses.Field
.
653 def to_dataclass(self) -> dataclasses.Field[T]: 654 f = dataclasses.Field( 655 default=self.default, 656 default_factory=self.default_factory, 657 init=self.init, 658 repr=self.repr, 659 hash=self.hash, 660 compare=self.compare, 661 metadata=self.metadata, 662 kw_only=self.kw_only, 663 ) 664 assert self.name 665 f.name = self.name 666 f.type = self.type 667 return f
676 @staticmethod 677 def mangle(field: dataclasses.Field[Any], name: str) -> str: 678 """ 679 Get mangled name based on field name. 680 """ 681 return f"{field.name}_{name}"
Get mangled name based on field name.
699def fields(field_cls: type[F], cls: type[Any], serialize_class_var: bool = False) -> list[F]: 700 """ 701 Iterate fields of the dataclass and returns `serde.core.Field`. 702 """ 703 fields = [field_cls.from_dataclass(f, parent=cls) for f in dataclass_fields(cls)] 704 705 if serialize_class_var: 706 for name, typ in get_type_hints(cls).items(): 707 if is_class_var(typ): 708 fields.append(field_cls(typ, name, default=getattr(cls, name))) 709 710 return fields # type: ignore
Iterate fields of the dataclass and returns serde.core.Field
.
Flatten options. Currently not used.
713def conv(f: Field[Any], case: Optional[str] = None) -> str: 714 """ 715 Convert field name. 716 """ 717 name = f.name 718 if case: 719 casef = getattr(casefy, case, None) 720 if not casef: 721 raise SerdeError( 722 f"Unkown case type: {f.case}. Pass the name of case supported by 'casefy' package." 723 ) 724 name = casef(name) 725 if f.rename: 726 name = f.rename 727 if name is None: 728 raise SerdeError("Field name is None.") 729 return name
Convert field name.
732def union_func_name(prefix: str, union_args: Sequence[Any]) -> str: 733 """ 734 Generate a function name that contains all union types 735 736 * `prefix` prefix to distinguish between serializing and deserializing 737 * `union_args`: type arguments of a Union 738 739 >>> from ipaddress import IPv4Address 740 >>> union_func_name("union_se", [int, list[str], IPv4Address]) 741 'union_se_int_list_str__IPv4Address' 742 """ 743 return re.sub(r"[^A-Za-z0-9]", "_", f"{prefix}_{'_'.join([typename(e) for e in union_args])}")
Generate a function name that contains all union types
prefix
prefix to distinguish between serializing and deserializingunion_args
: type arguments of a Union
>>> from ipaddress import IPv4Address
>>> union_func_name("union_se", [int, list[str], IPv4Address])
'union_se_int_list_str__IPv4Address'