|
1 | | -from __future__ import annotations |
2 | | - |
3 | | -from dataclasses import dataclass, field |
4 | | -from typing import Any, Dict, Iterable, List |
5 | | - |
6 | | -from avm2.abc.enums import MethodFlags |
7 | | -from avm2.abc.instructions import read_instruction, undefined |
8 | | -from avm2.abc.types import ABCFile, ASMethod, ASMethodBody, ASScript, ASOptionDetail |
9 | | -from avm2.io import MemoryViewReader |
10 | | -from avm2.swf.enums import DoABCTagFlags |
11 | | -from avm2.swf.types import DoABCTag, Tag, TagType |
12 | | - |
13 | | - |
14 | | -class VirtualMachine: |
15 | | - def __init__(self, abc_file: ABCFile): |
16 | | - self.abc_file = abc_file |
17 | | - self.method_bodies: Dict[int, int] = {method_body.method: i for i, method_body in enumerate(abc_file.method_bodies)} |
18 | | - |
19 | | - def execute_entry_point(self): |
20 | | - """ |
21 | | - Execute the entry point, that is the last script in ABCFile. |
22 | | - """ |
23 | | - self.execute_script(self.abc_file.scripts[-1]) |
24 | | - |
25 | | - def execute_script(self, script: ASScript): |
26 | | - """ |
27 | | - Execute the specified script. |
28 | | - """ |
29 | | - self.execute_method(script.init, this=...) # FIXME: what is `this`? Looks like a scope. |
30 | | - |
31 | | - def execute_method(self, index: int, *, this: Any, arguments: Iterable[Any] = ()): |
32 | | - """ |
33 | | - Execute the specified method. |
34 | | - """ |
35 | | - self.execute_method_body(self.method_bodies[index], this=this, arguments=arguments) |
36 | | - |
37 | | - def execute_method_body(self, index: int, *, this: Any, arguments: Iterable[Any] = ()): |
38 | | - """ |
39 | | - Execute the method body. |
40 | | - """ |
41 | | - method_body: ASMethodBody = self.abc_file.method_bodies[index] |
42 | | - method: ASMethod = self.abc_file.methods[method_body.method] |
43 | | - environment = self.create_method_environment(method, method_body, this, arguments) |
44 | | - self.execute_code(method_body.code, environment) |
45 | | - |
46 | | - def create_method_environment(self, method: ASMethod, method_body: ASMethodBody, this: Any, arguments: Iterable[Any]) -> MethodEnvironment: |
47 | | - """ |
48 | | - Create method execution environment: registers and stacks. |
49 | | - """ |
50 | | - arguments = list(arguments) |
51 | | - # There are `method_body_info.local_count` registers. |
52 | | - registers: List[Any] = [...] * method_body.local_count |
53 | | - # Register 0 holds the "this" object. This value is never null. |
54 | | - registers[0] = this |
55 | | - # Registers 1 through `method_info.param_count` holds parameter values coerced to the declared types |
56 | | - # of the parameters. |
57 | | - assert len(arguments) <= method.param_count |
58 | | - registers[1:len(arguments) + 1] = arguments |
59 | | - # If fewer than `method_body_info.local_count` values are supplied to the call then the remaining values are |
60 | | - # either the values provided by default value declarations (optional arguments) or the value `undefined`. |
61 | | - assert not method.options or len(method.options) <= method.param_count |
62 | | - for i in range(len(arguments) + 1, method_body.local_count): |
63 | | - registers[i] = self.get_optional_value(method.options[i - 1]) if i <= len(method.options) else undefined |
64 | | - # If `NEED_REST` is set in `method_info.flags`, the `method_info.param_count + 1` register is set up to |
65 | | - # reference an array that holds the superflous arguments. |
66 | | - if MethodFlags.NEED_REST in method.flags: |
67 | | - registers[method.param_count + 1] = arguments[method.param_count:] |
68 | | - # If `NEED_ARGUMENTS` is set in `method_info.flags`, the `method_info.param_count + 1` register is set up |
69 | | - # to reference an "arguments" object that holds all the actual arguments: see ECMA-262 for more |
70 | | - # information. |
71 | | - if MethodFlags.NEED_ARGUMENTS in method.flags: |
72 | | - registers[method.param_count + 1] = arguments |
73 | | - assert len(registers) == method_body.local_count |
74 | | - return MethodEnvironment(registers=registers) |
75 | | - |
76 | | - def get_optional_value(self, option: ASOptionDetail) -> Any: |
77 | | - """ |
78 | | - Get actual optional value. |
79 | | - """ |
80 | | - raise NotImplementedError('get_optional_value') |
81 | | - |
82 | | - def execute_code(self, code: memoryview, environment: MethodEnvironment): |
83 | | - """ |
84 | | - Execute the byte-code. |
85 | | - """ |
86 | | - reader = MemoryViewReader(code) |
87 | | - while True: |
88 | | - # FIXME: cache already read instructions. |
89 | | - read_instruction(reader).execute(environment) |
90 | | - |
91 | | - |
92 | | -@dataclass |
93 | | -class MethodEnvironment: |
94 | | - registers: List[Any] |
95 | | - operand_stack: List[Any] = field(default_factory=list) |
96 | | - scope_stack: List[Any] = field(default_factory=list) |
97 | | - |
98 | | - |
99 | | -def execute_tag(tag: Tag) -> VirtualMachine: |
100 | | - """ |
101 | | - Parse and execute DO_ABC tag. |
102 | | - """ |
103 | | - assert tag.type_ == TagType.DO_ABC |
104 | | - return execute_do_abc_tag(DoABCTag(tag.raw)) |
105 | | - |
106 | | - |
107 | | -def execute_do_abc_tag(do_abc_tag: DoABCTag) -> VirtualMachine: |
108 | | - """ |
109 | | - Create a virtual machine and execute the tag. |
110 | | - """ |
111 | | - return VirtualMachine(ABCFile(MemoryViewReader(do_abc_tag.abc_file))) |
0 commit comments