15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 | class AstSerialization:
def __init__(self, annotate: bool = False, backparsable: bool = False) -> None:
"""
Initializes AstSerialization with parser options.
Parameters
----------
annotate : bool, default False
If ``True``, annotates the AST tree
backparsable : bool, default False
If ``True``, AST tree is unparsable via self.unparse
If ``False``, Annotations deletes that are required for unparsing,
resulting in a neat tree
"""
self.annotate = annotate
self.backparsable = backparsable
@staticmethod
def del_keys(d: dict, keys: list) -> dict:
for key in keys:
if key in d:
del d[key]
for key, value in d.items():
if isinstance(value, dict):
AstSerialization.del_keys(value, keys)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
AstSerialization.del_keys(item, keys)
return d
@staticmethod
def add_key(d: dict, k: str, v) -> dict:
d[k] = v
for key, value in d.items():
if isinstance(value, dict):
AstSerialization.add_key(value, k, v)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
AstSerialization.add_key(item, k, v)
return d
def parse(self, source: str) -> dict:
ast_dict = ast2json(ast.parse(source))
rm_keywords = [
"col_offset",
"end_col_offset",
"end_lineno",
"lineno",
"type_comment",
"n",
"s",
"kind",
"ctx",
]
ast_dict = self.del_keys(ast_dict, rm_keywords) # remove annotations
self.ast_dict = ast_dict
if self.annotate:
self.annotate_ast()
return ast_dict
def unparse(self, ast_dict: dict = None) -> str:
ast_dict = self.add_key(ast_dict, "lineno", 0) # needed to unparse
ast_tree = json2ast(ast_dict)
source = ast.unparse(ast_tree)
return source
def dumps(self, format="yaml") -> str:
res = ""
if format == "json":
res = json.dumps(self.ast_dict, indent=4)
elif format == "yaml":
res = yaml.dump(self.ast_dict, indent=4)
return res
def annotate_ast(self) -> None:
"""Validate the root node and start annotation walk.
Raises
------
ASTNotAModule
If the parsed tree does **not** start with an ``ast.Module`` node.
"""
# todo add further veryfication
if self.ast_dict.get("_type") != "Module":
raise ASTNotAModule("root node is not a Module")
self._walk_json_ast(self.ast_dict, path=None)
def _walk_json_ast(self, node: list | dict | object, path: list) -> None:
"""Depth‑first traversal of *node* while keeping track of *path*.
Parameters
----------
node
Current AST sub‑node (``dict``, ``list`` or scalar).
path
Accumulated list of keys / indices leading from the root to *node*.
"""
if path is None:
path = []
# ------------------------------------------------------------------ #
# 1.Recursive walk
# ------------------------------------------------------------------ #
elif isinstance(node, list):
# print(f"Path: {path}")
for index, item in enumerate(node):
self._walk_json_ast(item, path + [index])
if isinstance(node, dict):
# print(f"Path: {path}")
for key, value in node.items():
self._walk_json_ast(value, path + [key])
# Primitive leaf – nothing to do
else:
# print(f"Path: {path} -> Value: {node}")
pass
# ------------------------------------------------------------------ #
# 2.Collapse handles the replacement logic to from leaf to "stem"
# ------------------------------------------------------------------ #
# This checks for the class constructor syntax in AST
# e.g "value":
# {"_type": "Call","args": [],"func": {"_type": "Name","id": "ClassA"}
if isinstance(node, dict):
if (
node.get("_type") == "Call" # A Constructor is a call
and node.get("func", {}).get("_type")
== "Name" # A Constructor is a call of type Name
and (
fid := node.get("func", {}).get("id")
) # fid is None if the path is missing and hence False
and fid[0].isupper() # only runs if fid is truthy,
# wont give TypeError/IndexError
):
ctor_node = AstSerialization._get_from_path(self.ast_dict, path)
ctor_node["__class_name__"] = fid
# self.ast_dict["__class_name__"] = fid
# print (fid)
for kw_node in node["keywords"]:
if isinstance(kw_node, dict):
if kw_node.get("_type") == "keyword":
ctor_node[kw_node["arg"]] = self._val(kw_node["value"])
if self.backparsable is False:
# slim notation
ctor_node = AstSerialization.slim_notation(ctor_node)
@staticmethod
def _val(node: list | dict | object) -> object | None:
"""Convert AST *value* nodes into primitives or nested constructor annotations.
Returns
-------
object | None
* ``int``, ``str`` … for ``Constant`` nodes;
* dotted ``str`` for ``Attribute`` chains;
* nested constructor annotations (dict) for embedded calls;
* ``None`` for values that are irrelevant / not serialisable.
"""
if isinstance(node, dict):
# todo currently f(a=t) and f(a="t") have same annotation,
# think about if this can lead to problems
t = node.get("_type")
ctor = node.get("__class_name__")
# f(a=1) :"value": {"_type": "Constant","value": 1}
if t == "Constant":
return node["value"]
# f(a=t) : "value": {"_type": "Name","id": "t"}
if t == "Name":
return node["id"]
# f(a = U.V) : "value":
# {"_type": "Attribute","attr": "V","value": {"_type": "Name","id": "U"}}
if t == "Attribute":
return AstSerialization._attr_to_str(node)
if ctor:
return AstSerialization.slim_notation(node.copy())
return None
# ------------------------------------------------------------------ #
# Attribute -> dotted string
# ------------------------------------------------------------------ #
@staticmethod
def _attr_to_str(node: dict) -> str:
"""Flatten a chain of ``Attribute``/``Name`` nodes into ``"U.V"``."""
# f(a = U.V) : "value":
# {"_type": "Attribute","attr": "V","value": {"_type": "Name","id": "U"}}
parts: list[str] = []
def walk(n):
if n["_type"] == "Attribute":
walk(n["value"])
parts.append(n["attr"])
elif n["_type"] == "Name":
parts.append(n["id"])
walk(node)
return ".".join(parts)
@staticmethod
def _get_from_path(node: list | dict | object, path: list) -> list | dict | object:
"""Return the sub‑node referenced by *path*."""
for key in path:
node = node[key]
return node
@staticmethod
def _dump_from_path(node: list | dict | object, path: list) -> str:
"""Pretty JSON dump of the sub‑node at *path* (debug helper)."""
node = AstSerialization._get_from_path(node, path)
res = json.dumps(node, indent=4)
return res
@staticmethod
def slim_notation(node: list | dict | object) -> list | dict | object:
"""pops the unnecessary parameters of a constructor
and returns slim notation node"""
for k in ("_type", "args", "func", "keywords"):
node.pop(k, None)
return node
def to_jsonld(self) -> dict:
res = {"@context": jsonld_context.awl_context["@context"], **self.ast_dict}
return res
|