
· 3 min read
Florian Schröder

Subprocess Problem

Using the qibullet-sim or naoqi-embodiment modules is different. Both require a different Python version than AAAMBOS (3.9 and 2.7 vs 3.10). Our solution is to put the library calls into a subprocess, and the modules communicate with it via the stdin and stdout pipes. This adds some overhead to the development process, but we tried to come up with a solution that is at least handy.

The Subprocess

The subprocess in the qibullet_simulation module executes the run_sim_39.sh shell script. The script takes four arguments:
  1. The path to the conda shell script
  2. Conda environment name
  3. Robot name, e.g., "pepper", "nao"
  4. Path to the exchange directory (e.g., for images)
For example:

sh run_sim_39.sh "/home/USER/miniconda3/etc/profile.d/conda.sh" qibullet pepper .
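
If you want to assemble this call from Python, the argument order above can be captured in a small helper. This is only a sketch: build_sim_command is a hypothetical name and not part of the qibullet_simulation module, and the actual module may build the call differently.

```python
import shlex


def build_sim_command(conda_sh, env_name, robot, exchange_dir, script="run_sim_39.sh"):
    """Return the argument list for the shell script in the documented order.

    Hypothetical helper for illustration; not part of aaambos.
    """
    return ["sh", script, conda_sh, env_name, robot, exchange_dir]


cmd = build_sim_command(
    "/home/USER/miniconda3/etc/profile.d/conda.sh",  # 1. conda shell script
    "qibullet",                                      # 2. conda environment name
    "pepper",                                        # 3. robot name
    ".",                                             # 4. exchange directory
)
# shlex.join renders the list as a single shell-style command line.
print(shlex.join(cmd))
```

Passing the command as a list (instead of one string) avoids quoting problems when a path contains spaces.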

The Python script that is started is simulator_py39.py. It runs under Python 3.9, so the match statement (introduced in Python 3.10) is not available.
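
Without match, dispatching on the topic of an incoming message can be done with a dictionary lookup, which also works under Python 3.9. The following is a minimal sketch, not the code in simulator_py39.py: dispatch_message and handle_move_to are hypothetical names, and the message shape is taken from the moveTo example later in this post.

```python
def handle_move_to(msg):
    # Hypothetical handler: just returns the values it would act on.
    return ("moveTo", msg["x"], msg["y"], msg["theta"])


# Maps a topic name to the function that handles it.
HANDLERS = {
    "moveTo": handle_move_to,
}


def dispatch_message(data):
    """Look up the handler for data["topic"] and call it with data["msg"]."""
    topic = data.get("topic")
    handler = HANDLERS.get(topic)
    if handler is None:
        # Unknown topics are reported instead of raising an error.
        return ("ignored", topic)
    return handler(data.get("msg", {}))


print(dispatch_message({"topic": "moveTo", "msg": {"x": 0.5, "y": 0.5, "theta": 0.0}}))
```

Adding a new topic then only requires one more entry in HANDLERS.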

The Module

The AAAMBOS module that starts the subprocess is inside qibullet_sim.py.

From the Module to the Subprocess

You can send a message to the subprocess from the module via the extension:

await self.subprocess.msg_subprocess({"topic": "moveTo", "msg": {"x": 0.5, "y": 0.5, "theta": 0.0}})

Receiving that message is done in simulator_py39.py in the msg_callback method.

From the Subprocess to the Module

You can send a message from the subprocess to the module via the send_msg method:

self.send_msg("position", {"time": datetime.now().isoformat(), "x": x, "y": y, "theta": theta})

Receiving that message is done in qibullet_sim.py in the sim_callback method.
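
To make the pipe mechanism itself more concrete, here is a self-contained sketch of both directions using only the standard library. It is not the aaambos implementation: the one-JSON-object-per-line framing, the inline child program, and the function move_and_read_position are assumptions made for illustration. Only the topic names and message fields are taken from the examples above.

```python
import json
import subprocess
import sys

# Child program (stands in for the simulator script): reads one JSON message
# per line from stdin and answers with a "position" message on stdout.
CHILD_SOURCE = r'''
import json, sys
for line in sys.stdin:
    request = json.loads(line)
    if request["topic"] == "moveTo":
        reply = {"topic": "position", "msg": request["msg"]}
        sys.stdout.write(json.dumps(reply) + "\n")
        sys.stdout.flush()
'''


def move_and_read_position(x, y, theta):
    """Send one moveTo message to the child and return its reply as a dict."""
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    try:
        request = {"topic": "moveTo", "msg": {"x": x, "y": y, "theta": theta}}
        child.stdin.write(json.dumps(request) + "\n")
        child.stdin.flush()  # without flushing, the child may never see the line
        reply = json.loads(child.stdout.readline())
    finally:
        child.stdin.close()  # end of input lets the child's loop terminate
        child.wait(timeout=10)
        child.stdout.close()
    return reply


reply = move_and_read_position(0.5, 0.5, 0.0)
print(reply["topic"], reply["msg"])
```

Flushing after every message matters on both sides, because buffered pipes otherwise delay messages until the buffer fills up.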

· 7 min read
Florian Schröder

Scope of the Tutorial

We will learn how to create an aaambos pkg and a module inside it. This module should be able to communicate with other modules. More specifically, we will look at an example that uses the aaambos PySimpleGUIWindowExtension and shows how to add a button and a text field to such a GUI. We will also create a communication promise and the corresponding communication features, which we will use in the new module and which can also be used in another module.

Create a package or a new module file

aaambos comes with some useful tools for creating packages and files. Simply create a new package with the command

aaambos create pkg

and answer the questions.

note

For the most part, the default answers will fit (except for the personal information part, but at least parts of it should also be filled in automatically).

We do not need to include extensions or GUIs.

The answer to the 18th question, about the project structure, depends on the later repo location in GitLab.

Rename the created module file to a suitable module name, then find and replace the corresponding parts in the Python script.

If you just want to create a module file (in an already cloned or created aaambos pkg) based on a template, run:

aaambos create module -n "GuiComTutorialModule"

The file will be created in the current directory. Either change into the target directory with cd first or use the CLI argument -l xyz/modules. In aaambos pkgs, new modules are located in the modules directory of the package.

The created pkg should be installed in conda's develop mode with conda develop . (requires the conda-build package).

GUI Window Extension

aaambos currently supports GUI creation with the PySimpleGUI library. The PySimpleGUIWindowExtension creates a new window for a module. An example of how to use it can be found in the ModuleStatusManager in the std library of aaambos (Link).

After creating it with the module file creation process, your new module file gui_com_tutorial_module.py should look like this:

gui_com_tutorial_module.py
from __future__ import annotations

from typing import Type

from aaambos.core.supervision.run_time_manager import ControlMsg
from attrs import define

from aaambos.core.configuration.module_config import ModuleConfig
from aaambos.core.module.base import Module, ModuleInfo
from aaambos.core.module.feature import FeatureNecessity, Feature, SimpleFeatureNecessity


class GuiComTutorialModule(Module, ModuleInfo):
    config: GuiComTutorialModuleConfig

    @classmethod
    def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
        pass

    @classmethod
    def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
        pass

    @classmethod
    def get_module_config_class(cls) -> Type[ModuleConfig]:
        return GuiComTutorialModuleConfig

    async def initialize(self):
        pass

    async def step(self):
        pass

    def terminate(self, control_msg: ControlMsg = None) -> int:
        exit_code = super().terminate()
        return exit_code


@define(kw_only=True)
class GuiComTutorialModuleConfig(ModuleConfig):
    module_path: str = "{{ cookiecutter.__package_name }}.modules.gui_com_tutorial_module"
    module_info: Type[ModuleInfo] = GuiComTutorialModule
    restart_after_failure: bool = True
    expected_start_up_time: float | int = 0


def provide_module():
    return GuiComTutorialModule

We still need to replace the {{ cookiecutter.__package_name }} part with the name of the pkg that contains the module file (e.g., qibullet_simulation).

In the initialize function, we can do things during initialisation (including calling asynchronous functions that require the await keyword). The step function is called iteratively. We can implement it as a normal function that is called several times or as a generator (which yields in each step). In the provides_features method, we specify the features the module adds to the architecture. The requires_features method, in turn, states what we expect or need from the architecture, including extensions.
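
The difference between the two step styles can be illustrated with plain Python. The sketch below does not use aaambos at all: PlainStepModule, GeneratorStepModule and the driver run_steps are hypothetical stand-ins, and the real run loop of aaambos may call step differently. It only shows that a repeatedly called coroutine and an async generator that yields once per iteration can do the same work.

```python
import asyncio
import inspect


class PlainStepModule:
    """step is a normal coroutine that is called once per iteration."""

    def __init__(self):
        self.ticks = 0

    async def step(self):
        self.ticks += 1


class GeneratorStepModule:
    """step is an async generator that yields once per iteration."""

    def __init__(self):
        self.ticks = 0

    async def step(self):
        while True:
            self.ticks += 1
            yield  # hand control back until the next iteration


async def run_steps(module, n):
    """Toy driver (an assumption, not the aaambos run loop): run n iterations."""
    if inspect.isasyncgenfunction(module.step):
        stepper = module.step()
        for _ in range(n):
            await stepper.__anext__()
        await stepper.aclose()
    else:
        for _ in range(n):
            await module.step()
    return module.ticks


print(asyncio.run(run_steps(PlainStepModule(), 3)))
print(asyncio.run(run_steps(GeneratorStepModule(), 3)))
```

The generator style keeps local variables alive between iterations, while the plain style has to store such state on self.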

We can tell aaambos to include the GUI extension by replacing the requires_features method with

@classmethod
def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
    return {
        PySimpleGUIWindowExtensionFeature.name: (PySimpleGUIWindowExtensionFeature, SimpleFeatureNecessity.Required),
    }

We also need to import the extension and the extension feature:

from aaambos.std.guis.pysimplegui.pysimplegui_window_ext import PySimpleGUIWindowExtensionFeature

if TYPE_CHECKING:
    from aaambos.std.guis.pysimplegui.window_extension import PySimpleGUIWindowExtension

We should also import

import PySimpleGUI as sg
from typing import Type, TYPE_CHECKING

The extension is now automatically added to the module in the ext attribute. It is also passed as a keyword argument to the __init__ method. Use it by implementing the __init__ method:

def __init__(self, config: ModuleConfig, com, log, ext, gui_window: PySimpleGUIWindowExtension, *args,
             **kwargs):
    super().__init__(config, com, log, ext, *args, **kwargs)
    self.gui_window = gui_window

In the initialize method, we can now set up the basics for our window:

self.gui_window.set_event_handler(self.handle_window_event)
self.gui_window.setup_window(window_title="My Window GUI", layout=self.create_layout())

We have referenced two methods, handle_window_event and create_layout, that we still need to implement:

def create_layout(self) -> list:
    return [[sg.Text("text"), sg.InputText("my content", key="payload_input_text"), sg.Submit("Send", key="send_msg")]]

and

async def handle_window_event(self, event, values):
    if event == "send_msg":
        await self.com.send(None, UserInputText(text=values["payload_input_text"]))

Here, when the button is pressed, we send a message with the text entered in the text box using our communication service. We need to replace the None with a valid topic later. The UserInputText class is also not defined yet.

Setting up a communication promise and features

For how the general concept of communication works see Message Broker (Wikipedia).
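
As a rough mental model, a message broker keeps a list of callbacks per topic and forwards every published message to the callbacks registered for that topic. The toy class below is only an in-process sketch of that idea under this assumption; ToyBroker is a hypothetical name and has nothing to do with the actual communication service used by aaambos.

```python
from collections import defaultdict


class ToyBroker:
    """Minimal in-process publish/subscribe broker (illustration only)."""

    def __init__(self):
        # topic name -> list of callbacks interested in that topic
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        """Deliver payload to all subscribers of topic; return how many got it."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


received = []
broker = ToyBroker()
broker.subscribe("UserInputText", lambda topic, payload: received.append((topic, payload)))

delivered = broker.publish("UserInputText", {"text": "hello"})
ignored = broker.publish("OtherTopic", {"text": "nobody listens"})
print(delivered, ignored, received)
```

The sender never needs to know who receives a message. Both sides only have to agree on the topic and the payload format, which is what the promises in this section describe.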

To tell aaambos which topics the module wants to listen to and send messages on, and in what format (a dataclass, attrs class, or msgspec Struct instead of plain dicts), we need CommunicationPromises. Using the create_promise_and_unit utility function, we can easily create a promise (outside the module class, e.g., above it):

class UserInputText(Struct):
    text: str
    time: datetime = m_field(default_factory=lambda: datetime.now())


USER_INPUT_TEXT = "UserInputText"
UserInputTextPromise, _ = create_promise_and_unit(
    name=USER_INPUT_TEXT,
    payload_wrapper=UserInputText,
    unit_description="Contains text entered by the user with the time of the submit.",
    frequency=0,
    version_str="0.1.0",
    required_service_categories=[{MSGSPEC_STRUCT_TYPE}],
)

We have defined a msgspec Struct that will hold the information. The promise will later hold the assigned topic from aaambos, which will be unique for the running agent (relevant for multi-agent applications). The created unit is not that relevant for us here.
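
The benefit of a typed payload over a plain dict can be tried out without aaambos. The sketch below deliberately swaps the msgspec Struct for a standard-library dataclass so that it runs without extra packages. UserInputTextData, encode and decode are hypothetical names, and the JSON encoding shown here is an assumption, not what the communication service does internally.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime


@dataclass
class UserInputTextData:
    """Stdlib stand-in for the UserInputText Struct (illustration only)."""

    text: str
    # default_factory is evaluated per instance, so each message gets its own timestamp
    time: datetime = field(default_factory=datetime.now)


def encode(payload):
    """Serialize the payload to JSON, storing the time as an ISO 8601 string."""
    raw = asdict(payload)
    raw["time"] = payload.time.isoformat()
    return json.dumps(raw)


def decode(data):
    """Rebuild the typed payload from its JSON representation."""
    raw = json.loads(data)
    return UserInputTextData(text=raw["text"], time=datetime.fromisoformat(raw["time"]))


original = UserInputTextData(text="hello")
restored = decode(encode(original))
print(restored.text, restored == original)
```

With a typed payload, the receiving side can access msg.text and msg.time as attributes, and an IDE can offer completion for them.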

Imports:

from datetime import datetime
from msgspec import Struct, field as m_field
from aaambos.std.communications.utils import create_promise_and_unit, create_in_out_com_feature
from aaambos.std.communications.categories import MSGSPEC_STRUCT_TYPE

For better method suggestions from your IDE, we can assign some attributes to the communication service (so it can send and receive msgs with wrappers like a msgspec Struct).

from aaambos.std.communications.attributes import MsgTopicSendingAttribute, MsgTopicCallbackReceivingAttribute, \
    RunTimeReceivingNewTopicsAttribute

...

class GuiComTutorialModule(Module, ModuleInfo):
    config: GuiComTutorialModuleConfig
    com: CommunicationService | MsgTopicSendingAttribute | MsgTopicCallbackReceivingAttribute | RunTimeReceivingNewTopicsAttribute

Feature creation

We now need some features that we can use in our module to indicate that we are sending the promise. At the same time, we create a feature that can be used by another module to indicate that it wants to receive the promise.

UserInputTextComFeatureIn, UserInputTextComFeatureOut = create_in_out_com_feature(USER_INPUT_TEXT, promise=UserInputTextPromise, version_str="0.1.0", requirements=[], version_compatibility_str=">=0.1.0")

Now we just need to add the feature to the provides_features method:

@classmethod
def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
    return {UserInputTextComFeatureOut.name: (UserInputTextComFeatureOut, SimpleFeatureNecessity.Required)}

We also need to require that the FeatureIn is provided by the architecture (in requires_features):

UserInputTextComFeatureIn.name: (UserInputTextComFeatureIn,  SimpleFeatureNecessity.Required),

But wait! We need to change the None from the start in the send call to UserInputTextPromise.settings.topic. settings means that it is "set" by aaambos during architecture setup.

Without another module, this is of little use. In another module, you would list the ComFeatureOut as a required feature and the ComFeatureIn as a provided feature.

other_module.py
from {{ cookiecutter.__package_name }}.modules.gui_com_tutorial_module import UserInputTextComFeatureIn, UserInputTextComFeatureOut, UserInputTextPromise, UserInputText
from aaambos.core.communication.topic import Topic

...

@classmethod
def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
    return {UserInputTextComFeatureIn.name: (UserInputTextComFeatureIn, SimpleFeatureNecessity.Required)}

@classmethod
def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
    return {
        UserInputTextComFeatureOut.name: (UserInputTextComFeatureOut, SimpleFeatureNecessity.Required),
    }

You also need to register the callback method (which is called when such a message is received). Put it in the initialize method:

other_module.py
await self.com.register_callback_for_promise(UserInputTextPromise, self.handle_user_input_text)

We mentioned the callback method handle_user_input_text. We need to implement this function as well.

other_module.py
async def handle_user_input_text(self, topic: Topic, msg: UserInputText):
    self.log.info(f"User Input from other module received {msg.text!r}")

That's it! Here we also see how to use the logger of the module. Different levels are trace, info, warning, error.

Include it in an arch_config

In an arch_config, you need to give your module instance a name (relevant if there are more instances of the same module in an architecture/agent). For example, under the modules key:

user_input_module:
  module_info: !name:{{ cookiecutter.__package_name }}.modules.gui_com_tutorial_module.GuiComTutorialModule

Again, replace the part in braces with the package name.

You can run your defined architecture with:

aaambos run --run_config=qibullet_simulation/configs/run_config_pg.yml --arch_config=qibullet_simulation/configs/arch_config_qibullet_example.yml

This assumes that your configs are in the qibullet_simulation pkg. Otherwise, you need to adjust the paths.

· 2 min read
Florian Schröder
info

There is now a dedicated Installation Page for aaambos in the Docs.

How to install AAAMBOS on your machine?

First of all you need:

  • (recommended) a Linux operating system, e.g., Ubuntu. The guide will use the Linux/Ubuntu shell commands.
  • conda: You can install miniconda (recommended) or Anaconda following these guides.
tip

For python programming, debugging and execution use an IDE like PyCharm.

If you do a lot with conda, consider installing mamba. It's much faster!

The installation process:

  • Create a new conda environment aaambos with Python 3.10:
conda create -n aaambos python=3.10
  • Activate the environment:
conda activate aaambos
note

You can deactivate the environment with conda deactivate.

  • Install aaambos via pip.
pip install aaambos@git+https://gitlab.ub.uni-bielefeld.de/scs/aaambos/aaambos@main
  • You probably also need a communication service. At the moment, the ipaacar com service is the only one available. Install it with
pip install ipaacar_com_service@git+https://gitlab.ub.uni-bielefeld.de/scs/aaambos/aaambos_pkg_ipaacar_com_service.git
caution

For ipaacar communication, you need a running MQTT broker when running an aaambos architecture. If you do not already have one running (e.g., Mosquitto), you can use nanomq. The apt-get installation of nanomq is described here. You need to start the broker with nanomq start in a shell/terminal before starting any architecture. Just keep the shell open.
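
Before starting an architecture, you can quickly check from Python whether something is listening on the broker port. The sketch below only tests that a TCP connection can be opened. It assumes the broker uses the standard unencrypted MQTT port 1883 on localhost, which may differ in your setup, and broker_reachable is a hypothetical helper, not part of aaambos.

```python
import socket


def broker_reachable(host="localhost", port=1883, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened.

    This does not speak MQTT; it only checks that a server accepts connections.
    Port 1883 is an assumption (the standard unencrypted MQTT port).
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if broker_reachable():
    print("A broker seems to be running on localhost:1883.")
else:
    print("No broker reachable on localhost:1883. Did you start it?")
```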

caution

It looks like the referenced ipaacar wheel (a dependency in the setup.py of the ipaacar communication service) only runs on Ubuntu 22.04. You can build ipaacar from scratch (repo, following the instructions in the README.md). After building it and installing the created wheel in the aaambos conda environment, you need to install the aaambos_pkg_ipaacar_com_service without dependencies: pip install ipaacar_com_service@git+https://gitlab.ub.uni-bielefeld.de/scs/aaambos/aaambos_pkg_ipaacar_com_service.git --no-deps.

That's it for the installation part of aaambos!

Read more in the Introduction part.