# actor_initialization.py
""" This example demonstrates the initialization procedure of every available actor.
"""

# pylint: disable=unused-variable
# pylint: disable=invalid-name
import argparse
import json
from concurrent.futures import Future

from geckordp.actors.accessibility.accessibility import AccessibilityActor
from geckordp.actors.accessibility.accessible import AccessibleActor
from geckordp.actors.accessibility.accessible_walker import AccessibleWalkerActor
from geckordp.actors.accessibility.parent_accessibility import ParentAccessibilityActor
from geckordp.actors.accessibility.simulator import SimulatorActor
from geckordp.actors.addon.addons import AddonsActor
from geckordp.actors.addon.web_extension_inspected_window import (
    WebExtensionInspectedWindowActor,
)
from geckordp.actors.descriptors.process import ProcessActor
from geckordp.actors.descriptors.tab import TabActor
from geckordp.actors.descriptors.web_extension import WebExtensionActor
from geckordp.actors.descriptors.worker import WorkerActor
from geckordp.actors.device import DeviceActor
from geckordp.actors.events import Events
from geckordp.actors.heap_snapshot import HeapSnapshotActor
from geckordp.actors.inspector import InspectorActor
from geckordp.actors.memory import MemoryActor
from geckordp.actors.network_content import NetworkContentActor
from geckordp.actors.network_event import NetworkEventActor
from geckordp.actors.network_parent import NetworkParentActor
from geckordp.actors.node import NodeActor
from geckordp.actors.node_list import NodeListActor
from geckordp.actors.preference import PreferenceActor
from geckordp.actors.resources import Resources
from geckordp.actors.root import RootActor
from geckordp.actors.screenshot import ScreenshotActor
from geckordp.actors.source import SourceActor
from geckordp.actors.storage import (
    CacheStorageActor,
    CookieStorageActor,
    IndexedDBStorageActor,
    LocalStorageActor,
    SessionStorageActor,
)
from geckordp.actors.string import StringActor
from geckordp.actors.target_configuration import TargetConfigurationActor
from geckordp.actors.targets.content_process import ContentProcessActor
from geckordp.actors.targets.window_global import WindowGlobalActor
from geckordp.actors.thread import ThreadActor
from geckordp.actors.thread_configuration import ThreadConfigurationActor
from geckordp.actors.walker import WalkerActor
from geckordp.actors.watcher import WatcherActor
from geckordp.actors.web_console import WebConsoleActor
from geckordp.firefox import Firefox
from geckordp.profile import ProfileManager
from geckordp.rdp_client import RDPClient

""" Uncomment to enable debug output
"""
# from geckordp.settings import GECKORDP
# GECKORDP.DEBUG = 1
# GECKORDP.DEBUG_REQUEST = 1
# GECKORDP.DEBUG_RESPONSE = 1


def _iter_resource_forms(data: dict, type_marker: str):
    """Yield the resource forms of a 'resources-available-array' style payload.

    The payload is expected to look like
    ``{"array": [[<type name>, [<form>, ...]], ...]}``: a type name string
    that is directly followed by the list of forms announced for that type.

    Args:
        data: The event payload passed to the listener.
        type_marker: Substring which must be contained in the type name.

    Yields:
        Every dict found in the list that follows a matching type name.
    """
    for sub_array in data.get("array", []):
        for i, item in enumerate(sub_array):
            if not (isinstance(item, str) and type_marker in item):
                continue
            # the forms are stored in the element right after the type name;
            # skip malformed entries instead of raising inside the handler
            if i + 1 >= len(sub_array):
                continue
            forms = sub_array[i + 1]
            if not isinstance(forms, list):
                continue
            for form in forms:
                if isinstance(form, dict):
                    yield form


