diff --git a/MANIFEST.in b/MANIFEST.in index 2410222d..ab83a7e0 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -2,5 +2,5 @@ exclude netsecgame/game/worlds/CYSTCoordinator.py exclude netsecgame/game/worlds/RealWorldNetSecGame.py exclude netsecgame/utils/trajectory_analysis.py exclude netsecgame/utils/actions_parser.py -exclude netsecgame/utils/gamaplay_graphs.py +exclude netsecgame/utils/gameplay_graphs.py exclude netsecgame/utils/log_parser.py diff --git a/README.md b/README.md index 4dcd348c..a334212e 100755 --- a/README.md +++ b/README.md @@ -69,7 +69,6 @@ coordinator: max_steps: 25 # timout set for the role `Attacker` goal: # Definition of the goal state description: "Exfiltrate data from Samba server to remote C&C server." - is_any_part_of_goal_random: True known_networks: [] known_hosts: [] controlled_hosts: [] @@ -87,7 +86,6 @@ coordinator: Defender: goal: description: "Block all attackers" - is_any_part_of_goal_random: False known_networks: [] known_hosts: [] controlled_hosts: [] @@ -127,7 +125,7 @@ With the configuration ready the environment can be started in selected port docker run -d --rm --name nsg-server\ -v $(pwd)/examples/example_task_configuration.yaml:/netsecgame/netsecenv_conf.yaml \ -v $(pwd)/logs:/netsecgame/logs \ - -p 9000:9000 stratosphereips/netsecgame + -p 9000:9000 stratosphereips/netsecgame \ --debug_level="INFO" ``` `--name nsg-server`: specifies the name of the container @@ -146,7 +144,7 @@ docker run -d --rm --name netsecgame-server ^ -p 9000:9000 ^ -v "%cd%\examples\example_task_configuration.yaml:/netsecgame/netsecenv_conf.yaml" ^ -v "%cd%\logs:/netsecgame/logs" ^ - stratosphereips/netsecgame:latest + stratosphereips/netsecgame:latest ^ --debug_level="INFO" ``` @@ -155,7 +153,7 @@ The environment can be started locally with from the root folder of the reposito ```bash python3 -m netsecgame.game.worlds.NetSecGame \ --task_config=./examples/example_task_configuration.yaml \ - --game_port=9000 + --game_port=9000 \ --debug_level="INFO" ``` Upon which the game server is created on `localhost:9000` to which the agents can connect to interact in the NetSecGame. @@ -164,7 +162,7 @@ Upon which the game server is created on `localhost:9000` to which the agents ca You can find user documentation at [https://stratosphereips.github.io/NetSecGame/](https://stratosphereips.github.io/NetSecGame/) ### Components of the NetSecGame Environment -The architecture of the environment can be seen [here](docs/Architecture.md). +The architecture of the environment can be seen [here](docs/architecture.md). The NetSecGame environment has several components in the following files: ``` ├── netsecgame/ @@ -176,6 +174,10 @@ The NetSecGame environment has several components in the following files: | ├── smaller_scenario_configuration.py | ├── scenario_configuration.py | ├── three_net_scenario.py +| ├── two_nets.py +| ├── two_nets_tiny.py +| | ├── two_nets_small.py +| | ├── one_net.py | ├── worlds/ | ├── NetSecGame.py # (NSG) basic simulation | ├── RealWorldNetSecGame.py # Extension of `NSG` - runs actions in the *network of the host computer* @@ -190,8 +192,9 @@ The NetSecGame environment has several components in the following files: | ├── utils/ | ├── utils.py | ├── log_parser.py -| ├── gamaplay_graphs.py +| ├── gameplay_graphs.py | ├── actions_parser.py +| ├── trajectory_recorder.py ``` #### Directory Details @@ -204,6 +207,7 @@ Modules for different world configurations: - `NetSecGame.py`: Coordinator for the Network Security Game. - `RealWorldNetSecGame.py`: Real-world NSG coordinator (actions are executed in the *real network*). - `CYSTCoordinator.py`: Coordinator for CYST-based simulations (requires CYST running). +- `WhiteBoxNetSecGame.py`: Coordinator for Whitebox NSG (full action list provided to agents). ##### **`scenarios/`** Predefined scenario configurations: @@ -211,6 +215,10 @@ Predefined scenario configurations: - `smaller_scenario_configuration.py`: A compact scenario configuration used for development and rapid testing. - `scenario_configuration.py`: The main scenario configuration. - `three_net_scenario.py`: Configuration for a three-network scenario. Used for the evaluation of the model overfitting. +- `one_net.py`: A single network scenario. +- `two_nets.py`: A two-network scenario. +- `two_nets_tiny.py`: A tiny two-network scenario. +- `two_nets_small.py`: A small two-network scenario. Implements the network game's configuration of hosts, data, services, and connections. It is taken from [CYST](https://pypi.org/project/cyst/). @@ -218,15 +226,16 @@ Implements the network game's configuration of hosts, data, services, and connec Helper modules: - `utils.py`: General-purpose utilities. - `log_parser.py`: Tools for parsing game logs. -- `gamaplay_graphs.py`: Tools for visualizing gameplay data. +- `gameplay_graphs.py`: Tools for visualizing gameplay data. - `actions_parser.py`: Parsing and analyzing game actions. +- `trajectory_recorder.py`: Tools for recording game trajectories. The [scenarios](#definition-of-the-network-topology) define the **topology** of a network (number of hosts, connections, networks, services, data, users, firewall rules, etc.) while the [task-configuration](#task-configuration) is to be used for definition of the exact task for the agent in one of the scenarios (with fix topology). - Agents compatible with the NetSecGame are located in a separate repository [NetSecGameAgents](https://github.com/stratosphereips/NetSecGameAgents/tree/main) ### Assumptions of the NetSecGame 1. NetSecGame works with the closed-world assumption. Only the defined entities exist in the simulation. -2. If the attacker does a successful action in the same step that the defender successfully detects the action, the priority goes to the defender. The reward is a penalty, and the game ends. +2. If the attacker does a successful action in the same step that the defender successfully detects the action, the priority goes to the attacker. (From commit d6d4ac9, July 18th, 2024, the new action BlockIP removes controlled hosts from the state of others. So the state can get smaller) - The action FindServices finds the new services in a host. If in a subsequent call to FindServices there are fewer services, they completely replace the list of previous services found. That is, each list of services is the final one, and no memory of previous open services is retained. @@ -238,7 +247,7 @@ The [scenarios](#definition-of-the-network-topology) define the **topology** of 4. Playing `ExfiltrateData` requires controlling **BOTH** source and target hosts 5. Playing `Find Services` can be used to discover hosts (if those have any active services) 6. Parameters of `ScanNetwork` and `FindServices` can be chosen arbitrarily (they don't have to be listed in `known_newtworks`/`known_hosts`) -7. The `BlockIP` action needs its three parameters (Source host, Target host, and Blocked host) to be in the controlled list of the Agent. +7. The `BlockIP` action needs its `source_host` and `target_host` parameters to be in the controlled list of the Agent. > [!NOTE] > The global defender, available in the previous environment versions, will not be supported in the future. To enable backward compatibility, the global defender functionality can be enabled by adding `use_global_defender: True` to the configuration YAML file in the `env` section. This option is disabled by default. @@ -319,7 +328,7 @@ This approach ensures that only repeated or excessive behavior is flagged, reduc ### Interaction with the Environment -When the game server is created, [agents](https://github.com/stratosphereips/NetSecGameAgents/tree/main) connect to it and interact with the environment. In every step of the interaction, agents submits an [Action](./AIDojoCoordinator/docs/Components.md#actions) and receive [Observation](./AIDojoCoordinator/docs/Components.md#observations) with `next_state`, `reward`, `is_terminal`, `end`, and `info` values. Once the terminal state or timeout is reached, no more interaction is possible until the agent asks for a game reset. Each agent should extend the `BaseAgent` class in [agents](https://github.com/stratosphereips/NetSecGameAgents/tree/main). +When the game server is created, [agents](https://github.com/stratosphereips/NetSecGameAgents/tree/main) connect to it and interact with the environment. In every step of the interaction, agents submits an [Action](./docs/game_components.md#netsecgame.game_components.Action) and receive [Observation](./docs/game_components.md#netsecgame.game_components.Observation) with `next_state`, `reward`, `is_terminal`, `end`, and `info` values. Once the terminal state or timeout is reached, no more interaction is possible until the agent asks for a game reset. Each agent should extend the `BaseAgent` class in [agents](https://github.com/stratosphereips/NetSecGameAgents/tree/main). ## Testing the environment diff --git a/README_pypi.md b/README_pypi.md index 692a492e..81976b6c 100644 --- a/README_pypi.md +++ b/README_pypi.md @@ -57,7 +57,6 @@ coordinator: max_steps: 25 # timeout set for the role `Attacker` goal: # Definition of the goal state description: "Exfiltrate data from Samba server to remote C&C server." - is_any_part_of_goal_random: True known_networks: [] known_hosts: [] controlled_hosts: [] @@ -75,7 +74,6 @@ coordinator: Defender: goal: description: "Block all attackers." - is_any_part_of_goal_random: False known_networks: [] known_hosts: [] controlled_hosts: [] @@ -92,7 +90,7 @@ coordinator: blocked_ips: {} known_blocks: {} -env: # Environment configuraion +env: # Environment configuration scenario: 'two_networks_tiny' # use the smallest topology for this example use_global_defender: False # Do not use global SIEM Defender use_dynamic_addresses: False # Do not randomize IP addresses diff --git a/docs/NetSecGame.md b/docs/NetSecGame.md new file mode 100644 index 00000000..975bc6a2 --- /dev/null +++ b/docs/NetSecGame.md @@ -0,0 +1,4 @@ +# NetSecGame +NetSecGame is an extension of the [`GameCoordinator`](game_coordinator.md) that implements the specific dynamics of the simulation while retaining the full functionality of the core game coordinator. + +::: netsecgame.game.worlds.NetSecGame.NetSecGame \ No newline at end of file diff --git a/docs/WhiteBoxNetSecGame.md b/docs/WhiteBoxNetSecGame.md new file mode 100644 index 00000000..286bd163 --- /dev/null +++ b/docs/WhiteBoxNetSecGame.md @@ -0,0 +1,4 @@ +# WhiteBoxNetSecGame +Whitebox version of NSG is an extension of the [`NetSecGame`](NetSecGame.md) which provides full action space for the agents upon registration in the game. This version is used for training of agents that require fixed size action space. + +::: netsecgame.game.worlds.WhiteBoxNetSecGame.WhiteBoxNetSecGame \ No newline at end of file diff --git a/docs/agent_server.md b/docs/agent_server.md index 0fba5532..97cd9d9e 100644 --- a/docs/agent_server.md +++ b/docs/agent_server.md @@ -1 +1,4 @@ +# Agent Server +The Agent Server is responsible for managing the low-level communication between the agents and the game coordinator. It handles incoming TCP connections, manages agent registration, and facilitates the asynchronous message passing required for real-time interaction in the game. + ::: netsecgame.game.agent_server.AgentServer \ No newline at end of file diff --git a/docs/architecture.md b/docs/architecture.md index 1c1fb44e..81620cdf 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -45,7 +45,7 @@ Each data instance has two parameters: Examples: ```python d1 = Data("User1", "DatabaseData") -d2 = Data("User1", "DatabaseData", size=42, type="txt", "SecretUserDatabase") +d2 = Data("User1", "DatabaseData", size=42, type="txt", description="SecretUserDatabase") ``` ### GameState @@ -74,13 +74,13 @@ The Action consists of two parts #### List of ActionTypes - **JoinGame**, params={`agent_info`:AgentInfo(``, ``)}: Used to register agent in a game with a given ``. - **QuitGame**, params={}: Used for termination of agent's interaction. -- **ResetGame**, params={`request_trajectory`:`bool` (default=`False`), `randomize_topology`=`bool` (default=`True`)}: Used for requesting reset of the game to it's initial position. If `request_trajectory = True`, the coordinator will send back the complete trajectory of the previous run in the next message. If `randomize_topology`=`True`, the agent request topology to be changed in the next episode. NOTE: the topology is changed only if (i) the `use_dynamic_ips` is set to `True` in the task configuration AND all active agents ask for the change. +- **ResetGame**, params={`request_trajectory`:`bool` (default=`False`), `randomize_topology`=`bool` (default=`True`)}: Used for requesting reset of the game to its initial position. If `request_trajectory = True`, the coordinator will send back the complete trajectory of the previous run in the next message. If `randomize_topology`=`True`, the agent request topology to be changed in the next episode. NOTE: the topology is changed only if (i) the `use_dynamic_addresses` is set to `True` in the task configuration AND all active agents ask for the change. --- - **ScanNetwork**, params{`source_host`:``, `target_network`:``}: Scans the given `` from a specified source host. Discovers ALL hosts in a network that are accessible from ``. If successful, returns set of discovered `` objects. - **FindServices**, params={`source_host`:``, `target_host`:``}: Used to discover ALL services running in the `target_host` if the host is accessible from `source_host`. If successful, returns a set of all discovered `` objects. -- **FindData**, params={`source_host`:``, `target_host`:``}: Searches `target_host` for data. If `source_host` differs from `target_host`, success depends on accessability from the `source_host`. If successful, returns a set of all discovered `` objects. +- **FindData**, params={`source_host`:``, `target_host`:``}: Searches `target_host` for data. If `source_host` differs from `target_host`, success depends on accessibility from the `source_host`. If successful, returns a set of all discovered `` objects. - **ExploitService**, params={`source_host`:``, `target_host`:``, `target_service`:``}: Exploits `target_service` in a specified `target_host`. If successful, the attacker gains control of the `target_host`. -- **ExfiltrateData**, params{`source_host`:``, `target_host`:``, `data`:``}: Copies `data` from the `source_host` to `target_host` IF both are controlled and `target_host` is accessible from `source_host`. +- **ExfiltrateData**, params{`source_host`:``, `target_host`:``, `data`:``}: Copies `data` from the `source_host` to `target_host` IF both are controlled and `target_host` is accessible from `source_host`. - **BlockIP**, params{`source_host`:``, `target_host`:``, `blocked_host`:``}: Blocks communication from/to `blocked_host` on `target_host`. Requires control of `target_host`. ### Action preconditions and effects @@ -97,8 +97,8 @@ In the following table, we describe the effects of selected actions and their pr #### Assumption and Conditions for Actions 1. When playing the `ExploitService` action, it is expected that the agent has discovered this service before (by playing `FindServices` in the `target_host` before this action) -2. The `Find Data` action finds all the available data in the host if successful. -3. The `Find Data` action requires ownership of the target host. +2. The `FindData` action finds all the available data in the host if successful. +3. The `FindData` action requires ownership of the target host. 4. Playing `ExfiltrateData` requires controlling **BOTH** source and target hosts 5. Playing `Find Services` can be used to discover hosts (if those have any active services) 6. Parameters of `ScanNetwork` and `FindServices` can be chosen arbitrarily (they don't have to be listed in `known_networks`/`known_hosts`) diff --git a/docs/config_parser.md b/docs/config_parser.md new file mode 100644 index 00000000..fff8d0b3 --- /dev/null +++ b/docs/config_parser.md @@ -0,0 +1,3 @@ +ConfigParser is a class that is responsible for parsing the YAML configuration file and providing it to the game coordinator. + +::: netsecgame.game.config_parser.ConfigParser \ No newline at end of file diff --git a/docs/configuration.md b/docs/configuration.md index ae40be5c..291325c0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -12,7 +12,7 @@ The environment part defines the properties of the environment for the task (see - `two_networks_small` - single client and 5 servers in separate local networks + remote C&C server - `two_networks` - 5 clients and 5 servers in separate local networks + remote C&C server - `three_net_scenario` - 5 clients in a local network, 5 servers split in 2 additional local networks + remote C&C server -- `save_tajectories` - if `True`, interaction of the agents is serialized and stored in a file +- `save_trajectories` - if `True`, interaction of the agents is serialized and stored in a file - `use_dynamic_addresses` - if `True`, the network and IP addresses defined in `scenario` are randomly changed at the beginning of an episode (the network topology is kept as defined in the `scenario`. Relations between networks are kept, IPs inside networks are chosen at random based on the network IP and mask). The change also depend on the input from the agents: ### Available topologies @@ -55,10 +55,10 @@ There are 5 topologies available in NSG: |Task configuration| Agent reset request | Result| |----------------------|----------------------|----------------------| -|`use_dynamic_ips = True` | `randomize_topology = True`| Changed topology | -|`use_dynamic_ips = True` | `randomize_topology = False`| SAME topology | -|`use_dynamic_ips = False` | `randomize_topology = True`| SAME topology | -|`use_dynamic_ips = False` | `randomize_topology = False`| SAME topology | +|`use_dynamic_addresses = True` | `randomize_topology = True`| Changed topology | +|`use_dynamic_addresses = True` | `randomize_topology = False`| SAME topology | +|`use_dynamic_addresses = False` | `randomize_topology = True`| SAME topology | +|`use_dynamic_addresses = False` | `randomize_topology = False`| SAME topology | In summary, the topology change (IP randomization) can't change without allowing it in the task configuration. If allowed in the task config YAML, it can still be rejected by the agents. @@ -67,7 +67,7 @@ In summary, the topology change (IP randomization) can't change without allowing - `required_players` - Minimum required players for the game to start (default 1) - `rewards`: - `success` - sets reward which agent gets when it reaches the goal (default 100) - - `fail` - sets the reward that which agent does not reach it's objective (default -10) + - `fail` - sets the reward which agent gets when it does not reach its objective (default -10) - `step` - sets reward which agent gets for every step taken (default -1) - `false_positive` - sets reward for a false positive action (default -5) - `actions` - defines the probability of success for every ActionType @@ -159,7 +159,6 @@ coordinator: max_steps: 20 goal: description: "Exfiltrate data from Samba server to remote C&C server." - is_any_part_of_goal_random: True known_networks: [] known_hosts: [] controlled_hosts: [] @@ -205,7 +204,7 @@ Example of defender configuration: blocked_ips: {} known_blocks: {} ``` -As in other agents, the description is only a text for the agent, so it can know what is supposed to do to win. In the curent implementation, the *Defender* wins, if **NO ATTACKER** reaches their goal. +As in other agents, the description is only a text for the agent, so it can know what is supposed to do to win. In the current implementation, the *Defender* wins, if **NO ATTACKER** reaches their goal. ### Trajectory storing and analysis diff --git a/docs/configuration_manager.md b/docs/configuration_manager.md index 98c22770..bd60edf6 100644 --- a/docs/configuration_manager.md +++ b/docs/configuration_manager.md @@ -1,11 +1,3 @@ -## Configuration Manager - Configuration manager is a component of the game coordinator that handles the configuration of the game. It is responsible for loading the configuration from the YAML file and providing it to the game coordinator. -::: netsecgame.game.configuration_manager.ConfigurationManager - -## ConfigParser - -ConfigParser is a class that is responsible for parsing the YAML configuration file and providing it to the game coordinator. - -::: netsecgame.game.config_parser.ConfigParser \ No newline at end of file +::: netsecgame.game.configuration_manager.ConfigurationManager \ No newline at end of file diff --git a/docs/game_components.md b/docs/game_components.md index c54cdb5a..be9f33df 100644 --- a/docs/game_components.md +++ b/docs/game_components.md @@ -1 +1,4 @@ +# Game Components +This module contains the core building blocks and data structures used throughout the NetSecGame environment. These include fundamental types such as IP addresses, networks, services, data objects, and actions. These components are used to define both the environment's state and the interactions between agents and the world. + ::: netsecgame.game_components diff --git a/docs/game_coordinator.md b/docs/game_coordinator.md index 2c639340..30b26f57 100644 --- a/docs/game_coordinator.md +++ b/docs/game_coordinator.md @@ -12,17 +12,16 @@ In detail it handles: 7. Removing agents from the game 8. Registering the GameReset requests and handling the game resets. -To facilitate the communication the coordinator uses a TCP server to which agents connect. The communication is asynchronous and depends of the +To facilitate the communication the coordinator uses a TCP server to which agents connect. The communication is asynchronous and depends on the world's implementation. ## Connection to other game components Coordinator, having the role of the middle man in all communication between the agent and the world uses several queues for message passing and handling. 1. `Action queue` is a queue in which the agents submit their actions. It provides N:1 communication channel in which the coordinator receives the inputs. -2. `Answer queues` is a separate queue **per agent** in which the results of the actions are send to the agent. +2. `Answer queues` is a separate queue **per agent** in which the results of the actions are sent to the agent. ## Episode -The episode starts with sufficient amount of agents registering in the game. Each agent role has a maximum allowed number of steps defined in the task configuration. An episode ends if all agents reach the goal +The episode starts with sufficient amount of agents registering in the game. Each agent role has a maximum allowed number of steps defined in the task configuration. An episode ends if all agents reach the goal or if the maximum number of steps is reached. -::: netsecgame.game.coordinator.GameCoordinator -::: netsecgame.game.worlds.NetSecGame.NetSecGame \ No newline at end of file +::: netsecgame.game.coordinator.GameCoordinator \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index c9ab1a13..0d37d63e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,7 +13,7 @@ docker build -t netsecgame:local . ``` ### Installing from source -In case you need to modify the envirment and run directly, we recommed to insall it in a virtual environemnt (Python vevn or Conda): +In case you need to modify the environment and run directly, we recommend installing it in a virtual environment (Python venv or Conda): #### Python venv 1. Create new virtual environment ```bash @@ -34,7 +34,7 @@ conda create --name aidojo python==3.12 conda activate aidojo ``` -### After preparing virutual environment, install using pip: +### After preparing virtual environment, install using pip: ```bash pip install -e . ``` @@ -56,7 +56,6 @@ coordinator: max_steps: 25 # timout set for the role `Attacker` goal: # Definition of the goal state description: "Exfiltrate data from Samba server to remote C&C server." - is_any_part_of_goal_random: True known_networks: [] known_hosts: [] controlled_hosts: [] @@ -74,7 +73,6 @@ coordinator: Defender: goal: description: "Block all attackers" - is_any_part_of_goal_random: False known_networks: [] known_hosts: [] controlled_hosts: [] @@ -91,13 +89,13 @@ coordinator: blocked_ips: {} known_blocks: {} -env: # Environment configuraion +env: # Environment configuration scenario: 'two_networks_tiny' # use the smallest topology for this example use_global_defender: False # Do not use global SIEM Defender use_dynamic_addresses: False # Do not randomize IP addresses use_firewall: True # Use firewall save_trajectories: False # Do not store trajectories - required_players: 1 # Minimal amount of agents requiered to start the game + required_players: 1 # Minimal number of agents required to start the game rewards: # Configurable reward function success: 100 step: -1 @@ -111,7 +109,7 @@ With the configuration ready the environment can be started in selected port docker run -d --rm --name nsg-server\ -v $(pwd)/examples/example_task_configuration.yaml:/netsecgame/netsecenv_conf.yaml \ -v $(pwd)/logs:/netsecgame/logs \ - -p 9000:9000 stratosphereips/netsecgame + -p 9000:9000 stratosphereips/netsecgame \ --debug_level="INFO" ``` `--name nsg-server`: specifies the name of the container @@ -122,7 +120,7 @@ docker run -d --rm --name nsg-server\ ` -p :9000`: Mapping of the port in which the server runs -`--debug_level` is an optional parameter to control the logging level `--debug_level=["DEBUG", "INFO", "WARNING", "CRITICAL"]` (defaul=`"INFO"`): +`--debug_level` is an optional parameter to control the logging level `--debug_level=["DEBUG", "INFO", "WARNING", "CRITICAL"]` (default=`"INFO"`): ##### Running on Windows (with Docker desktop) When running on Windows, Docker desktop is required. ```batch @@ -130,16 +128,16 @@ docker run -d --rm --name netsecgame-server ^ -p 9000:9000 ^ -v "%cd%\examples\example_task_configuration.yaml:/netsecgame/netsecenv_conf.yaml" ^ -v "%cd%\logs:/netsecgame/logs" ^ - stratosphereips/netsecgame:latest + stratosphereips/netsecgame:latest \ --debug_level="INFO" ``` #### Locally -The environment can be started locally with from the root folder of the repository with following command: +The environment can be started locally from the root folder of the repository with the following command: ```bash python3 -m netsecgame.game.worlds.NetSecGame \ --task_config=./examples/example_task_configuration.yaml \ - --game_port=9000 + --game_port=9000 \ --debug_level="INFO" ``` Upon which the game server is created on `localhost:9000` to which the agents can connect to interact in the NetSecGame. @@ -149,7 +147,7 @@ The NetSecGame has several components in the following files: ``` ├── netsecgame/ | ├── agents/ -| ├── base_agent.py # Basic agent class. Defines the API for agent-server communication +| ├── base_agent.py # Basic agent class. Implements the API for agent-server communication | ├── game/ | ├── scenarios/ | ├── three_net_scenario.py @@ -173,7 +171,9 @@ The NetSecGame has several components in the following files: | ├── trajectory_recorder.py | ├── trajectory_analysis.py | ├── aidojo_log_colorizer.py -| ├── gamaplay_graphs.py +| ├── gameplay_graphs.py +| ├── actions_parser.py +| ├── log_parser.py ``` Some compoments are described in detail in following sections: diff --git a/mkdocs.yml b/mkdocs.yml index 58303545..bdbbdad5 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -6,11 +6,16 @@ nav: - Home: index.md - Architecture: architecture.md - Configuration: configuration.md - - API Reference: + - Documentation: - game_components.md - - game_coordinator.md - agent_server.md - configuration_manager.md + - config_parser.md + - game_coordinator.md + - worlds: + - NetSecGame.md + - WhiteBoxNetSecGame.md + plugins: - mkdocstrings: @@ -20,11 +25,11 @@ plugins: options: heading_level: 2 show_root_heading: true - show_signature: true + show_signature: false show_source: true members_order: alphabetical merge_init_into_class: true - + filters: [] markdown_extensions: - pymdownx.arithmatex - pymdownx.superfences diff --git a/netsecgame/agents/base_agent.py b/netsecgame/agents/base_agent.py index 95142a45..d8509019 100644 --- a/netsecgame/agents/base_agent.py +++ b/netsecgame/agents/base_agent.py @@ -14,7 +14,15 @@ class BaseAgent(ABC): Basic agent for the network based NetSecGame environment. Implemenets communication with the game server. """ - def __init__(self, host, port, role:str)->None: + def __init__(self, host, port, role:AgentRole)->None: + """ + Initializes the BaseAgent and connects it to the game server. + + Args: + host (str): The host where the game server is running. + port (int): The port where the game server is running. + role (AgentRole): The assigned role of the agent (e.g., Attacker, Defender). + """ self._connection_details = (host, port) self._logger = logging.getLogger(self.__class__.__name__) self._role = role @@ -53,7 +61,8 @@ def role(self)->str: return self._role @property - def logger(self)->logging.Logger: + def logger(self) -> logging.Logger: + """Returns the logger instance for this agent.""" return self._logger def make_step(self, action: Action) -> Optional[Observation]: @@ -76,26 +85,30 @@ def make_step(self, action: Action) -> Optional[Observation]: else: return None - def communicate(self, data:Action)-> Tuple[GameStatus, Dict[str, Any], Optional[str]]: + def communicate(self, data: Action) -> Tuple[GameStatus, Dict[str, Any], Optional[str]]: """ Exchanges data with the server and returns the server's response. - This method sends an `Action` object to the server and waits for a response. + + Sends an `Action` object to the server and waits for a response. The response is expected to be a JSON-encoded string containing status, observation, and message fields. - The method returns a tuple containing the parsed status, observation, and message. + Args: - data (Action): The action to send to the server. Must be an instance of `Action`. + data (Action): The action to send to the server. + Returns: - tuple: A tuple containing: - - status (GameStatus): The status object parsed from the server response. - - observation (dict): The observation data from the server. - - message (str or None): An optional message from the server. + Tuple[GameStatus, Dict[str, Any], Optional[str]]: A tuple containing: + - status (GameStatus): The status parsed from the server response. + - observation (Dict[str, Any]): The observation data from the server. + - message (Optional[str]): An optional message from the server. + Raises: ValueError: If `data` is not of type `Action`. - ConnectionError: If the server response is incomplete or missing the end-of-message marker. - Exception: If there is an error sending data to the server. + ConnectionError: If the server response is incomplete. + Exception: If there is an error during communication. """ - def _send_data(socket, msg:str)->None: + def _send_data(socket: socket.socket, msg: str) -> None: + """Internal method to send data over the socket.""" try: self._logger.debug(f'Sending: {msg}') socket.sendall(msg.encode()) @@ -103,10 +116,8 @@ def _send_data(socket, msg:str)->None: self._logger.error(f'Exception in _send_data(): {e}') raise e - def _receive_data(socket)->Tuple[GameStatus, Dict[str, Any], Optional[str]]: - """ - Receive data from server - """ + def _receive_data(socket: socket.socket) -> Tuple[GameStatus, Dict[str, Any], Optional[str]]: + """Internal method to receive data from the socket.""" # Receive data from the server data = b"" # Initialize an empty byte string @@ -139,16 +150,12 @@ def _receive_data(socket)->Tuple[GameStatus, Dict[str, Any], Optional[str]]: _send_data(self._socket, data) return _receive_data(self._socket) - def register(self)->Optional[Observation]: + def register(self) -> Optional[Observation]: """ - Method for registering agent to the game server. - Classname is used as agent name and the role is based on the 'role' argument. - Returns initial observation if registration was successful, None otherwise. + Registers the agent with the game server. - Args: - role (str): Role of the agent, either 'attacker' or 'defender'. Returns: - Observation: Initial observation if registration was successful, None otherwise. + Optional[Observation]: Initial observation if successful, None otherwise. """ try: self._logger.info(f'Registering agent as {self.role}') diff --git a/netsecgame/game/agent_server.py b/netsecgame/game/agent_server.py index d26b2778..7ebabbd6 100644 --- a/netsecgame/game/agent_server.py +++ b/netsecgame/game/agent_server.py @@ -1,5 +1,6 @@ import logging import asyncio +from typing import Dict, Tuple from netsecgame.game_components import Action, ActionType, ProtocolConfig class AgentServer(asyncio.Protocol): @@ -13,14 +14,17 @@ class AgentServer(asyncio.Protocol): current_connections (int): Current number of connected agents. logger (logging.Logger): Logger for the AgentServer. """ - def __init__(self, actions_queue, agent_response_queues, max_connections): + def __init__(self, actions_queue: asyncio.Queue, agent_response_queues: Dict[Tuple, asyncio.Queue], max_connections: int) -> None: """ Initialize the AgentServer. Args: actions_queue (asyncio.Queue): Queue for actions from agents. - agent_response_queues (dict): Mapping of agent addresses to their response queues. + agent_response_queues (Dict[Tuple, asyncio.Queue]): Mapping of agent addresses to their response queues. max_connections (int): Maximum allowed concurrent agent connections. + + Returns: + None """ self.actions_queue = actions_queue self.answers_queues = agent_response_queues @@ -28,25 +32,31 @@ def __init__(self, actions_queue, agent_response_queues, max_connections): self.current_connections = 0 self.logger = logging.getLogger("AgentServer") - async def handle_agent_quit(self, peername:tuple): + async def handle_agent_quit(self, peername: Tuple) -> None: """ Helper function to handle agent disconnection. Args: - peername (tuple): The address of the disconnecting agent. + peername (Tuple): The address of the disconnecting agent. + + Returns: + None """ # Send a quit message to the Coordinator self.logger.info(f"\tHandling agent quit for {peername}.") quit_message = Action(ActionType.QuitGame, parameters={}).to_json() await self.actions_queue.put((peername, quit_message)) - async def handle_new_agent(self, reader, writer): + async def handle_new_agent(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: """ Handle a new agent connection. Args: reader (asyncio.StreamReader): Stream reader for the agent. writer (asyncio.StreamWriter): Stream writer for the agent. + + Returns: + None """ # get the peername of the writer peername = writer.get_extra_info("peername") @@ -113,12 +123,15 @@ async def handle_new_agent(self, reader, writer): except Exception: # swallow exceptions on close to avoid crash on cleanup pass - async def __call__(self, reader, writer): + async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: """ Allow the server instance to be called as a coroutine. Args: reader (asyncio.StreamReader): Stream reader for the agent. writer (asyncio.StreamWriter): Stream writer for the agent. + + Returns: + None """ await self.handle_new_agent(reader, writer) diff --git a/netsecgame/game/config_parser.py b/netsecgame/game/config_parser.py index efc1f126..071f2a53 100644 --- a/netsecgame/game/config_parser.py +++ b/netsecgame/game/config_parser.py @@ -6,20 +6,27 @@ import netaddr import logging from random import randint -from typing import Optional +from typing import Optional, Dict, Any, List, Set, Union from netsecgame.game_components import IP, Data, Network, Service from netsecgame.game.scenarios import SCENARIO_REGISTRY class ConfigParser(): """ - Class to deal with the configuration file of NetSecGame Coordinator - Args: - task_config_file (str|None): Path to the configuration file - config_dict (dict|None): Dictionary with configuration data + Class to deal with the configuration file of NetSecGame Coordinator. + + Provides methods to read agent-specific and environment-wide configurations + from YAML files or dictionaries. """ - def __init__(self, task_config_file:str|None=None, config_dict:dict|None=None): + def __init__(self, task_config_file:Optional[str]=None, config_dict:Optional[dict]=None)->None: """ - Initializes the configuration parser. Required either path to a confgiuration file or a dict with configuraitons. + Initializes the configuration parser. Required either path to a configuration file or a dict with configurations. + + Args: + task_config_file (Optional[str]): Path to the configuration file + config_dict (Optional[dict]): Dictionary with configuration data + + Returns: + None """ self.logger = logging.getLogger('ConfigParser') if task_config_file: @@ -29,9 +36,15 @@ def __init__(self, task_config_file:str|None=None, config_dict:dict|None=None): else: self.logger.error("You must provide either the configuration file or a dictionary with the configuration!") - def read_config_file(self, conf_file_name:str): + def read_config_file(self, conf_file_name: str) -> None: """ - reads configuration file + Reads the configuration from a YAML file. + + Args: + conf_file_name (str): Path to the configuration file. + + Returns: + None """ try: with open(conf_file_name) as source: @@ -42,7 +55,13 @@ def read_config_file(self, conf_file_name:str): def read_env_action_data(self, action_name: str) -> float: """ - Generic function to read the known data for any agent and goal of position + Reads the success probability for a specific environment action. + + Args: + action_name (str): The name of the action. + + Returns: + float: The success probability (defaults to 1.0 if not found). """ try: action_success_p = self.config['env']['actions'][action_name]['prob_success'] @@ -50,9 +69,16 @@ def read_env_action_data(self, action_name: str) -> float: action_success_p = 1 return action_success_p - def read_agents_known_data(self, type_agent: str, type_data: str) -> dict: + def read_agents_known_data(self, type_agent: str, type_data: str) -> Dict[IP, Set[Union[Data, str]]]: """ - Generic function to read the known data for any agent and goal of position + Reads the known data for a specific agent and category (goal/start_position). + + Args: + type_agent (str): The role or type of the agent. + type_data (str): The category of data (e.g., 'goal', 'start_position'). + + Returns: + Dict[IP, Set[Union[Data, str]]]: A mapping of IP addresses to sets of Data objects or 'random' keywords. """ known_data_conf = self.config['coordinator']['agents'][type_agent][type_data]['known_data'] known_data = {} @@ -75,9 +101,16 @@ def read_agents_known_data(self, type_agent: str, type_data: str) -> dict: known_data = {} return known_data - def read_agents_known_blocks(self, type_agent: str, type_data: str) -> dict: + def read_agents_known_blocks(self, type_agent: str, type_data: str) -> Dict[IP, Union[List[IP], str]]: """ - Generic function to read the known blocks for any agent and goal of position + Reads the known firewall blocks for a specific agent and category. + + Args: + type_agent (str): The role or type of the agent. + type_data (str): The category of data. + + Returns: + Dict[IP, Union[List[IP], str]]: A mapping of target IP addresses to blocked IPs or 'all_attackers'. """ known_blocks_conf = self.config["coordinator"]['agents'][type_agent][type_data]['known_blocks'] known_blocks = {} @@ -94,9 +127,16 @@ def read_agents_known_blocks(self, type_agent: str, type_data: str) -> dict: raise ValueError(f"Unsupported value in 'known_blocks': {known_blocks_conf}") return known_blocks - def read_agents_known_services(self, type_agent: str, type_data: str) -> dict: + def read_agents_known_services(self, type_agent: str, type_data: str) -> Dict[IP, List[Union[Service, str]]]: """ - Generic function to read the known services for any agent and goal of position + Reads the known services for a specific agent and category. + + Args: + type_agent (str): The role or type of the agent. + type_data (str): The category of data. + + Returns: + Dict[IP, List[Union[Service, str]]]: A mapping of IP addresses to lists of Service objects or 'random' keywords. """ known_services_conf = self.config["coordinator"]['agents'][type_agent][type_data]['known_services'] known_services = {} @@ -124,9 +164,16 @@ def read_agents_known_services(self, type_agent: str, type_data: str) -> dict: known_services = {} return known_services - def read_agents_known_networks(self, type_agent: str, type_data: str) -> set: + def read_agents_known_networks(self, type_agent: str, type_data: str) -> Set[Network]: """ - Generic function to read the known networks for any agent and goal of position + Reads the known networks for a specific agent and category. + + Args: + type_agent (str): The role or type of the agent. + type_data (str): The category of data. + + Returns: + Set[Network]: A set of known Network objects. """ known_networks_conf = self.config['coordinator']['agents'][type_agent][type_data]['known_networks'] known_networks = set() @@ -140,9 +187,16 @@ def read_agents_known_networks(self, type_agent: str, type_data: str) -> set: self.logger.error('Configuration problem with the known networks') return known_networks - def read_agents_known_hosts(self, type_agent: str, type_data: str) -> set: + def read_agents_known_hosts(self, type_agent: str, type_data: str) -> Set[Union[IP, str]]: """ - Generic function to read the known hosts for any agent and goal of position + Reads the known hosts for a specific agent and category. + + Args: + type_agent (str): The role or type of the agent. + type_data (str): The category of data. + + Returns: + Set[Union[IP, str]]: A set of host IP objects or keywords ('random', 'all_local'). """ known_hosts_conf = self.config['coordinator']['agents'][type_agent][type_data]['known_hosts'] known_hosts = set() @@ -160,9 +214,16 @@ def read_agents_known_hosts(self, type_agent: str, type_data: str) -> set: self.logger.error(f'Configuration problem with the known hosts: {e}') return known_hosts - def read_agents_controlled_hosts(self, type_agent: str, type_data: str) -> set: + def read_agents_controlled_hosts(self, type_agent: str, type_data: str) -> Set[Union[IP, str]]: """ - Generic function to read the controlled hosts for any agent and goal of position + Reads the controlled hosts for a specific agent and category. + + Args: + type_agent (str): The role or type of the agent. + type_data (str): The category of data. + + Returns: + Set[Union[IP, str]]: A set of controlled host IPs or keywords ('random', 'all_local'). """ controlled_hosts_conf = self.config['coordinator']['agents'][type_agent][type_data]['controlled_hosts'] controlled_hosts = set() @@ -180,10 +241,15 @@ def read_agents_controlled_hosts(self, type_agent: str, type_data: str) -> set: self.logger.error(f'Configuration problem with the controlled hosts: {e}') return controlled_hosts - def get_player_win_conditions(self, type_of_player:str): + def get_player_win_conditions(self, type_of_player: str) -> Dict[str, Any]: """ - Get the goal of the player - type_of_player: Can be 'attackers' or 'defenders' + Retrieves the win conditions for a specific player type. + + Args: + type_of_player (str): The player type (e.g., 'attackers', 'defenders'). + + Returns: + Dict[str, Any]: A dictionary containing goal configurations (nets, hosts, etc.). """ # Read known nets known_networks = self.read_agents_known_networks(type_of_player, 'goal') @@ -216,10 +282,15 @@ def get_player_win_conditions(self, type_of_player:str): return player_goal - def get_player_start_position(self, type_of_player:str): + def get_player_start_position(self, type_of_player: str) -> Dict[str, Any]: """ - Generate the starting position of an attacking agent - type_of_player: Can be 'attackers' or 'defenders' + Generates the starting position for a specific player type. + + Args: + type_of_player (str): The player type (e.g., 'attackers', 'defenders'). + + Returns: + Dict[str, Any]: A dictionary containing starting configuration. """ # Read known nets known_networks = self.read_agents_known_networks(type_of_player, 'start_position') @@ -245,7 +316,16 @@ def get_player_start_position(self, type_of_player:str): return player_start_position - def get_start_position(self, agent_role:str): + def get_start_position(self, agent_role: str) -> Dict[str, Any]: + """ + Returns the starting position based on the agent's role. + + Args: + agent_role (str): The role of the agent ('Attacker', 'Defender', 'Benign'). + + Returns: + Dict[str, Any]: The starting state configuration. + """ match agent_role: case "Attacker": return self.get_player_start_position(agent_role) @@ -262,8 +342,17 @@ def get_start_position(self, agent_role:str): case _: raise ValueError(f"Unsupported agent role: {agent_role}") - def get_win_conditions(self, agent_role): - match agent_role: + def get_win_conditions(self, agent_role: str) -> Dict[str, Any]: + """ + Returns the win conditions based on the agent's role. + + Args: + agent_role (str): The role of the agent. + + Returns: + Dict[str, Any]: The win conditions configuration. + """ + match agent_role: case "Attacker": return self.get_player_win_conditions(agent_role) case "Defender": @@ -281,9 +370,15 @@ def get_win_conditions(self, agent_role): case _: raise ValueError(f"Unsupported agent role: {agent_role}") - def get_max_steps(self, role=str)->Optional[int]: + def get_max_steps(self, role: str) -> Optional[int]: """ - Get the max steps based on agent's role + Retrieves the maximum steps allowed for a specific role. + + Args: + role (str): The role of the agent. + + Returns: + Optional[int]: The maximum steps, or None if no limit is set. """ try: max_steps = int(self.config['coordinator']['agents'][role]["max_steps"]) @@ -295,9 +390,15 @@ def get_max_steps(self, role=str)->Optional[int]: self.logger.warning(f"Unsupported value in 'coordinator.agents.{role}.max_steps': {e}. Setting value to default=None (no step limit)") return max_steps - def get_goal_description(self, agent_role)->str: + def get_goal_description(self, agent_role: str) -> str: """ - Get goal description per role + Retrieves the textual goal description for a specific role. + + Args: + agent_role (str): The role of the agent. + + Returns: + str: The goal description string. """ match agent_role: case "Attacker": @@ -349,8 +450,17 @@ def validate_goal_description(self, agent_role: str, description: str): f"[{agent_role}] Goal description '{description}' might be missing some actual win condition targets: {missing_elements}" ) - def get_rewards(self, reward_names:list, default_value=0)->dict: - "Reads configuration for rewards for cases listed in 'rewards_names'" + def get_rewards(self, reward_names: List[str], default_value: int = 0) -> Dict[str, Any]: + """ + Reads configuration for rewards for specific categories. + + Args: + reward_names (List[str]): List of reward keys to read from the configuration. + default_value (int): Default reward value if not specified. Defaults to 0. + + Returns: + Dict[str, Any]: A mapping of reward names to their values. + """ rewards = {} for name in reward_names: try: @@ -381,9 +491,12 @@ def get_store_trajectories(self, default_value: bool = False): store_trajectories = default_value return store_trajectories - def get_scenario(self): + def get_scenario(self) -> Any: """ - Get the scenario config objects based on the configuration. Only import objects that are selected via importlib. + Retrieves the scenario configuration objects. + + Returns: + Any: The scenario configuration (usually a list of NodeConfig, etc.). """ scenario_name = self.config['env']['scenario'] # make sure to validate the input @@ -395,9 +508,15 @@ def get_scenario(self): return SCENARIO_REGISTRY[scenario_name] - def get_seed(self, whom): + def get_seed(self, whom: str) -> int: """ - Get the seeds + Retrieves the random seed for a specific component. + + Args: + whom (str): The component name (e.g., 'coordinator', 'env'). + + Returns: + int: The random seed. """ seed = self.config[whom]['random_seed'] if seed == 'random': @@ -417,10 +536,15 @@ def get_randomize_goal_every_episode(self, default_value: bool = False) -> bool: raise DeprecationWarning("This function is deprecated.") return randomize_goal_every_episode - def get_use_firewall(self, default_value: bool = False)->bool: + def get_use_firewall(self, default_value: bool = False) -> bool: """ - Retrieves if the firewall functionality is allowed for netsecgame. - Default: False + Checks if firewall functionality is enabled. + + Args: + default_value (bool): Default value if not found. Defaults to False. + + Returns: + bool: True if firewalls should be used, False otherwise. """ try: use_firewall = self.config['env']['use_firewall'] @@ -428,14 +552,32 @@ def get_use_firewall(self, default_value: bool = False)->bool: use_firewall = default_value return use_firewall - def get_use_global_defender(self, default_value: bool = False)->bool: + def get_use_global_defender(self, default_value: bool = False) -> bool: + """ + Checks if the global defender is enabled. + + Args: + default_value (bool): Default value if not found. Defaults to False. + + Returns: + bool: True if global defender should be used, False otherwise. + """ try: use_global_defender = self.config['env']['use_global_defender'] except KeyError: use_global_defender = default_value return use_global_defender - def get_required_num_players(self, default_value: int = 1)->int: + def get_required_num_players(self, default_value: int = 1) -> int: + """ + Retrieves the required number of players. + + Args: + default_value (int): Default number of players. Defaults to 1. + + Returns: + int: The required number of players. + """ try: required_players = int(self.config['env']['required_players']) except KeyError: diff --git a/netsecgame/game/configuration_manager.py b/netsecgame/game/configuration_manager.py index 1e5b58b5..a2c1eb69 100644 --- a/netsecgame/game/configuration_manager.py +++ b/netsecgame/game/configuration_manager.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Dict, Any, List +from typing import Optional, Dict, Any, List, Iterable from aiohttp import ClientSession from netsecgame.game.config_parser import ConfigParser @@ -100,38 +100,81 @@ def _load_local_configuration(self) -> None: # Accessors # ------------------------------------------------------------------------- - def get_cyst_objects(self): + def get_cyst_objects(self) -> Iterable[Any]: + """ + Returns the loaded CYST configuration objects. + + Returns: + Any: The CYST configuration objects (usually a list of NodeConfig, etc.). + """ return self._cyst_objects def get_config_hash(self) -> Optional[str]: + """ + Returns the hash of the loaded configuration. + + Returns: + Optional[str]: The hexadecimal hash of the configuration, or None if not loaded. + """ return self._config_file_hash - def get_starting_position(self, role: str) -> dict: - """Returns the starting position configuration for a specific role.""" + def get_starting_position(self, role: str) -> Dict[str, Any]: + """Returns the starting position configuration for a specific role. + Args: + role (str): The role of the agent. + + Returns: + dict: The starting position configuration for the specified role. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_start_position(agent_role=role) - def get_win_conditions(self, role: str) -> dict: - """Returns the win conditions for a specific role.""" + def get_win_conditions(self, role: str) -> Dict[str, Any]: + """Returns the win conditions for a specific role. + Args: + role (str): The role of the agent. + + Returns: + dict: The win conditions for the specified role. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_win_conditions(agent_role=role) def get_goal_description(self, role: str) -> str: - """Returns the goal description for a specific role.""" + """Returns the goal description for a specific role. + Args: + role (str): The role of the agent. + + Returns: + str: The goal description for the specified role. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_goal_description(agent_role=role) def get_max_steps(self, role: str) -> Optional[int]: - """Returns the max steps for a specific role.""" + """Returns the max steps for a specific role. + Args: + role (str): The role of the agent. + + Returns: + Optional[int]: The max steps for the specified role. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_max_steps(role) - def get_rewards(self, reward_names: List[str] = ["step", "success", "fail", "false_positive"], default_value: int = 0) -> dict: - """Returns the rewards configuration.""" + def get_rewards(self, reward_names: List[str] = ["step", "success", "fail", "false_positive"], default_value: int = 0) -> Dict[str, Any]: + """Returns the rewards configuration. + Args: + reward_names (List[str]): The names of the rewards. + default_value (int): The default value for the rewards. + + Returns: + dict: The rewards configuration. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") rewards = self._parser.get_rewards(reward_names, default_value) @@ -143,28 +186,62 @@ def get_rewards(self, reward_names: List[str] = ["step", "success", "fail", "fal self.logger.warning("Success reward is negative. This is not recommended.") return rewards - def get_use_dynamic_ips(self, default_value: bool = False) -> bool: + def get_use_dynamic_addresses(self, default_value: bool = False) -> bool: + """Returns the use dynamic addresses configuration. + Args: + default_value (bool): The default value for the use dynamic addresses. + + Returns: + bool: The use dynamic addresses configuration. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_use_dynamic_addresses(default_value) def get_use_global_defender(self, default_value: bool = False) -> bool: + """Returns the use global defender configuration. + Args: + default_value (bool): The default value for the use global defender. + + Returns: + bool: The use global defender configuration. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_use_global_defender(default_value) def get_required_num_players(self, default_value: int = 1) -> int: + """Returns the required number of players configuration. + + Args: + default_value (int): The default value for the required number of players. + + Returns: + int: The required number of players configuration. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_required_num_players(default_value) def get_use_firewall(self, default_value: bool = True) -> bool: + """Returns the use firewall configuration. + + Args: + default_value (bool): The default value for the use firewall. + + Returns: + bool: The use firewall configuration. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_use_firewall(default_value) def get_all_starting_positions(self) -> Dict[str, Any]: - """Returns starting positions for all roles.""" + """Returns starting positions for all roles. + + Returns: + Dict[str, Any]: The starting positions for all roles. + """ starting_positions = {} for agent_role in AgentRole: try: @@ -175,7 +252,11 @@ def get_all_starting_positions(self) -> Dict[str, Any]: return starting_positions def get_all_win_conditions(self) -> Dict[str, Any]: - """Returns win conditions for all roles.""" + """Returns win conditions for all roles. + + Returns: + Dict[str, Any]: The win conditions for all roles. + """ win_conditions = {} for agent_role in AgentRole: try: @@ -186,7 +267,11 @@ def get_all_win_conditions(self) -> Dict[str, Any]: return win_conditions def get_all_goal_descriptions(self) -> Dict[str, str]: - """Returns goal descriptions for all roles.""" + """Returns goal descriptions for all roles. + + Returns: + Dict[str, str]: The goal descriptions for all roles. + """ goal_descriptions = {} for agent_role in AgentRole: try: @@ -197,7 +282,12 @@ def get_all_goal_descriptions(self) -> Dict[str, str]: return goal_descriptions def get_all_max_steps(self) -> Dict[str, Optional[int]]: - """Returns max steps for all roles.""" + """ + Returns max steps for all roles. + + Returns: + Dict[str, Optional[int]]: A dictionary mapping roles to their maximum steps. + """ # Using self.get_max_steps might raise RuntimeError if checks are there, # but simpler to just call parser directly or the single accessor since we are inside the class. # However, the single accessor has the check. @@ -205,7 +295,15 @@ def get_all_max_steps(self) -> Dict[str, Optional[int]]: # Iterating over AgentRole is correct. return {role: self.get_max_steps(role) for role in AgentRole} - def get_store_trajectories(self, default_value: bool = False) -> bool: + def get_store_trajectories(self, default_value: bool = False) -> bool: + """Returns the store trajectories configuration. + + Args: + default_value (bool): The default value for the store trajectories. + + Returns: + bool: True if trajectories should be stored, False otherwise. + """ if not self._parser: raise RuntimeError("Configuration not loaded.") return self._parser.get_store_trajectories(default_value) diff --git a/netsecgame/game/coordinator.py b/netsecgame/game/coordinator.py index 5860d938..ca631046 100644 --- a/netsecgame/game/coordinator.py +++ b/netsecgame/game/coordinator.py @@ -1,23 +1,27 @@ import logging import json import asyncio -from datetime import datetime -from typing import Optional -import signal -import os import re import uuid - +import signal +import os +from datetime import datetime +from typing import Optional, Dict, Any, Set, Tuple, Coroutine from netsecgame.game_components import Action, Observation, ActionType, GameStatus, GameState, AgentStatus, AgentRole from netsecgame.game.global_defender import GlobalDefender from netsecgame.utils.utils import observation_as_dict,store_trajectories_to_jsonl from netsecgame.game.agent_server import AgentServer from netsecgame.game.configuration_manager import ConfigurationManager - -def convert_msg_dict_to_json(msg_dict: dict) -> str: +def convert_msg_dict_to_json(msg_dict: Dict[str, Any]) -> str: """ - Helper function to create text-base messge from a dictionary. Used in the Agent-Game communication. + Helper function to create text-base message from a dictionary. Used in the Agent-Game communication. + + Args: + msg_dict (Dict[str, Any]): The dictionary containing the message data. + + Returns: + str: The JSON string representation of the message. """ try: # Convert message into string representation @@ -27,9 +31,15 @@ def convert_msg_dict_to_json(msg_dict: dict) -> str: raise TypeError(f"Error when converting msg to JSON:{e}") from e return output_message -def sanitize_agent_name(name:str)->str: +def sanitize_agent_name(name: str) -> str: """ Sanitizes the agent name to be used as a filename. + + Args: + name (str): The raw agent name. + + Returns: + str: A sanitized, safe string for filenames. """ safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '_', name) safe_name = re.sub(r'_+', '_', safe_name) @@ -123,7 +133,7 @@ def __init__(self, game_host: str, game_port: int, service_host:str, service_por # trajectories per agent_addr self._agent_trajectories = {} - def _spawn_task(self, coroutine, *args, **kwargs)->asyncio.Task: + def _spawn_task(self, coroutine:Coroutine, *args:tuple, **kwargs:dict)->asyncio.Task: """ Helper function to make sure all tasks are registered for proper termination. @@ -131,7 +141,7 @@ def _spawn_task(self, coroutine, *args, **kwargs)->asyncio.Task: coroutine: The coroutine function to schedule. *args: Positional arguments to pass to the coroutine. **kwargs: Keyword arguments to pass to the coroutine. - + Returns: asyncio.Task: The created task object. """ @@ -160,9 +170,12 @@ async def create_agent_queue(self, agent_addr:tuple)->None: self._agent_response_queues[agent_addr] = asyncio.Queue() self.logger.info(f"Created queue for agent {agent_addr}. {len(self._agent_response_queues)} queues in total.") - def run(self)->None: + def run(self) -> None: """ - Wrapper for ayncio run function. Starts all tasks in AIDojo + Wrapper for asyncio run function. Starts all tasks in AIDojo. + + Returns: + None """ try: asyncio.run(self.start_tasks()) @@ -171,9 +184,12 @@ def run(self)->None: finally: self.logger.info(f"{__class__.__name__} has exited.") - async def start_tcp_server(self): + async def start_tcp_server(self) -> None: """ - Starts TPC sever for the agent communication. + Starts the TCP server for agent communication. + + Returns: + None """ server = None try: @@ -201,13 +217,12 @@ async def start_tcp_server(self): await server.wait_closed() self.logger.info("\tTCP server task stopped") - async def start_tasks(self): + async def start_tasks(self) -> None: """ - High level funciton to start all the other asynchronous tasks. - - Reads the conf of the coordinator - - Creates queues - - Start the main part of the coordinator - - Start a server that listens for agents + High level function to start all asynchronous tasks. + + Returns: + None """ loop = asyncio.get_running_loop() @@ -238,8 +253,8 @@ async def start_tasks(self): self._global_defender = GlobalDefender() else: self._global_defender = None - self._use_dynamic_ips = self.config_manager.get_use_dynamic_ips() - self.logger.info(f"Change IP every episode set to: {self._use_dynamic_ips}") + self._use_dynamic_addresses = self.config_manager.get_use_dynamic_addresses() + self.logger.info(f"Change IP every episode set to: {self._use_dynamic_addresses}") self._rewards = self.config_manager.get_rewards(["step", "success", "fail", "false_positive"]) self.logger.info(f"Rewards set to:{self._rewards}") self._min_required_players = self.config_manager.get_required_num_players() @@ -276,7 +291,7 @@ def _parse_action_message(self, agent_addr: tuple, message: str) -> Optional[Act Args: agent_addr (tuple): The address of the agent sending the message (used for logging context). message (str): The raw JSON string message received from the agent. - + Returns: Optional[Action]: The parsed Action object if successful, None otherwise. """ @@ -331,9 +346,16 @@ async def run_game(self): self._spawn_task(self._respond_on_bad_request, agent_addr, "Malformed Action") self.logger.info("\tAction processing task stopped.") - async def _respond_on_bad_request(self, agent_addr: tuple, message: str)->None: + async def _respond_on_bad_request(self, agent_addr: tuple, message: str) -> None: """ Sends a response to the agent indicating that the request was bad. + + Args: + agent_addr (tuple): The address of the agent. + message (str): The descriptive error message. + + Returns: + None """ output_message_dict = { "to_agent": agent_addr, @@ -345,13 +367,16 @@ async def _respond_on_bad_request(self, agent_addr: tuple, message: str)->None: } await self._agent_response_queues[agent_addr].put(convert_msg_dict_to_json(output_message_dict)) - async def _process_join_game_action(self, agent_addr: tuple, action: Action)->None: + async def _process_join_game_action(self, agent_addr: tuple, action: Action) -> None: """ - Method for processing Action of type ActionType.JoinGame - Inputs: - - agent_addr (tuple) - - JoinGame Action - Outputs: None (Method stores reposnse in the agent's response queue) + Processes an Action of type ActionType.JoinGame. + + Args: + agent_addr (tuple): The address of the agent. + action (Action): The JoinGame Action object. + + Returns: + None """ try: self.logger.info(f"New Join request by {agent_addr}.") @@ -417,12 +442,15 @@ async def _process_join_game_action(self, agent_addr: tuple, action: Action)->No finally: self.logger.debug(f"Cleaning up after JoinGame for {agent_addr}.") - async def _process_quit_game_action(self, agent_addr: tuple)->None: + async def _process_quit_game_action(self, agent_addr: tuple) -> None: """ - Method for processing Action of type ActionType.QuitGame - Inputs: - - agent_addr (tuple) - Outputs: None + Processes an Action of type ActionType.QuitGame. + + Args: + agent_addr (tuple): The address of the agent. + + Returns: + None """ try: if agent_addr in self._agent_states: @@ -437,12 +465,16 @@ async def _process_quit_game_action(self, agent_addr: tuple)->None: finally: self.logger.debug(f"Cleaning up after QuitGame for {agent_addr}.") - async def _process_reset_game_action(self, agent_addr: tuple, reset_action:Action)->None: + async def _process_reset_game_action(self, agent_addr: tuple, reset_action: Action) -> None: """ - Method for processing Action of type ActionType.ResetGame - Inputs: - - agent_addr (tuple) - Outputs: None + Processes an Action of type ActionType.ResetGame. + + Args: + agent_addr (tuple): The address of the agent. + reset_action (Action): The ResetGame Action object. + + Returns: + None """ self.logger.debug("Beginning the _process_reset_game_action.") async with self._reset_lock: @@ -485,13 +517,16 @@ async def _process_reset_game_action(self, agent_addr: tuple, reset_action:Actio response_msg_json = convert_msg_dict_to_json(output_message_dict) await self._agent_response_queues[agent_addr].put(response_msg_json) - async def _process_game_action(self, agent_addr: tuple, action:Action)->None: + async def _process_game_action(self, agent_addr: tuple, action: Action) -> None: """ - Method for processing Action of type ActionType.GameAction - Inputs: - - agent_addr (tuple) - - action (Action) - Outputs: None + Processes a generic game action (Scan, Exploit, etc.). + + Args: + agent_addr (tuple): The address of the agent. + action (Action): The Action object to process. + + Returns: + None """ if self._episode_ends[agent_addr]: self.logger.warning(f"Agent {agent_addr}({self.agents[agent_addr]}) is attempting to play action {action} after the end of the episode!") @@ -556,8 +591,13 @@ async def _process_game_action(self, agent_addr: tuple, action:Action)->None: response_msg_json = convert_msg_dict_to_json(output_message_dict) await self._agent_response_queues[agent_addr].put(response_msg_json) - async def _assign_rewards_episode_end(self): - """Task that waits for all agents to finish and assigns rewards.""" + async def _assign_rewards_episode_end(self) -> None: + """ + Task that waits for all agents to finish and then assigns final rewards. + + Returns: + None + """ self.logger.debug("Starting task for episode end reward assigning.") while not self.shutdown_flag.is_set(): # wait until episode is finished by all agents @@ -605,8 +645,16 @@ async def _assign_rewards_episode_end(self): self.logger.info("\tReward assignment task stopped.") - async def _handle_invalid_reset(self, error_msg:str): - """Task that handles invalid reset""" + async def _handle_invalid_reset(self, error_msg: str) -> None: + """ + Handles an invalid reset request by notifying agents and shutting down. + + Args: + error_msg (str): The error message explaining why the reset is invalid. + + Returns: + None + """ self.logger.error(error_msg) for agent in self.agents: async with self._agents_lock: @@ -621,8 +669,17 @@ async def _handle_invalid_reset(self, error_msg:str): self.shutdown_flag.set() - async def _handle_valid_reset(self, seed: Optional[int], topology_change: Optional[bool]): - """Task that handles valid reset""" + async def _handle_valid_reset(self, seed: Optional[int], topology_change: Optional[bool]) -> None: + """ + Handles a valid reset request by resetting the world and agents. + + Args: + seed (Optional[int]): The random seed to use for the new episode. + topology_change (Optional[bool]): Whether to randomize the topology. + + Returns: + None + """ self.logger.info(f"Resetting game to initial state with seed: {seed} and topology change: {topology_change}") # reset the game await self.reset(seed=seed, topology_change=topology_change) @@ -651,8 +708,13 @@ async def _handle_valid_reset(self, seed: Optional[int], topology_change: Option else: self._agent_status[agent] = AgentStatus.Playing - async def _reset_game(self): - """Task that waits for all agents to request resets""" + async def _reset_game(self) -> None: + """ + Task that waits for all agents to request resets and coordinates the process. + + Returns: + None + """ self.logger.debug("Starting task for game reset handelling.") while not self.shutdown_flag.is_set(): # wait until episode is finished by all agents @@ -706,10 +768,17 @@ async def _reset_game(self): self._reset_done_condition.notify_all() self.logger.info("\tReset game task stopped.") - def _initialize_new_player(self, agent_addr:tuple, agent_current_state:GameState, agent_current_goal_state:GameState) -> Observation: + def _initialize_new_player(self, agent_addr: tuple, agent_current_state: GameState, agent_current_goal_state: GameState) -> Observation: """ - Method to initialize new player upon joining the game. - Returns initial observation for the agent based on the agent's role + Initializes a new player's state and data upon joining the game. + + Args: + agent_addr (tuple): The address of the agent. + agent_current_state (GameState): The initial state assigned to the agent. + agent_current_goal_state (GameState): The goal state assigned to the agent. + + Returns: + Observation: The initial observation for the agent. """ self.logger.info(f"\tInitializing new player{agent_addr}") agent_name, agent_role = self.agents[agent_addr] @@ -731,24 +800,58 @@ def _initialize_new_player(self, agent_addr:tuple, agent_current_state:GameState # create initial observation return Observation(self._agent_states[agent_addr], 0, False, {}) - async def register_agent(self, agent_id:tuple, agent_role:AgentRole, agent_initial_view:dict, agent_win_condition_view:dict)->tuple[GameState, GameState]: + async def register_agent(self, agent_id: tuple, agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]: """ - Domain specific method of the environment. Creates the initial state of the agent. + Domain-specific method to register an agent and create its initial and goal states. + + Args: + agent_id (tuple): The identifier for the agent. + agent_role (AgentRole): The role of the agent. + agent_initial_view (Dict[str, Any]): The initial starting view for the agent. + agent_win_condition_view (Dict[str, Any]): The win conditions for the agent. + + Returns: + Tuple[GameState, GameState]: A tuple containing (initial_state, goal_state). """ raise NotImplementedError - async def remove_agent(self, agent_id:tuple, agent_state:GameState)->bool: + async def remove_agent(self, agent_id: tuple, agent_state: GameState) -> bool: """ - Domain specific method of the environment. Creates the initial state of the agent. + Domain-specific method to remove an agent from the environment. + + Args: + agent_id (tuple): The identifier for the agent. + agent_state (GameState): The last known state of the agent. + + Returns: + bool: True if removal was successful, False otherwise. """ raise NotImplementedError - async def reset_agent(self, agent_id:tuple, agent_role:AgentRole, agent_initial_view:dict, agent_win_condition_view:dict)->tuple[GameState, GameState]: + async def reset_agent(self, agent_id: tuple, agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]: + """ + Domain-specific method to reset an agent's state for a new episode. + + Args: + agent_id (tuple): The identifier for the agent. + agent_role (AgentRole): The role of the agent. + agent_initial_view (Dict[str, Any]): The new starting view for the agent. + agent_win_condition_view (Dict[str, Any]): The win conditions for the agent. + + Returns: + Tuple[GameState, GameState]: A tuple containing (new_state, new_goal_state). + """ raise NotImplementedError - async def _remove_agent_from_game(self, agent_addr): + async def _remove_agent_from_game(self, agent_addr: tuple) -> Dict[str, Any]: """ - Removes player from the game. Should be called AFTER QuitGame action was processed by the world. + Removes a player from the GameCoordinator's tracking and returns their final info. + + Args: + agent_addr (tuple): The address of the agent. + + Returns: + Dict[str, Any]: A dictionary containing final agent statistics and state. """ self.logger.info(f"Removing player {agent_addr} from the GameCoordinator") agent_info = {} @@ -783,10 +886,17 @@ async def _remove_agent_from_game(self, agent_addr): self.logger.info(f"\t Player {agent_addr} not present in the game!") return agent_info - async def step(self, agent_id:tuple, agent_state:GameState, action:Action): + async def step(self, agent_id: tuple, agent_state: GameState, action: Action) -> GameState: """ - Domain specific method of the environment. Creates the initial state of the agent. - Must be implemented by the domain specific environment. + Domain-specific method to perform an action in the environment. + + Args: + agent_id (tuple): The identifier for the agent. + agent_state (GameState): The current state of the agent. + action (Action): The action to perform. + + Returns: + GameState: The new state of the agent after the action. """ raise NotImplementedError @@ -800,20 +910,35 @@ async def reset(self, seed:Optional[int]=None)->bool: """ raise NotImplementedError - def _initialize(self): + def _initialize(self) -> None: """ - Initialize the game state and other necessary components. This is called at the start of the game after the configuration is loaded. - Must be implemented by the domain specific environment. + Initializes the environment state and components. + + Returns: + None """ raise NotImplementedError - def goal_check(self, agent_addr:tuple)->bool: + def goal_check(self, agent_addr: tuple) -> bool: """ - Check if the goal conditons were satisfied in a given game state + Checks if the goal conditions for specific agent were satisfied. + + Args: + agent_addr (tuple): The address of the agent. + + Returns: + bool: True if the goal is reached, False otherwise. """ - def goal_dict_satistfied(goal_dict:dict, known_dict: dict)-> bool: + def goal_dict_satistfied(goal_dict: Dict[Any, Set], known_dict: Dict[Any, Set]) -> bool: """ - Helper function for checking if a goal dictionary condition is satisfied + Helper function for checking if a goal dictionary condition is satisfied. + + Args: + goal_dict (Dict[Any, Set]): The target dictionary (IP -> set of values). + known_dict (Dict[Any, Set]): The agent's currently known values. + + Returns: + bool: True if known_dict satisfies the goal_dict. """ # check if we have all IPs that should have some values (are keys in goal_dict) if goal_dict.keys() <= known_dict.keys(): @@ -842,7 +967,16 @@ def goal_dict_satistfied(goal_dict:dict, known_dict: dict)-> bool: self.logger.debug(f"\t{goal_reached}") return all(goal_reached.values()) - def is_detected(self, agent:tuple)->bool: + def is_detected(self, agent: tuple) -> bool: + """ + Checks if the agent's last action was detected by the global defender. + + Args: + agent (tuple): The address of the agent. + + Returns: + bool: True if detected, False otherwise. + """ if self._global_defender: detection = self._global_defender.stochastic_with_threshold(self._agent_last_action[agent], self._agent_trajectories[agent]["trajectory"]["actions"]) self.logger.debug(f"Global Detection result: {detection}") @@ -851,7 +985,16 @@ def is_detected(self, agent:tuple)->bool: # No global defender return False - def is_timeout(self, agent:tuple)->bool: + def is_timeout(self, agent: tuple) -> bool: + """ + Checks if the agent has reached its maximum step limit. + + Args: + agent (tuple): The address of the agent. + + Returns: + bool: True if timeout reached, False otherwise. + """ timeout_reached = False if self._steps_limit_per_role[self.agents[agent][1]]: if self._agent_steps[agent] >= self._steps_limit_per_role[self.agents[agent][1]]: @@ -876,6 +1019,7 @@ def _update_agent_status(self, agent:tuple)->AgentStatus: Update the status of an agent based on reaching the goal, timeout or detection. Args: agent (tuple): The agent to update the status of. + Returns: AgentStatus: The new status of the agent. """ @@ -900,6 +1044,7 @@ def _update_agent_episode_end(self, agent:tuple)->bool: Update the episode end status of an agent. Args: agent (tuple): The agent to update the episode end status of. + Returns: bool: True if the episode has ended, False otherwise. """ @@ -917,7 +1062,16 @@ def _update_agent_episode_end(self, agent:tuple)->bool: episode_end = True return episode_end - def _reset_trajectory(self, agent_addr:tuple)->dict: + def _reset_trajectory(self, agent_addr: tuple) -> Dict[str, Any]: + """ + Resets and initializes a new trajectory dictionary for an agent. + + Args: + agent_addr (tuple): The address of the agent. + + Returns: + Dict[str, Any]: The initial trajectory dictionary. + """ agent_name, agent_role = self.agents[agent_addr] self.logger.debug(f"Resetting trajectory of {agent_addr}") return { @@ -931,9 +1085,19 @@ def _reset_trajectory(self, agent_addr:tuple)->dict: "agent_name":agent_name } - def _add_step_to_trajectory(self, agent_addr:tuple, action:Action, reward:float, next_state:GameState, end_reason:str|None=None)-> None: + def _add_step_to_trajectory(self, agent_addr: tuple, action: Action, reward: float, next_state: GameState, end_reason: Optional[str] = None) -> None: """ - Method for adding one step to the agent trajectory. + Adds a single step (state, action, reward) to the agent's trajectory. + + Args: + agent_addr (tuple): The address of the agent. + action (Action): The action performed. + reward (float): The reward received. + next_state (GameState): The resulting state. + end_reason (Optional[str]): An optional reason if the episode ended. + + Returns: + None """ if agent_addr in self._agent_trajectories: self.logger.debug(f"Adding step to trajectory of {agent_addr}") @@ -943,9 +1107,16 @@ def _add_step_to_trajectory(self, agent_addr:tuple, action:Action, reward:float, if end_reason: self._agent_trajectories[agent_addr]["end_reason"] = end_reason - def _store_trajectory_to_file(self, agent_addr:tuple, location="./logs/trajectories")-> None: + def _store_trajectory_to_file(self, agent_addr: tuple, location: str = "./logs/trajectories") -> None: """ - Method for storing the agent trajectory to a file. + Stores the collected trajectory for an agent to a JSONL file. + + Args: + agent_addr (tuple): The address of the agent. + location (str): The directory where the file should be saved. + + Returns: + None """ if agent_addr in self.agents: agent_name, agent_role = self.agents[agent_addr] @@ -956,9 +1127,15 @@ def _store_trajectory_to_file(self, agent_addr:tuple, location="./logs/trajector else: self.logger.warning(f"Agent {agent_addr} not found in agents list, can't store trajectory to file.") - def is_agent_benign(self, agent_addr:tuple)->bool: + def is_agent_benign(self, agent_addr: tuple) -> bool: """ - Check if the agent is benign (defender, normal) + Checks if the agent has a benign role (Defender or Benign). + + Args: + agent_addr (tuple): The address of the agent. + + Returns: + bool: True if the agent is benign, False otherwise. """ if agent_addr not in self.agents: return False diff --git a/netsecgame/game/global_defender.py b/netsecgame/game/global_defender.py index 28cf3c0c..e8f5e9b3 100644 --- a/netsecgame/game/global_defender.py +++ b/netsecgame/game/global_defender.py @@ -1,7 +1,7 @@ -# Author: Ondrej Lukas - ondrej.lukas@aic.fel.cvut.cz from itertools import groupby -from netsecgame.game_components import ActionType, Action from random import random +from typing import List, Dict, Any +from netsecgame.game_components import ActionType, Action class GlobalDefender: @@ -42,9 +42,15 @@ def __init__(self): ActionType.FindData: 2, } - def stochastic(self, action_type:ActionType)->bool: + def stochastic(self, action_type: ActionType) -> bool: """ - Simple random detection based on predefied probability and ActionType + Performs a simple random detection based on predefined probability for an ActionType. + + Args: + action_type (ActionType): The type of action to check for detection. + + Returns: + bool: True if detected, False otherwise. """ roll = random() if roll < self._DEFAULT_DETECTION_PROBS[action_type]: @@ -52,9 +58,17 @@ def stochastic(self, action_type:ActionType)->bool: else: return False - def stochastic_with_threshold(self, action: Action, episode_actions:list, tw_size:int=5)-> bool: + def stochastic_with_threshold(self, action: Action, episode_actions: List[Dict[str, Any]], tw_size: int = 5) -> bool: """ - Only detect based on set probabilities if pre-defined thresholds are crossed. + Determines detection based on set probabilities, but only if pre-defined thresholds are crossed. + + Args: + action (Action): The current action being performed. + episode_actions (List[Dict[str, Any]]): The history of actions in the current episode. + tw_size (int): The size of the time window to consider for thresholds. Defaults to 5. + + Returns: + bool: True if the action is detected, False otherwise. """ # extend the episode with the latest action # We need to copy the list before the copying, so we avoid modifying it when it is returned. Modifycation of passed list is the default behavior in Python diff --git a/netsecgame/game/scenarios/__init__.py b/netsecgame/game/scenarios/__init__.py index 459ebed7..f48114d6 100644 --- a/netsecgame/game/scenarios/__init__.py +++ b/netsecgame/game/scenarios/__init__.py @@ -5,6 +5,8 @@ one_net, three_net_scenario, two_nets, + two_nets_tiny, + two_nets_small, ) # Static Registry @@ -15,4 +17,6 @@ "one_network": one_net.configuration_objects, "three_net_scenario": three_net_scenario.configuration_objects, "two_networks": two_nets.configuration_objects, + "two_networks_tiny": two_nets_tiny.configuration_objects, + "two_networks_small": two_nets_small.configuration_objects, } \ No newline at end of file diff --git a/netsecgame/game/worlds/CYSTCoordinator.py b/netsecgame/game/worlds/CYSTCoordinator.py index 0e023ba6..788eace2 100644 --- a/netsecgame/game/worlds/CYSTCoordinator.py +++ b/netsecgame/game/worlds/CYSTCoordinator.py @@ -20,6 +20,7 @@ def get_starting_position_from_cyst_config(cyst_objects): Args: cyst_objects (list): List of CYST configuration objects. + Returns: dict: A dictionary mapping agent identifiers to their starting known hosts and networks. """ diff --git a/netsecgame/game/worlds/NetSecGame.py b/netsecgame/game/worlds/NetSecGame.py index 684b6039..6539d011 100644 --- a/netsecgame/game/worlds/NetSecGame.py +++ b/netsecgame/game/worlds/NetSecGame.py @@ -9,7 +9,7 @@ import json from faker import Faker from pathlib import Path -from typing import Iterable, Any, Set, Dict, Optional +from typing import Iterable, Any, Set, Dict, Optional, Tuple, List from collections import defaultdict from netsecgame.game_components import GameState, Action, ActionType, IP, Network, Data, Service, AgentRole @@ -18,9 +18,16 @@ from netsecgame.utils.utils import get_logging_level -def state_parts_deep_copy(state:GameState)->tuple: +def state_parts_deep_copy(state: GameState) -> Tuple[Set[Network], Set[IP], Set[IP], Dict[IP, Set[Service]], Dict[IP, Set[Data]], Dict[IP, Set[IP]]]: """ Deep copies the relevant parts of the GameState. + + Args: + state (GameState): The game state to copy from. + + Returns: + Tuple[Set[Network], Set[IP], Set[IP], Dict[IP, Set[Service]], Dict[IP, Set[Data]], Dict[IP, Set[IP]]]: + The copied parts (known_networks, known_hosts, controlled_hosts, known_services, known_data, known_blocks). """ new_nets = copy.deepcopy(state.known_networks) new_known_h = copy.deepcopy(state.known_hosts) @@ -32,15 +39,24 @@ def state_parts_deep_copy(state:GameState)->tuple: class NetSecGame(GameCoordinator): - def __init__(self, game_host, game_port, task_config:str, seed=None): + def __init__(self, game_host: str, game_port: int, task_config: str, seed: Optional[int] = None): + """ + Initializes the NetSecGame world. + + Args: + game_host (str): The host for the game server. + game_port (int): The port for the game server. + task_config (str): Path to the task configuration file. + seed (Optional[int]): Random seed for reproducibility. + """ super().__init__(game_host, game_port, service_host=None, service_port=None, task_config_file=task_config) # Internal data structure of the NSG self._ip_to_hostname = {} # Mapping of `IP`:`host_name`(str) of all nodes in the environment self._networks = {} # A `dict` of the networks present in the environment. Keys: `Network` objects, values `set` of `IP` objects. self._services = {} # Dict of all services in the environment. Keys: hostname (`str`), values: `set` of `Service` objetcs. - self._data = {} # Dict of all services in the environment. Keys: hostname (`str`), values `set` of `Service` objetcs. - self._data_content = {} # ??? Not sure. Added by by sebas to fix error in reading config file + self._data_content = {} + self._data = {} self._firewall = {} # dict of all the allowed connections in the environment. Keys `IP` ,values: `set` of `IP` objects. self._fw_blocks = {} self._agent_fw_rules = {} @@ -60,6 +76,9 @@ def _set_random_seed(self, seed)->None: Args: seed (int): The random seed to set. + + Returns: + None """ self._seed = seed if seed is not None: @@ -86,8 +105,8 @@ def _initialize(self)->None: # Load CYST configuration if self._cyst_objects is not None: self._process_cyst_config(self._cyst_objects) - # Check if dynamic network and ip adddresses are required - if self._use_dynamic_ips: + # Check if dynamic network and ip addresses are required + if self._use_dynamic_addresses: self.logger.info("Dynamic change of the IP and network addresses enabled") self._faker_object = Faker() Faker.seed(self._seed) @@ -104,9 +123,10 @@ def _get_hosts_from_view(self, view_hosts:Iterable, allowed_hosts=None)->Set[IP] Parses view and translates all keywords. Produces set of controlled host (IP) Args: view_hosts (Iterable): The view containing host information. - allowed_hosts (list, optional): A list of host to start from if 'random' is specified. Defaults to None. + allowed_hosts (Optional[List[IP]]): A list of hosts to start from if 'random' is specified. + Returns: - set: A set of controlled hosts. + Set[IP]: A set of controlled hosts. """ hosts = set() self.logger.debug(f'\tParsing hosts from view: {view_hosts}') @@ -138,10 +158,10 @@ def _get_services_from_view(self, view_known_services:dict)->Dict[IP, Set[Servic Parses view and translates all keywords. Produces dict of known services {IP: set(Service)} Args: - view_known_services (dict): The view containing known services information. + view_known_services (Dict[IP, Iterable]): The view containing known services information. Returns: - dict: A dictionary mapping IP addresses to sets of known services. + Dict[IP, Set[Service]]: A dictionary mapping IP addresses to sets of known services. """ # TODO: Add keyword scope parameter (like in _get_data_from_view) known_services = {} @@ -177,11 +197,12 @@ def _get_data_from_view(self, view_known_data:dict, keyword_scope:str="host", ex Parses view and translates all keywords. Produces dict of known data {IP: set(Data)} Args: - view_known_data (dict): The view containing known data information. - keyword_scope (str, optional): Scope of keywords like 'random' or 'all'. Defaults to "host" (i.e., only data from the specified host are considered). - exclude_types (list, optional): List of data types to exclude when selecting data. Defaults to ["log"]. + view_known_data (Dict[IP, Iterable]): The view containing known data information. + keyword_scope (str): Scope of keywords like 'random' or 'all'. Defaults to "host". + exclude_types (List[str]): List of data types to exclude. Defaults to ["log"]. + Returns: - dict: A dictionary mapping IP addresses to sets of known data. + Dict[IP, Set[Data]]: A dictionary mapping IP addresses to sets of known data. """ known_data = {} for ip, data_list in view_known_data.items(): @@ -228,8 +249,9 @@ def _get_networks_from_view(self, view_known_networks:Iterable)->Set[Network]: Parses view and translates all keywords. Produces set of known networks (Network). Args: view_known_networks (Iterable): The view containing known networks information. + Returns: - set: A set of known networks. + Set[Network]: A set of known networks. """ known_networks = set() for net in view_known_networks: @@ -255,8 +277,9 @@ def _create_goal_state_from_view(self, view:dict, allowed_hosts=None)->GameState """ Builds a GameState from given view (dict). All keywords are replaced by valid options. Args: - view (dict): The view containing goal state information. - allowed_hosts (set, optional): A set of allowed hosts for random selection. Defaults to None. + view (Dict[str, Any]): The view containing goal state information. + allowed_hosts (Optional[Set[IP]]): A set of allowed hosts for random selection. + Returns: GameState: The generated goal state. """ @@ -280,7 +303,12 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga Builds a GameState from given view. If there is a keyword 'random' used, it is replaced by a valid option at random. - Currently, we artificially extend the knonw_networks with +- 1 in the third octet. + Args: + view (Dict[str, Any]): The view containing state information. + add_neighboring_nets (bool): Whether to add neighboring networks. + + Returns: + GameState: The generated state. """ self.logger.info(f'Generating state from view:{view}') # re-map all networks based on current mapping in self._network_mapping @@ -325,9 +353,15 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga self.logger.info(f"Generated GameState:{game_state}") return game_state - def _process_cyst_config(self, configuration_objects:list[Any])-> None: + def _process_cyst_config(self, configuration_objects: List[Any]) -> None: """ - Process the cyst configuration file + Processes the CYST configuration objects to set up the environment. + + Args: + configuration_objects (List[Any]): List of configuration objects from CYST. + + Returns: + None """ nodes = [] node_to_id = {} @@ -347,7 +381,13 @@ def _process_cyst_config(self, configuration_objects:list[Any])-> None: elif isinstance(o, ExploitConfig): exploits.append(o) - def process_node_config(node_obj:NodeConfig) -> None: + def process_node_config(node_obj: NodeConfig) -> None: + """ + Processes a single NodeConfig object. + + Args: + node_obj (NodeConfig): The node configuration to process. + """ self.logger.info(f"\tProcessing config of node '{node_obj.id}'") #save the complete object node_objects[node_obj.id] = node_obj @@ -397,7 +437,16 @@ def process_node_config(node_obj:NodeConfig) -> None: # Service does not contain any data pass - def process_router_config(router_obj:RouterConfig)->None: + def process_router_config(router_obj: RouterConfig) -> Optional[bool]: + """ + Processes a single RouterConfig object. + + Args: + router_obj (RouterConfig): The router configuration to process. + + Returns: + Optional[bool]: False if the router should be skipped, None otherwise. + """ self.logger.info(f"\tProcessing config of router '{router_obj.id}'") # Process a router # Add the router to the list of nodes. This goes @@ -427,7 +476,13 @@ def process_router_config(router_obj:RouterConfig)->None: for rule in chain.rules: fw_rules.append(rule) - def process_firewall()->dict: + def process_firewall() -> Dict[IP, Set[IP]]: + """ + Processes firewall rules and generates the connectivity mapping. + + Returns: + Dict[IP, Set[IP]]: Mapping of IP to sets of allowed destination IPs. + """ # process firewall rules all_ips = set() for ips in self._networks.values(): @@ -508,8 +563,9 @@ def _dynamic_ip_change(self, max_attempts:int=10, seed=None)-> None: """ Changes the IP and network addresses in the environment Args: - max_attempts (int, optional): Maximum number of attempts to find a valid mapping. Defaults to 10. - seed (int, optional): Seed for random number generator. Defaults to None. + max_attempts (int): Maximum number of attempts to find a valid mapping. + seed (Optional[int]): Seed for random number generator. + Returns: None """ @@ -633,10 +689,17 @@ def replacer(match): self._ip_mapping[ip] = mapping_ips[mapping] self.logger.debug(f"self._ip_mapping: {self._ip_mapping}") - def _create_new_network_mapping(self, max_attempts: int = 10, seed=None) -> tuple[Dict[Network, Network], Dict[IP, IP]]: - """ + def _create_new_network_mapping(self, max_attempts: int = 10, seed: Optional[int] = None) -> Tuple[Dict[Network, Network], Dict[IP, IP]]: + """ Generates new network addresses (preserving relative distance between networks) and maps host IPs by preserving their relative offset within the subnet. + + Args: + max_attempts (int): Maximum number of mapping attempts. + seed (Optional[int]): Random seed. + + Returns: + Tuple[Dict[Network, Network], Dict[IP, IP]]: The network and IP mappings. """ #self.logger.info(f"Generating new network and IP address mapping with seed {seed} (max attempts: {max_attempts})") @@ -763,9 +826,16 @@ def _create_new_network_mapping(self, max_attempts: int = 10, seed=None) -> tupl self.logger.info(f"Mapping IPs done: {mapping_ips}") return mapping_nets, mapping_ips - def _get_services_from_host(self, host_ip:str, controlled_hosts:set)-> Set[Service]: + def _get_services_from_host(self, host_ip: IP, controlled_hosts: Set[IP]) -> Set[Service]: """ - Returns set of Service tuples from given hostIP + Returns a set of services found on a given host. + + Args: + host_ip (IP): The IP of the host. + controlled_hosts (Set[IP]): Set of controlled host IPs. + + Returns: + Set[Service]: Set of services found on the host. """ found_services = set() if host_ip in self._ip_to_hostname: #is it existing IP? @@ -780,9 +850,15 @@ def _get_services_from_host(self, host_ip:str, controlled_hosts:set)-> Set[Servi self.logger.debug("\tServices not found because target IP does not exists.") return found_services - def _get_networks_from_host(self, host_ip)->Set[Network]: + def _get_networks_from_host(self, host_ip: IP) -> Set[Network]: """ - Returns set of IPs the host has access to + Returns the set of networks the host is part of. + + Args: + host_ip (IP): The IP of the host. + + Returns: + Set[Network]: Set of networks. """ networks = set() for net, values in self._networks.items(): @@ -790,10 +866,16 @@ def _get_networks_from_host(self, host_ip)->Set[Network]: networks.add(net) return networks - def _get_data_in_host(self, host_ip:str, controlled_hosts:set)->Set[Data]: + def _get_data_in_host(self, host_ip: IP, controlled_hosts: Set[IP]) -> Set[Data]: """ - Returns set of Data tuples from given host IP - Check if the host is in the list of controlled hosts + Returns a set of data objects found on a given host if it is controlled. + + Args: + host_ip (IP): The IP of the host. + controlled_hosts (Set[IP]): Set of controlled host IPs. + + Returns: + Set[Data]: Set of data objects found. """ data = set() if host_ip in controlled_hosts: #only return data if the agent controls the host @@ -804,7 +886,17 @@ def _get_data_in_host(self, host_ip:str, controlled_hosts:set)->Set[Data]: self.logger.debug("\t\t\tCan't get data in host. The host is not controlled.") return data - def _get_known_blocks_in_host(self, host_ip:str, controlled_hosts:set)->set: + def _get_known_blocks_in_host(self, host_ip: IP, controlled_hosts: Set[IP]) -> Set[IP]: + """ + Returns a set of known firewall blocks from a host if it is controlled. + + Args: + host_ip (IP): The IP of the host. + controlled_hosts (Set[IP]): Set of controlled host IPs. + + Returns: + Set[IP]: Set of blocked IPs. + """ known_blocks = set() if host_ip in controlled_hosts: #only return data if the agent controls the host if host_ip in self._ip_to_hostname: @@ -814,9 +906,16 @@ def _get_known_blocks_in_host(self, host_ip:str, controlled_hosts:set)->set: self.logger.debug("\t\t\tCan't get data in host. The host is not controlled.") return known_blocks - def _get_data_content(self, host_ip:str, data_id:str)->str|None: + def _get_data_content(self, host_ip: IP, data_id: str) -> Optional[str]: """ - Returns content of data identified by a host_ip and data_ip. + Returns the content of data identified by a host IP and data ID. + + Args: + host_ip (IP): The IP of the host. + data_id (str): The identifier of the data. + + Returns: + Optional[str]: The content string if found, else None. """ content = None if host_ip in self._ip_to_hostname: #is it existing IP? @@ -829,16 +928,17 @@ def _get_data_content(self, host_ip:str, data_id:str)->str|None: self.logger.debug("Data content not found because target IP does not exists.") return content - def _execute_action(self, current_state:GameState, action:Action, agent_id:tuple)-> GameState: + def _execute_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Execute the action and update the values in the state - Before this function it was checked if the action was successful - So in here all actions were already successful. + Executes the given action and updates the game state. - - actions_type: Define if the action is simulated in netsecenv or in the real world - - agent_id: is the name or type of agent that requested the action + Args: + current_state (GameState): The current game state. + action (Action): The action to execute. + agent_id (Tuple[str, int]): identifier of the agent requesting the action. - Returns: A new GameState + Returns: + GameState: The new game state after execution. """ next_state = None match action.type: @@ -858,7 +958,18 @@ def _execute_action(self, current_state:GameState, action:Action, agent_id:tuple raise ValueError(f"Unknown Action type or other error: '{action.type}'") return next_state - def _record_false_positive(self, src_ip:IP, dst_ip:IP, agent_id:tuple)-> None: + def _record_false_positive(self, src_ip: IP, dst_ip: IP, agent_id: Tuple[str, int]) -> None: + """ + Records a false positive if a connection block affects a benign agent. + + Args: + src_ip (IP): Source host IP. + dst_ip (IP): Destination host IP. + agent_id (Tuple[str, int]): Identifier of the author agent. + + Returns: + None + """ # only record false positive if the agent is benign if self.is_agent_benign(agent_id): # find agent(s) that created the rule @@ -874,17 +985,34 @@ def _record_false_positive(self, src_ip:IP, dst_ip:IP, agent_id:tuple)-> None: else: self.logger.debug(f"False positive for blocking {src_host} -> {dst_host} caused by the system configuration.") - def _firewall_check(self, src_ip:IP, dst_ip:IP)->bool: - """Checks if firewall allows connection from 'src_ip to ''dst_ip'""" + def _firewall_check(self, src_ip: IP, dst_ip: IP) -> bool: + """ + Checks if the firewall allows a connection from source to destination. + + Args: + src_ip (IP): Source host IP. + dst_ip (IP): Destination host IP. + + Returns: + bool: True if connection is allowed, False otherwise. + """ try: connection_allowed = dst_ip in self._firewall[src_ip] except KeyError: connection_allowed = False return connection_allowed - def _execute_scan_network_action(self, current_state:GameState, action:Action, agent_id:tuple)->GameState: + def _execute_scan_network_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Executes the ScanNetwork action in the environment + Executes the ScanNetwork action in the environment. + + Args: + current_state (GameState): The current game state. + action (Action): The ScanNetwork action to execute. + agent_id (Tuple[str, int]): Identifier of the requesting agent. + + Returns: + GameState: The updated game state. """ next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state) self.logger.debug(f"\t\tScanning {action.parameters['target_network']}") @@ -905,9 +1033,17 @@ def _execute_scan_network_action(self, current_state:GameState, action:Action, a self.logger.debug(f"\t\t\t Invalid source_host:'{action.parameters['source_host']}'") return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked) - def _execute_find_services_action(self, current_state:GameState, action:Action, agent_id:tuple)->GameState: + def _execute_find_services_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Executes the FindServices action in the environment + Executes the FindServices action in the environment. + + Args: + current_state (GameState): The current game state. + action (Action): The FindServices action to execute. + agent_id (Tuple[str, int]): Identifier of the requesting agent. + + Returns: + GameState: The updated game state. """ next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state) self.logger.debug(f"\t\tSearching for services in {action.parameters['target_host']}") @@ -932,9 +1068,17 @@ def _execute_find_services_action(self, current_state:GameState, action:Action, self.logger.debug(f"\t\t\t Invalid source_host:'{action.parameters['source_host']}'") return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked) - def _execute_find_data_action(self, current:GameState, action:Action, agent_id:tuple)->GameState: + def _execute_find_data_action(self, current: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Executes the FindData action in the environment + Executes the FindData action in the environment. + + Args: + current (GameState): The current game state. + action (Action): The FindData action to execute. + agent_id (Tuple[str, int]): Identifier of the requesting agent. + + Returns: + GameState: The updated game state. """ next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current) self.logger.debug(f"\t\tSearching for data in {action.parameters['target_host']}") @@ -963,9 +1107,17 @@ def _execute_find_data_action(self, current:GameState, action:Action, agent_id:t self.logger.debug(f"\t\t\t Invalid source_host:'{action.parameters['source_host']}'") return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked) - def _execute_exfiltrate_data_action(self, current_state:GameState, action:Action, agent_id:tuple)->GameState: + def _execute_exfiltrate_data_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Executes the ExfiltrateData action in the environment + Executes the ExfiltrateData action in the environment. + + Args: + current_state (GameState): The current game state. + action (Action): The ExfiltrateData action to execute. + agent_id (Tuple[str, int]): Identifier of the requesting agent. + + Returns: + GameState: The updated game state. """ next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state) self.logger.info(f"\t\tAttempting to Exfiltrate {action.parameters['data']} from {action.parameters['source_host']} to {action.parameters['target_host']}") @@ -1010,9 +1162,17 @@ def _execute_exfiltrate_data_action(self, current_state:GameState, action:Action self.logger.debug("\t\t\tCan not exfiltrate. Target host is not controlled.") return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked) - def _execute_exploit_service_action(self, current_state:GameState, action:Action, agent_id:tuple)->GameState: + def _execute_exploit_service_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Executes the ExploitService action in the environment + Executes the ExploitService action in the environment. + + Args: + current_state (GameState): The current game state. + action (Action): The ExploitService action to execute. + agent_id (Tuple[str, int]): Identifier of the requesting agent. + + Returns: + GameState: The updated game state. """ next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state) # We don't check if the target is a known_host because it can be a blind attempt to attack @@ -1050,19 +1210,17 @@ def _execute_exploit_service_action(self, current_state:GameState, action:Action self.logger.debug(f"\t\t\t Invalid source_host:'{action.parameters['source_host']}'") return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked) - def _execute_block_ip_action(self, current_state:GameState, action:Action, agent_id:tuple)->GameState: + def _execute_block_ip_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState: """ - Executes the BlockIP action - - The action has BlockIP("target_host": IP object, "source_host": IP object, "blocked_host": IP object) - - The target host is the host where the blocking will be applied (the FW) - - The source host is the host that the agent uses to connect to the target host. A host that must be controlled by the agent - - The blocked host is the host that will be included in the FW list to be blocked. + Executes the BlockIP action in the environment. + + Args: + current_state (GameState): The current game state. + action (Action): The BlockIP action to execute. + agent_id (Tuple[str, int]): Identifier of the requesting agent. - Logic: - - Check if the agent controls the source host - - Check if the agent controls the target host - - Add the rule to the FW list - - Update the state + Returns: + GameState: The updated game state. """ next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state) # Is the src in the controlled hosts? @@ -1123,7 +1281,13 @@ def _execute_block_ip_action(self, current_state:GameState, action:Action, agent self.logger.debug(f"\t\t\t Invalid source_host:'{action.parameters['source_host']}'") return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked) - def _get_all_local_ips(self)->Set[IP]: + def _get_all_local_ips(self) -> Set[IP]: + """ + Returns all private IP addresses present in the environment. + + Returns: + Set[IP]: A set of private IPs. + """ local_ips = set() for net, ips in self._networks.items(): if netaddr.IPNetwork(str(net)).ip.is_private(): @@ -1132,13 +1296,17 @@ def _get_all_local_ips(self)->Set[IP]: self.logger.info(f"\t\t\tLocal ips: {local_ips}") return local_ips - def update_log_file(self, known_data:set, action, target_host:IP)->None: + def update_log_file(self, known_data: Dict[IP, Set[Data]], action: Action, target_host: IP) -> None: """ - Updates the log file in the target host. + Updates the log file on the target host with the provided action details. + Args: - known_data (set): Set of known data. - action (Action): Action to be recorded. - target_host (IP): Target host. + known_data (Dict[IP, Set[Data]]): Current known data mappings. + action (Action): The action to record in the log. + target_host (IP): The IP of the host where the log is updated. + + Returns: + None """ hostaname = self._ip_to_hostname[target_host] self.logger.debug(f"Updating log file in host {hostaname}") @@ -1157,34 +1325,85 @@ def update_log_file(self, known_data:set, action, target_host:IP)->None: new_content = json.dumps(new_content) self._data[hostaname].add(Data(owner="system", id="logfile", type="log", size=len(new_content) , content= new_content)) - async def register_agent(self, agent_id, agent_role:AgentRole, agent_initial_view:dict, agent_win_condition_view:dict)->tuple[GameState, GameState]: + async def register_agent(self, agent_id: Tuple[str, int], agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]: + """ + Registers an agent and creates its initial and goal states. + + Args: + agent_id (Tuple[str, int]): Identifier of the agent. + agent_role (AgentRole): Role assigned to the agent. + agent_initial_view (Dict[str, Any]): View for initial state generation. + agent_win_condition_view (Dict[str, Any]): View for goal state generation. + + Returns: + Tuple[GameState, GameState]: (initial_state, goal_state). + """ start_game_state = self._create_state_from_view(agent_initial_view) goal_state = self._create_goal_state_from_view(agent_win_condition_view) return start_game_state, goal_state - async def remove_agent(self, agent_id, agent_state)->bool: + async def remove_agent(self, agent_id: Tuple[str, int], agent_state: GameState) -> bool: + """ + Removes an agent from the game. + + Args: + agent_id (Tuple[str, int]): Identifier of the agent. + agent_state (GameState): Final state of the agent. + + Returns: + bool: Always True. + """ # No action is required return True - async def step(self, agent_id, agent_state, action)->GameState: + async def step(self, agent_id: Tuple[str, int], agent_state: GameState, action: Action) -> GameState: + """ + Processes a single game step for an agent. + + Args: + agent_id (Tuple[str, int]): Identifier of the agent. + agent_state (GameState): Current state of the agent. + action (Action): Action to perform. + + Returns: + GameState: The resulting game state. + """ return self._execute_action(agent_state, action, agent_id) - async def reset_agent(self, agent_id, agent_role:AgentRole, agent_initial_view:dict, agent_win_condition_view:dict)->tuple[GameState, GameState]: - game_state = self._create_state_from_view(agent_initial_view) - goal_state = self._create_goal_state_from_view(agent_win_condition_view) - return game_state, goal_state + async def reset_agent(self, agent_id: Tuple[str, int], agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]: + """ + Resets an agent's state for a new episode. + + Args: + agent_id (Tuple[str, int]): Identifier of the agent. + agent_role (AgentRole): Role assigned to the agent. + agent_initial_view (Dict[str, Any]): View for initial state generation. + agent_win_condition_view (Dict[str, Any]): View for goal state generation. + + Returns: + Tuple[GameState, GameState]: (reset_state, goal_state). + """ + game_state = self._create_state_from_view(agent_initial_view) + goal_state = self._create_goal_state_from_view(agent_win_condition_view) + return game_state, goal_state - async def reset(self, seed:Optional[int]=None, topology_change:Optional[bool]=None)->bool: + async def reset(self, seed: Optional[int] = None, topology_change: Optional[bool] = None) -> bool: """ - Function to reset the state of the game - and prepare for a new episode + Resets the entire world to its initial state for a new episode. + + Args: + seed (Optional[int]): New random seed if provided. + topology_change (Optional[bool]): Whether a dynamic topology change should occur. + + Returns: + bool: Always True. """ # write all steps in the episode replay buffer in the file self.logger.info('--- Reseting NSG Environment to its initial state ---') if seed is not None: self._set_random_seed(seed) - if self.config_manager.get_use_dynamic_ips(): #topology change is allowed + if self.config_manager.get_use_dynamic_addresses(): #topology change is allowed if topology_change: # agents agree on topology change self._dynamic_ip_change(seed=seed) # reset self._data to orignal state diff --git a/netsecgame/game/worlds/WhiteBoxNetSecGame.py b/netsecgame/game/worlds/WhiteBoxNetSecGame.py index 0b48f2ab..a1540f40 100644 --- a/netsecgame/game/worlds/WhiteBoxNetSecGame.py +++ b/netsecgame/game/worlds/WhiteBoxNetSecGame.py @@ -6,8 +6,9 @@ import json import warnings from pathlib import Path +from typing import Any, Dict, Optional, List from netsecgame.utils.utils import get_logging_level -from netsecgame.game_components import Action, ActionType +from netsecgame.game_components import GameState, Action, ActionType from netsecgame.game.worlds.NetSecGame import NetSecGame @@ -16,12 +17,28 @@ class WhiteBoxNetSecGame(NetSecGame): WhiteBoxNetSecGame is an extension for the NetSecGame environment that provides list of all possible actions to each agent that registers in the game. """ - def __init__(self, game_host, game_port, task_config, seed=None, include_block_action=True): + def __init__(self, game_host: str, game_port: int, task_config: str, seed: Optional[int] = None, include_block_action: bool = True): + """ + Initializes the WhiteBoxNetSecGame. + + Args: + game_host (str): The host for the game server. + game_port (int): The port for the game server. + task_config (str): Path to the task configuration file. + seed (Optional[int]): Random seed. + include_block_action (bool): Whether to include BlockIP actions. + """ super().__init__(game_host, game_port, task_config, seed) self._all_actions = None self._include_block_action = include_block_action - def _initialize(self): + def _initialize(self) -> None: + """ + Initializes the game state and generates all possible actions. + + Returns: + None + """ # First do the parent initialization super()._initialize() # All components are initialized, now we can set the action mapping @@ -32,11 +49,12 @@ def _initialize(self): } if self._all_actions is not None else {} - def _generate_all_actions(self)-> list[Action]: + def _generate_all_actions(self)-> List[Action]: """ - Generate a list of all possible actions for the game. + Generates a list of all possible actions for the game. + Returns: - list[Action]: List of all possible actions. + List[Action]: List of all possible actions. """ actions = [] all_ips = [self._ip_mapping[ip] for ip in self._ip_to_hostname.keys()] @@ -124,10 +142,29 @@ def _generate_all_actions(self)-> list[Action]: return actions - def _create_state_from_view(self, view, add_neighboring_nets = True): + def _create_state_from_view(self, view: Dict[str, Any], add_neighboring_nets: bool = True) -> GameState: + """ + Creates a GameState from a view. + + Args: + view (Dict[str, Any]): The view dictionary. + add_neighboring_nets (bool): Whether to add neighboring networks. + + Returns: + GameState: The generated game state. + """ return super()._create_state_from_view(view, add_neighboring_nets=add_neighboring_nets) - def _dynamic_ip_change(self, max_attempts:int=10)->None: + def _dynamic_ip_change(self, max_attempts: int = 10, seed=None) -> None: + """ + Dynamic IP change is not supported for WhiteBoxNetSecGame. + + Args: + max_attempts (int): Maximum number of attempts. + + Returns: + None + """ warnings.warn("Dynamic IP change is not supported for WhiteBoxNetSecGame.", UserWarning) self.logger.warning("Dynamic IP change is not supported for WhiteBoxNetSecGame.") return None diff --git a/netsecgame/game_components.py b/netsecgame/game_components.py index b0e0a661..bffc9bae 100755 --- a/netsecgame/game_components.py +++ b/netsecgame/game_components.py @@ -115,12 +115,12 @@ def from_dict(cls, data: Dict[str, Any]) -> IP: """ return cls(**data) - def __hash__(self)->int: + def __hash__(self) -> int: """ Compute the hash of the IP. Returns: - hash: The hash value. + int: The hash value. """ return hash(self.ip) @@ -877,10 +877,10 @@ def __eq__(self, other: object) -> bool: Compare AgentRole with another AgentRole or string. Args: - other (object): The object to compare. + other (object): The object to compare. Returns: - bool: True if equal, False otherwise. + bool: True if equal, False otherwise. """ if isinstance(other, AgentRole): return self.value == other.value diff --git a/netsecgame/utils/actions_parser.py b/netsecgame/utils/actions_parser.py index 1868c69b..979781f2 100644 --- a/netsecgame/utils/actions_parser.py +++ b/netsecgame/utils/actions_parser.py @@ -6,8 +6,18 @@ import re import csv import argparse +from typing import List, Dict, Any -def parse_actions_taken(filename): +def parse_actions_taken(filename: str) -> List[Dict[str, Any]]: + """ + Parses actions taken from a log file. + + Args: + filename (str): The path to the log file. + + Returns: + List[Dict[str, Any]]: A list of dictionaries representing actions taken. + """ with open(filename, 'r') as file: lines = file.readlines() @@ -58,7 +68,17 @@ def parse_actions_taken(filename): return all_actions -def write_actions_to_csv(actions, output_filename): +def write_actions_to_csv(actions: List[Dict[str, Any]], output_filename: str) -> None: + """ + Writes parsed actions to a CSV file. + + Args: + actions (List[Dict[str, Any]]): The list of parsed actions to write. + output_filename (str): The path to the output CSV file. + + Returns: + None + """ with open(output_filename, 'w', newline='') as csvfile: fieldnames = ['episode', 'action_number', 'action_type', 'target'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) diff --git a/netsecgame/utils/gamaplay_graphs.py b/netsecgame/utils/gameplay_graphs.py similarity index 100% rename from netsecgame/utils/gamaplay_graphs.py rename to netsecgame/utils/gameplay_graphs.py diff --git a/netsecgame/utils/trajectory_recorder.py b/netsecgame/utils/trajectory_recorder.py index bf0b4231..a2616934 100644 --- a/netsecgame/utils/trajectory_recorder.py +++ b/netsecgame/utils/trajectory_recorder.py @@ -11,7 +11,14 @@ class TrajectoryRecorder: """ Manages the recording and storage of agent trajectories. """ - def __init__(self, agent_name: str, agent_role: str): + def __init__(self, agent_name: str, agent_role: str) -> None: + """ + Initializes the TrajectoryRecorder. + + Args: + agent_name (str): The name of the agent. + agent_role (str): The role of the agent. + """ self.agent_name = agent_name self.agent_role = agent_role self.logger = logging.getLogger(f"TrajectoryRecorder-{agent_name}") @@ -19,9 +26,7 @@ def __init__(self, agent_name: str, agent_role: str): self.reset() def reset(self) -> None: - """ - Resets the trajectory data for a new episode. - """ + """Resets the trajectory data for a new episode.""" self.logger.debug(f"Resetting trajectory for {self.agent_name}") self._data = { "trajectory": { @@ -56,14 +61,19 @@ def add_step(self, action: Action, reward: float, next_state: GameState, end_rea def add_initial_state(self, state: GameState) -> None: """ - Adds the initial state to the trajectory (optional, depending on how you want to track s_0). - The original code initialized trajectory with states=[agent_state.as_dict]. + Adds the initial state to the trajectory history. + + Args: + state (GameState): The initial game state. """ self._data["trajectory"]["states"].append(state.as_dict) def get_trajectory(self) -> Dict[str, Any]: """ Returns the current trajectory data. + + Returns: + Dict[str, Any]: The trajectory dictionary. """ return self._data @@ -72,7 +82,7 @@ def save_to_file(self, location: str = "./logs/trajectories") -> None: Saves the recorded trajectory to a JSONL file. Args: - location (str): Directory to save the file. + location (str): Directory to save the file. Defaults to "./logs/trajectories". """ filename = f"{datetime.now():%Y-%m-%d}_{self.agent_name}_{self.agent_role}" try: diff --git a/netsecgame/utils/utils.py b/netsecgame/utils/utils.py index f4cd3f48..c46ce450 100644 --- a/netsecgame/utils/utils.py +++ b/netsecgame/utils/utils.py @@ -24,13 +24,15 @@ Service, ) -def get_file_hash(filepath, hash_func='sha256', chunk_size=4096): +def get_file_hash(filepath: str, hash_func: str = 'sha256', chunk_size: int = 4096) -> str: """ Computes hash of a given file. + Args: filepath (str): The path to the file to hash. hash_func (str): The hash function to use (default is 'sha256'). chunk_size (int): The size of each chunk to read from the file (default is 4096 bytes). + Returns: str: The hexadecimal hash of the file. """ @@ -42,12 +44,14 @@ def get_file_hash(filepath, hash_func='sha256', chunk_size=4096): chunk = file.read(chunk_size) return hash_algorithm.hexdigest() -def get_str_hash(string, hash_func='sha256'): +def get_str_hash(string: str, hash_func: str = 'sha256') -> str: """ Computes hash of a given string. + Args: string (str): The input string to hash. hash_func (str): The hash function to use (default is 'sha256'). + Returns: str: The hexadecimal hash of the input string. """ @@ -55,13 +59,15 @@ def get_str_hash(string, hash_func='sha256'): hash_algorithm.update(string.encode('utf-8')) return hash_algorithm.hexdigest() -def read_replay_buffer_from_csv(csvfile:str)->List[Tuple[GameState, Action, float, GameState, bool]]: +def read_replay_buffer_from_csv(csvfile: str) -> List[Tuple[GameState, Action, float, GameState, bool]]: """ - Function to read steps from a CSV file - and restore the objects in the replay buffer. + Reads steps from a CSV file and restores objects for the replay buffer. + + Args: + csvfile (str): Path to the CSV file. - expected colums in the csv: - state_t0, action_t0, reward_t1, state_t1, done_t1 + Returns: + List[Tuple[GameState, Action, float, GameState, bool]]: The restored replay buffer. """ raise DeprecationWarning("This function is deprecated and will be removed in future versions.") buffer = [] @@ -75,11 +81,17 @@ def read_replay_buffer_from_csv(csvfile:str)->List[Tuple[GameState, Action, floa pass return buffer -def store_replay_buffer_in_csv(replay_buffer:list, filename:str, delimiter:str=";")->None: +def store_replay_buffer_in_csv(replay_buffer: List[Tuple[GameState, Action, float, GameState, bool]], filename: str, delimiter: str = ";") -> None: """ - Function to store steps from a replay buffer in CSV file. - Expected format of replay buffer items: - (state_t0:GameState, action_t0:Action, reward_t1:float, state_t1:GameState, done_t1:bool) + Stores steps from a replay buffer into a CSV file. + + Args: + replay_buffer (List[Tuple[GameState, Action, float, GameState, bool]]): The buffer items to store. + filename (str): The name of the output file. + delimiter (str): The delimiter to use in the CSV (default is ';'). + + Returns: + None """ raise DeprecationWarning("This function is deprecated and will be removed in future versions.") with open(filename, 'a') as f_object: @@ -87,7 +99,16 @@ def store_replay_buffer_in_csv(replay_buffer:list, filename:str, delimiter:str=" for (s_t, a_t, r, s_t1, done) in replay_buffer: writer_object.writerow([s_t.as_json(), a_t.as_json(), r, s_t1.as_json(), done]) -def state_as_ordered_string(state:GameState)->str: +def state_as_ordered_string(state: GameState) -> str: + """ + Converts a GameState into a deterministic ordered string representation. + + Args: + state (GameState): The game state to convert. + + Returns: + str: The ordered string representation of the state. + """ ret = "" ret += f"nets:[{','.join([str(x) for x in sorted(state.known_networks)])}]," ret += f"hosts:[{','.join([str(x) for x in sorted(state.known_hosts)])}]," @@ -106,8 +127,13 @@ def state_as_ordered_string(state:GameState)->str: def observation_as_dict(observation: Observation) -> Dict[str, Any]: """ - Generates dict representation of a given Observation object. - Acts as the single source of truth for the structure. + Generates a dictionary representation of a given Observation object. + + Args: + observation (Observation): The observation object to convert. + + Returns: + Dict[str, Any]: The dictionary representation of the observation. """ return { 'state': observation.state.as_dict, @@ -119,8 +145,13 @@ def observation_as_dict(observation: Observation) -> Dict[str, Any]: def observation_to_str(observation: Observation) -> str: """ - Generates JSON string representation of a given Observation object. - Relies on observation_as_dict to define the structure. + Generates a JSON string representation of a given Observation object. + + Args: + observation (Observation): The observation object to convert. + + Returns: + str: The JSON string representation. """ try: # Clean JSON structure: {"state": {...}, "reward": 0, ...} @@ -135,8 +166,8 @@ def observation_from_dict(data: Dict[str, Any]) -> Observation: Reconstructs an Observation object from a dictionary representation. Args: - data (dict): The dictionary containing observation data. - + data (Dict[str, Any]): The dictionary containing observation data. + Returns: Observation: The reconstructed Observation namedtuple. """ @@ -182,7 +213,16 @@ def observation_from_str(json_str: str) -> Observation: logging.getLogger(__name__).error(f"Error in creating Observation from string: {e}") raise e -def parse_log_content(log_content:str)->Optional[list]: +def parse_log_content(log_content: str) -> Optional[List[Dict[str, Any]]]: + """ + Parses a JSON string of log content into a list of log entries. + + Args: + log_content (str): The raw JSON log content. + + Returns: + Optional[List[Dict[str, Any]]]: A list of log entries if successful, None otherwise. + """ try: logs = [] data = json.loads(log_content) @@ -198,9 +238,15 @@ def parse_log_content(log_content:str)->Optional[list]: logging.getLogger(__name__).error(f"Error decoding JSON: {e}") return None -def get_logging_level(debug_level): +def get_logging_level(debug_level: str) -> int: """ - Configure logging level based on the provided debug_level string. + Configures the logging level based on the provided debug_level string. + + Args: + debug_level (str): The level name (e.g., 'DEBUG', 'INFO'). + + Returns: + int: The corresponding logging level constant. """ log_levels = { "DEBUG": logging.DEBUG, @@ -213,13 +259,17 @@ def get_logging_level(debug_level): level = log_levels.get(debug_level.upper(), logging.ERROR) return level -def store_trajectories_to_jsonl(trajectories:list, dir:str, filename:str)->None: +def store_trajectories_to_jsonl(trajectories: Any, dir: str, filename: str) -> None: """ - Store trajectories to a JSONL file. + Stores trajectories to a JSONL file. + Args: - trajectories (list): List of trajectory data to store. + trajectories (Any): The trajectory data to store (usually a dict or list). dir (str): Directory where the file will be stored. filename (str): Name of the file (without extension). + + Returns: + None """ # make sure the directory exists if not os.path.exists(dir): @@ -230,13 +280,15 @@ def store_trajectories_to_jsonl(trajectories:list, dir:str, filename:str)->None: with jsonlines.open(filename, "a") as writer: writer.write(trajectories) -def read_trajectories_from_jsonl(filepath:str)->list: +def read_trajectories_from_jsonl(filepath: str) -> List[Any]: """ - Read trajectories from a JSONL file. + Reads trajectories from a JSONL file. + Args: filepath (str): Path to the JSONL file. + Returns: - list: List of trajectories read from the file. + List[Any]: A list of trajectories read from the file. """ raise NotImplementedError("This function is not yet implemented.") @@ -245,6 +297,7 @@ def generate_valid_actions(state: GameState, include_blocks=False)->Set[Action]: Args: state (GameState): The current game state. include_blocks (bool): Whether to include BlockIP actions. Defaults to False. + Returns: set: A set of valid Action objects. """ diff --git a/tests/game/test_configuration_manager.py b/tests/game/test_configuration_manager.py index aef85d09..60080cb1 100644 --- a/tests/game/test_configuration_manager.py +++ b/tests/game/test_configuration_manager.py @@ -130,7 +130,7 @@ def test_accessors_without_load(manager_local): manager_local.get_max_steps("Attacker") with pytest.raises(RuntimeError, match="Configuration not loaded."): - manager_local.get_use_dynamic_ips() + manager_local.get_use_dynamic_addresses() @pytest.fixture def loaded_manager():