def _wait_for_storage_resource(
    client: "RDPClient",
    watcher: "WatcherActor",
    listener_actor_id: str,
    resource,
    type_marker: str,
    actor_marker: str,
    rewatch_frame_targets: bool = False,
    timeout: float = 3.0,
) -> dict:
    """Watch a storage resource and return the first matching resource form.

    Args:
        client: The connected client on which the listener is registered.
        watcher: The watcher actor used to start watching the resource.
        listener_actor_id: Actor ID on which the resource event is expected.
        resource: The resource type passed to 'watch_resources()'.
        type_marker: Substring identifying the resource type name in the event.
        actor_marker: Substring identifying the actor ID of the wanted form.
        rewatch_frame_targets: Call 'watch_targets(FRAME)' once more right
            before watching the resource.
        timeout: Seconds to wait for the resource to be announced.

    Returns:
        The first form whose "actor" value contains 'actor_marker'.

    Raises:
        concurrent.futures.TimeoutError: No matching form arrived in time.
    """
    resource_fut = Future()

    async def on_resource(data: dict):
        # only the first match is of interest, later events are ignored
        if resource_fut.done():
            return
        for form in _iter_resource_forms(data, type_marker):
            if actor_marker in form.get("actor", ""):
                resource_fut.set_result(form)
                return

    client.add_event_listener(
        listener_actor_id, Events.Watcher.RESOURCES_AVAILABLE_ARRAY, on_resource
    )
    try:
        if rewatch_frame_targets:
            watcher.watch_targets(WatcherActor.Targets.FRAME)
        watcher.watch_resources([resource])
        return resource_fut.result(timeout)
    finally:
        # always unregister, so the handler doesn't linger after the lookup
        client.remove_event_listener(
            listener_actor_id, Events.Watcher.RESOURCES_AVAILABLE_ARRAY, on_resource
        )


def main():
    """Initialize every available actor once against a fresh Firefox instance.

    The procedure clones the 'default-release' profile to 'geckordp', starts
    Firefox headless on the given port, connects a client and then creates
    each actor from the IDs handed out by its parent actor. At the end the
    script blocks on 'input()' so the registered event handlers can still
    print their results.

    Raises:
        concurrent.futures.TimeoutError: The frame target or one of the
            storage resources was not announced within 3 seconds.
    """
    # parse arguments
    parser = argparse.ArgumentParser(description="")
    parser.add_argument(
        "--host", type=str, default="localhost", help="The host to connect to"
    )
    parser.add_argument(
        "--port", type=int, default=6000, help="The port to connect to"
    )
    # unknown arguments are ignored on purpose
    args, _ = parser.parse_known_args()

    # clone default profile to 'geckordp'
    pm = ProfileManager()
    profile_name = "geckordp"
    pm.clone("default-release", profile_name)
    profile = pm.get_profile_by_name(profile_name)
    profile.set_required_configs()

    # start firefox with specified profile
    Firefox.start("https://example.com/", args.port, profile_name, ["-headless"])

    # RDPClient
    ###################################################
    client = RDPClient()
    client.connect(args.host, args.port)

    # RootActor
    ###################################################
    ROOT = RootActor(client)
    root_actor_ids = ROOT.get_root()

    # DeviceActor
    ###################################################
    DEVICE = DeviceActor(client, root_actor_ids["deviceActor"])

    # ContentProcessActor
    ###################################################
    for descriptor in ROOT.list_processes():
        PROCESS = ProcessActor(client, descriptor["actor"])
        target_ctx = PROCESS.get_target()

        CONSOLE = WebConsoleActor(client, target_ctx["consoleActor"])

        # a content process actor is only created for non-parent processes
        if not descriptor["isParent"]:
            CONTENT_PROCESS = ContentProcessActor(client, target_ctx["actor"])

    # WebExtensionActor
    ###################################################
    for descriptor in ROOT.list_addons():
        EXTENSION = WebExtensionActor(client, descriptor["actor"])

    # WorkerActor
    ###################################################
    for descriptor in ROOT.list_workers():
        WORKER = WorkerActor(client, descriptor["actor"])

    # AddonsActor
    ###################################################
    ADDONS = AddonsActor(client, root_actor_ids["addonsActor"])

    # TabActor
    ###################################################
    # just use the first tab for example purposes
    tab_ctx = ROOT.list_tabs()[0]
    TAB = TabActor(client, tab_ctx["actor"])
    actor_ids = TAB.get_target()

    # AccessibilityActor
    ###################################################
    ACCESSIBILITY = AccessibilityActor(client, actor_ids["accessibilityActor"])
    ACCESSIBILITY.bootstrap()

    # ParentAccessibilityActor
    ###################################################
    PARENT_ACCESSIBILITY = ParentAccessibilityActor(
        client, root_actor_ids["parentAccessibilityActor"]
    )
    PARENT_ACCESSIBILITY.bootstrap()
    PARENT_ACCESSIBILITY.enable()

    # AccessibleWalkerActor
    ###################################################
    ACCESSIBILITY_WALKER = AccessibleWalkerActor(
        client, ACCESSIBILITY.get_walker()["actor"]
    )

    # AccessibleActor
    ###################################################
    accessible_children = ACCESSIBILITY_WALKER.children()
    ACCESSIBLE = AccessibleActor(client, accessible_children[0]["actor"])

    # SimulatorActor
    ###################################################
    # NOTE(review): unlike 'get_walker()' above, the return value is passed on
    # without indexing ["actor"] - confirm that 'get_simulator()' returns an ID
    SIMULATOR = SimulatorActor(client, ACCESSIBILITY.get_simulator())

    # WindowGlobalActor
    ###################################################
    WEB = WindowGlobalActor(client, actor_ids["actor"])

    # WebConsoleActor
    ###################################################
    CONSOLE = WebConsoleActor(client, actor_ids["consoleActor"])
    CONSOLE.start_listeners([])

    # ThreadActor
    ###################################################
    THREAD = ThreadActor(client, actor_ids["threadActor"])
    THREAD.attach()

    # WatcherActor
    ###################################################
    watcher_ctx = TAB.get_watcher()
    WATCHER = WatcherActor(client, watcher_ctx["actor"])
    WATCHER.watch_resources(
        [
            Resources.NETWORK_EVENT,
            Resources.NETWORK_EVENT_STACKTRACE,
            Resources.DOCUMENT_EVENT,
        ]
    )

    # NetworkParentActor
    ###################################################
    network_parent_ctx = WATCHER.get_network_parent_actor()
    NETWORK_PARENT = NetworkParentActor(client, network_parent_ctx["network"]["actor"])

    # NetworkContentActor
    ###################################################
    NETWORK_CONTENT = NetworkContentActor(client, actor_ids["networkContentActor"])

    # WebExtensionInspectedWindowActor
    ###################################################
    WEB_EXT = WebExtensionInspectedWindowActor(
        client, actor_ids["webExtensionInspectedWindowActor"]
    )

    # InspectorActor
    ###################################################
    INSPECTOR = InspectorActor(client, actor_ids["inspectorActor"])

    # WalkerActor
    ###################################################
    walker_ctx = INSPECTOR.get_walker()
    WALKER = WalkerActor(client, walker_ctx["actor"])

    # NodeListActor
    ###################################################
    document = WALKER.document()
    dom_node_list = WALKER.query_selector_all(document["actor"], "body h1")
    NODE_LIST = NodeListActor(client, dom_node_list["actor"])

    # NodeActor
    ###################################################
    node_element = WALKER.query_selector(document["actor"], "body h1")["node"]
    NODE = NodeActor(client, node_element["actor"])

    # MemoryActor
    ###################################################
    MEMORY = MemoryActor(client, actor_ids["memoryActor"])
    MEMORY.attach()

    # SourceActor
    ###################################################
    for descriptor in THREAD.sources():
        # skip sources without an actor ID
        if descriptor.get("actor", None) is not None:
            SOURCE = SourceActor(client, descriptor["actor"])

    # HeapSnapshotActor
    ###################################################
    SNAPSHOT = HeapSnapshotActor(client, root_actor_ids["heapSnapshotFileActor"])

    # PreferenceActor
    ###################################################
    PREFERENCE = PreferenceActor(client, root_actor_ids["preferenceActor"])

    # TargetConfigurationActor
    ###################################################
    target_config_ctx = WATCHER.get_target_configuration_actor()
    TARGET_CONFIG = TargetConfigurationActor(client, target_config_ctx["actor"])

    # ThreadConfigurationActor
    ###################################################
    thread_config_ctx = WATCHER.get_thread_configuration_actor()
    THREAD_CONFIG = ThreadConfigurationActor(client, thread_config_ctx["actor"])

    # Get global target process
    ###################################################
    target_fut = Future()

    async def on_target(data: dict):
        if "target" not in data or "browsingContextID" not in data["target"]:
            return
        # a second matching event must not raise on the resolved future
        if target_fut.done():
            return
        # only the target which belongs to the selected tab is of interest
        if tab_ctx["browsingContextID"] == data["target"]["browsingContextID"]:
            target_fut.set_result(data["target"])

    client.add_event_listener(
        WATCHER.actor_id, Events.Watcher.TARGET_AVAILABLE_FORM, on_target
    )

    try:
        WATCHER.watch_targets(WatcherActor.Targets.FRAME)
        global_target_ctx = target_fut.result(3.0)
    finally:
        client.remove_event_listener(
            WATCHER.actor_id, Events.Watcher.TARGET_AVAILABLE_FORM, on_target
        )

    # CacheStorageActor
    ###################################################
    # just get the first resource for example purposes (applies to all storages)
    cache_resource = _wait_for_storage_resource(
        client,
        WATCHER,
        global_target_ctx["actor"],
        Resources.CACHE_STORAGE,
        "Cache",
        "Cache",
    )
    CACHE = CacheStorageActor(client, cache_resource.get("actor", ""))

    # CookieStorageActor
    ###################################################
    cookie_resource = _wait_for_storage_resource(
        client,
        WATCHER,
        WATCHER.actor_id,
        Resources.COOKIE,
        "cookies",
        "cookie",
    )
    COOKIE = CookieStorageActor(client, cookie_resource.get("actor", ""))

    # ExtensionStorageActor
    ###################################################
    # At the moment not possible.
    # See also tests/actors/test_storage_extension.

    # IndexedDBStorageActor
    ###################################################
    indexed_resource = _wait_for_storage_resource(
        client,
        WATCHER,
        WATCHER.actor_id,
        Resources.INDEXED_DB,
        "indexed-db",
        "indexedDB",
        rewatch_frame_targets=True,
    )
    INDEXED = IndexedDBStorageActor(client, indexed_resource.get("actor", ""))

    # LocalStorageActor
    ###################################################
    local_resource = _wait_for_storage_resource(
        client,
        WATCHER,
        global_target_ctx["actor"],
        Resources.LOCAL_STORAGE,
        "local-storage",
        "local",
    )
    LOCAL = LocalStorageActor(client, local_resource.get("actor", ""))

    # SessionStorageActor
    ###################################################
    session_resource = _wait_for_storage_resource(
        client,
        WATCHER,
        global_target_ctx["actor"],
        Resources.SESSION_STORAGE,
        "session-storage",
        "session",
        rewatch_frame_targets=True,
    )
    SESSION = SessionStorageActor(client, session_resource.get("actor", ""))

    # ScreenshotActor
    ###################################################
    SCREENSHOT = ScreenshotActor(client, root_actor_ids["screenshotActor"])

    # StringActor
    ###################################################

    def on_evaluation_result(data: dict):
        """
        - a string actor is only created for results of type 'longString'
        - return a large enough string with 'evaluate_js_async()'
          to trigger a longString response from server
        """
        result = data.get("result", data)
        if not isinstance(result, dict):
            return
        if result.get("type", "") != "longString":
            return
        STRING = StringActor(client, result["actor"])
        final_string_result = STRING.substring(0, result["length"])
        print(f"final_string_result(truncated): {final_string_result:5.5}...")

    client.add_event_listener(
        CONSOLE.actor_id, Events.WebConsole.EVALUATION_RESULT, on_evaluation_result
    )

    CONSOLE.evaluate_js_async(
        """
        (() => {
            longString = '';
            for (let i = 0; i < 10000; i++) {
                longString += 'x';
            }
            return longString;
        })();
    """
    )

    # NetworkEventActor
    ###################################################

    def on_resource_available(data):
        """
        the 'actor_id' here represents a connection to a specified url or resource
        after receiving it is possible to also receive updates for this ID with 'resource-updated-form'
        if a host is visited, multiple connections are established each with its own 'actor_id'
        see also the network tab in the developer tools to see how it works
        """
        for form in _iter_resource_forms(data, "network-event"):
            actor_id = form.get("actor", "")
            resource_id = form.get("resourceId", -1)
            if "netEvent" in actor_id and resource_id != -1:
                NETWORK_EVENT = NetworkEventActor(client, actor_id)
                request = NETWORK_EVENT.get_request_headers()
                print(
                    f"request headers:\n{json.dumps(request['headers'], indent=2)}"
                )

    # - register handler and trigger it by visiting a new page
    # - received actor IDs can also be stored temporarily
    #   and initiated later
    client.add_event_listener(
        watcher_ctx["actor"],
        Events.Watcher.RESOURCES_AVAILABLE_ARRAY,
        on_resource_available,
    )

    WEB.navigate_to("https://example.com/")

    # block until the user presses enter, so the handlers above can still run
    input()


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()