
Communication

Modules communicate with each other through a communication service, which each module accesses as self.com. CommunicationServiceAttributes define the different abilities of a service. Different CommunicationServices can implement different exchange methods, such as a broker backend.

The CommunicationService instances are set at runtime, before module initialization. This makes it possible to replace the communication service via the config.

To integrate different services in one module, a service mediator can be placed in the module instead of an individual service. The mediator delegates the function calls to the responsible service.
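The delegation idea can be sketched in a few lines. This is only an illustration under assumptions: ServiceMediator, RecordingService, and the assign method are hypothetical names, and routing by topic is one possible way to decide which service is responsible, not necessarily how aaambos does it.

```python
import asyncio


class RecordingService:
    """Hypothetical stand-in service that only records what it is asked to send."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    async def send(self, topic, msg):
        self.sent.append((topic, msg))


class ServiceMediator:
    """Hypothetical mediator: forwards each call to the service responsible for a topic."""

    def __init__(self):
        self._by_topic = {}

    def assign(self, topic, service):
        # Assumption: responsibility is decided per topic.
        self._by_topic[topic] = service

    async def send(self, topic, msg):
        # Delegate to whichever service was assigned to this topic.
        await self._by_topic[topic].send(topic, msg)


async def mediator_demo():
    broker, local = RecordingService("broker"), RecordingService("local")
    com = ServiceMediator()
    com.assign("Counting", broker)
    com.assign("Debug", local)
    await com.send("Counting", 1)
    await com.send("Debug", "hello")
    return broker.sent, local.sent


mediator_result = asyncio.run(mediator_demo())
```

From the module's point of view, the mediator is used exactly like a single service, which is why it can take the place of self.com.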

Messages are atomic exchange units that can be sent and received via communication services. IUs are incremental units that can be updated (so not atomic), see ipaaca(r).

tip

Reminder: async functions need to be called with await or with utility functions from asyncio.
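A minimal sketch of what this means in plain asyncio, independent of aaambos. The ping coroutine is a made-up example; asyncio.run, asyncio.create_task, and asyncio.gather are standard library functions.

```python
import asyncio


async def ping(delay: float) -> str:
    await asyncio.sleep(delay)
    return "pong"


async def main() -> list:
    # Calling ping(...) without await only creates a coroutine object; nothing runs yet.
    first = await ping(0.01)
    # create_task schedules the coroutine so it runs concurrently with this function.
    task = asyncio.create_task(ping(0.01))
    # gather awaits several awaitables and returns their results in order.
    rest = await asyncio.gather(task, ping(0.01))
    return [first, *rest]


# asyncio.run starts an event loop; it is only needed outside of async code.
await_results = asyncio.run(main())
```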

info

The examples are from the PingPongModule and PingPongIUModule.

Units and Promises

A wrapper class is a dataclass-like class that defines the attributes of the payload. It is a utility class that provides an auto-generated init method, default values, and ideally type checking. At the moment, msgspec.Struct is used as the base class because of its advertised performance for serialization and type checking. Unlike attrs and dataclasses, it unfortunately does not provide type hints for the init method.

CommunicationUnits contain information about the content of the data sent over a topic, e.g., the wrapper class that defines the structure, a description, the default leaf topic, and the required service (attribute) categories. There should be one unit per topic.

CommunicationPromises (or just promises) encapsulate the information about the exchange that a module requests or provides on a specific topic: the reference to the unit, the frequency at which a message or IU is sent, expected to be received, or updated, the required communication service attributes, and possibly more in the future. Promises should be individual for every module, but they can be copied for convenience. In the future, a promise converter could convert promises to other promises so that different definitions can be used (e.g., FaceFeaturesComplex could be converted/shrunk down to FaceFeatureSimple). How this would work with the feature system still needs to be explored.

The topic of every promise is set at runtime in the settings part of the promise.
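The relationship between units, promises, and settings can be sketched with plain dataclasses. All classes here (SketchUnit, SketchSettings, SketchPromise, CountingPayload) are hypothetical stand-ins, not the aaambos classes; only the attribute names communication_unit, payload_wrapper, and settings.topic are taken from this page.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Optional


@dataclass
class SketchUnit:
    """Hypothetical unit: describes what travels over one topic."""
    name: str
    payload_wrapper: type
    description: str = ""


@dataclass
class SketchSettings:
    """Hypothetical settings part; the topic is filled in at runtime."""
    topic: Optional[Any] = None


@dataclass
class SketchPromise:
    """Hypothetical promise: one module's view of the exchange on a unit."""
    communication_unit: SketchUnit
    frequency: float = 0.0
    settings: SketchSettings = field(default_factory=SketchSettings)


@dataclass
class CountingPayload:
    value: int
    last_module: str


counting_unit = SketchUnit("Counting", CountingPayload, "Msg that contains increased value.")
base_promise = SketchPromise(counting_unit, frequency=5.0)

# Copy the promise for another module; fresh settings keep the copies independent.
module_promise = replace(base_promise, settings=SketchSettings())
module_promise.settings.topic = "agent_a/run_1/Counting"
```

The copy shares the unit (one unit per topic) while its settings, including the topic, remain individual to the module.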

info

There are utility methods to create units and promises (and communication features). The create command generates units and promises for a named exchange.

aaambos create com_promise -n MyNewMsg
Example
from msgspec import Struct
from aaambos.std.communications.utils import create_promise_and_unit

Counting = "Counting"
"""The name and leaf topic of the message, etc."""

WAIT_UNTIL_SEND = 0.2

class CountingMsg(Struct):
    """The Counting Wrapper, the count and the last module that increased the count"""
    value: int
    last_module: str

CountingPromise, CountingUnit = create_promise_and_unit(
    name=Counting,
    payload_wrapper=CountingMsg,
    unit_description="Msg that contains increased value.",
    frequency=1/WAIT_UNTIL_SEND,  # Wrong but currently not used anyway.
)
"""The promise of the counting."""

For how to create CommunicationFeatures for a promise, see features.

Topics

Topics reference the channels on which messages and IUs are published and subscribed. aaambos generates these topics during the architecture setup, which is useful for multi-agent runs and general controllability. It uses a HierarchicalTopic that can depend on the agent name and other useful subdivisions, for example agent_name/instance_name/leaf_topic. The LeafTopic depends on the promise and is often just the specific message name, e.g., Counting. The module, however, has to specify the whole topic object when sending messages/IUs and registering callbacks. It can do so via the topics stored in the module, self.tpc["MyLeafTopic"] (only set for the enabled features). In other places, you might see MyPromise.settings.topic, which is set during architecture setup.
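To illustrate the agent_name/instance_name/leaf_topic layout, here is a small sketch. SketchTopic and build_topics are hypothetical helpers, not the aaambos HierarchicalTopic, and the agent and instance names are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchTopic:
    """Hypothetical stand-in for a hierarchical topic; not the aaambos class."""
    agent_name: str
    instance_name: str
    leaf_topic: str

    def path(self) -> str:
        # Join the hierarchy levels into one channel name.
        return "/".join((self.agent_name, self.instance_name, self.leaf_topic))


def build_topics(agent_name, instance_name, leaf_topics):
    """Map each leaf topic to its full topic object, similar in spirit to self.tpc."""
    return {leaf: SketchTopic(agent_name, instance_name, leaf) for leaf in leaf_topics}


sketch_tpc = build_topics("agent_a", "run_1", ["Counting"])
counting_path = sketch_tpc["Counting"].path()
```

Two agents running the same module then get distinct channels for the same leaf topic, which is what makes multi-agent runs possible without changing module code.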

Communication Service

The base class of the communication service is nearly empty, because CommunicationServiceAttributes specify the different abilities of a service. The service base class therefore only has an initialize method and the init method.

A CommunicationServiceInfo contains the reference to the name, import path, and CommunicationSetup. Further, it contains a dict of the inherited CommunicationServiceAttributes and the extra attribute categories. These attribute categories encapsulate referenceable and reusable categories to which an attribute belongs. A category is just a string. (These categories have nothing to do with topics.)

tip

You can add better type hints for the communication service via the class attributes of a module:

com: MsgTopicSendingAttribute | MsgPayloadWrapperCallbackReceivingAttribute | CommunicationService

Attributes

The aaambos standard library defines some standard communication service attributes related to messages, IUs, and the usage of promises and wrappers (API docs). The IPAACARService implements all the following attributes.

MsgTopicSendingAttribute

Sending a message via the send method in any async method of the class:

async def send(self, topic: Topic, msg):
    ...

You should only send messages on topics that you specified as a CommunicationFeature and that is enabled.

Example
await self.com.send(self.tpc[Counting], CountingMsg(self.counter, self.name))

It uses the topic stored in the module.

MsgTopicCallbackReceivingAttribute

Receives messages by registering a callback. It also contains a utility method to register a callback via a promise (topic + wrapper).

In the current version, the callback is a method with two arguments: the topic and the message (in the wrapper class).

async def register_callback(self, topic: Topic, callback: Callable, *args, **kwargs):
    ...

async def unregister_callback(self, topic: Topic, callback: Callable):
    ...

async def register_callback_for_promise(self, promise: CommunicationPromise, callback: Callable):
    await self.register_callback(promise.settings.topic, callback, promise.communication_unit.payload_wrapper)
Example

The callback is registered in the initialize method.

if self.config.ft_info.com_features[CountingFeature.name]:
    self.log.info("Counting Feature enabled")
    await self.com.register_callback_for_promise(CountingPromise, self.handle_increased)

A callback method can look like the following:

async def handle_increased(self, topic: Topic, msg: CountingMsg):
    ...
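The register, send, and unregister flow can be tried out without a broker using a small in-process stand-in. InMemoryService and CountingMsgSketch are hypothetical and only mimic the method names listed above; plain strings stand in for Topic objects, and the wrapper check is an assumption about what a wrapper-aware service might do.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class CountingMsgSketch:
    """Stand-in for the CountingMsg wrapper, using a stdlib dataclass."""
    value: int
    last_module: str


class InMemoryService:
    """Hypothetical in-process service; not an aaambos class."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    async def register_callback(self, topic, callback, wrapper=None):
        self._callbacks[topic].append((callback, wrapper))

    async def unregister_callback(self, topic, callback):
        self._callbacks[topic] = [
            (cb, w) for cb, w in self._callbacks[topic] if cb != callback
        ]

    async def send(self, topic, msg):
        for callback, wrapper in list(self._callbacks[topic]):
            # Assumption: a wrapper-aware service rejects payloads of the wrong type.
            if wrapper is not None and not isinstance(msg, wrapper):
                raise TypeError(f"{topic}: expected {wrapper.__name__}")
            await callback(topic, msg)


async def callback_demo():
    received = []

    async def handle_increased(topic, msg):
        received.append((topic, msg.value, msg.last_module))

    com = InMemoryService()
    await com.register_callback("Counting", handle_increased, CountingMsgSketch)
    await com.send("Counting", CountingMsgSketch(1, "ping"))
    await com.unregister_callback("Counting", handle_increased)
    await com.send("Counting", CountingMsgSketch(2, "pong"))  # no receiver left
    return received


callback_received = asyncio.run(callback_demo())
```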

PayloadWrapperAttribute

Possibly unnecessary, but it may be useful in the future to check whether a payload fits a wrapper class.

def check_wrapping(self, payload, wrapper: Type[Any]) -> bool:
    ...
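What such a check could look like is sketched below as a free function. The rules are assumptions made for illustration (an instance of the wrapper fits, and so does a dict whose keys match the wrapper's fields exactly); the source does not specify the intended behaviour, and stdlib dataclasses are used here instead of msgspec.Struct.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Type


def check_wrapping_sketch(payload: Any, wrapper: Type[Any]) -> bool:
    """Hypothetical check: does the payload fit the wrapper class?"""
    if isinstance(payload, wrapper):
        return True
    if isinstance(payload, dict) and is_dataclass(wrapper):
        # Assumption: a dict fits if its keys are exactly the wrapper's field names.
        return set(payload) == {f.name for f in fields(wrapper)}
    return False


@dataclass
class CountingWrapperSketch:
    value: int
    last_module: str


fits_instance = check_wrapping_sketch(CountingWrapperSketch(1, "ping"), CountingWrapperSketch)
fits_dict = check_wrapping_sketch({"value": 1, "last_module": "ping"}, CountingWrapperSketch)
fits_partial = check_wrapping_sketch({"value": 1}, CountingWrapperSketch)
```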

MsgPayloadWrapperCallbackReceivingAttribute

Guarantees that a wrapper class is used for the callback argument. Inherits from MsgTopicCallbackReceivingAttribute.

async def register_callback(self, topic: Topic, callback: Callable, wrapper: Type[Any] = ..., *args, **kwargs):
    ...

IncrementalUnitTopicWrapperAttribute

Create and receive IUs with wrapper classes.

async def create_and_send_iu(self, topic: Topic, payload) -> 'IU':
    ...

async def register_callback_iu(self, topic: Topic, callback: Callable[[Topic, Any, 'IUEvents', 'IU'], None], wrapper: Type[Any] = ..., *args, **kwargs):
    ...

async def register_callback_iu_for_promise(self, promise: CommunicationPromise, callback: Callable):
    await self.register_callback_iu(promise.settings.topic, callback, promise.communication_unit.payload_wrapper)

async def set_payload_iu(self, iu, payload):
    ...
Example

Registering the callback is similar to the message callback registration.

await self.com.register_callback_iu_for_promise(CountingPromise, self.handle_increased)

An IU is created and sent once. It can then be updated.

await self.com.create_and_send_iu(self.tpc[Counting], CountingMsg(self.counter, self.name))

In ipaacaR, setting the payload already communicates the update:

await self.com.set_payload_iu(self.iu, CountingMsg(self.counter, self.name))

The callback method of an IU has different arguments:

async def handle_increased(self, topic: Topic, msg: CountingMsg, event: IUEvent, iu: 'IU'):
    ...
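The difference from atomic messages is that the same IU object is created once and then updated. The following in-process sketch mimics that lifecycle. SketchIU and InMemoryIUService are hypothetical, and the event names "ADDED" and "UPDATED" are placeholders chosen for this example, not the actual ipaacaR event values.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchIU:
    """Hypothetical incremental unit: a payload that can change after creation."""
    topic: str
    payload: Any


class InMemoryIUService:
    """Hypothetical in-process service; only mimics the method names above."""

    def __init__(self):
        self._callbacks = {}

    async def register_callback_iu(self, topic, callback):
        self._callbacks.setdefault(topic, []).append(callback)

    async def _notify(self, iu, event):
        for callback in self._callbacks.get(iu.topic, []):
            await callback(iu.topic, iu.payload, event, iu)

    async def create_and_send_iu(self, topic, payload):
        iu = SketchIU(topic, payload)
        await self._notify(iu, "ADDED")  # placeholder event name
        return iu

    async def set_payload_iu(self, iu, payload):
        iu.payload = payload
        await self._notify(iu, "UPDATED")  # placeholder event name


async def iu_demo():
    seen = []

    async def handle_increased(topic, msg, event, iu):
        seen.append((event, msg, iu))

    com = InMemoryIUService()
    await com.register_callback_iu("Counting", handle_increased)
    iu = await com.create_and_send_iu("Counting", 1)  # done once per IU
    await com.set_payload_iu(iu, 2)                   # later changes are updates
    return seen


iu_events = asyncio.run(iu_demo())
```

Both callback invocations receive the same IU object, first with the initial payload and then with the updated one.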

IncrementalUnitAccessAttribute

Covers access to all received and sent IUs.

async def get_all_ius(self) -> list['IU']:
    ...

async def get_iu_by_id(self, uid: str) -> 'IU':
    ...

RunTimeReceivingNewTopicsAttribute

If a service can add new input (receiving) topics during runtime (after setup of the service).

async def add_input_topic(self, topic: Topic, callback: Callable[[Topic, Any], Awaitable[None]] = ..., wrapper: Any = ...):
    ...

RunTimeSendingNewTopicsAttribute

If a service can send to new topics that are not mentioned during setup.

async def add_output_topic(self, topic: Topic):
    ...

Configure the Service

You can specify which communication service you want to use by referencing the CommunicationServiceInfo in your arch_config.yml:

arch_config.yml
communication:
  communication_prefs:
    - !name:ipaacar_com_service.communications.ipaacar_com.IPAACARInfo

This specifies only the communication preference, because modules, promises, and units could require other services based on their specified attributes and categories. Therefore, you specify the preferred service infos as a list ordered by priority, with the first element having the highest priority.
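One way to think about such a preference list is "take the first preferred service that offers everything that is required". The sketch below illustrates only that idea; pick_service, the service names, and the matching rule are assumptions, and the actual resolution in aaambos may work differently.

```python
def pick_service(preferences, required_attributes):
    """Return the first preferred service that offers all required attributes.

    preferences: list of (service_name, set_of_attribute_names), highest priority first.
    """
    for name, attributes in preferences:
        if required_attributes <= attributes:  # subset test
            return name
    return None


# Hypothetical services; the attribute names are taken from this page.
preferences = [
    ("MsgOnlyService", {"MsgTopicSendingAttribute"}),
    ("IUService", {"MsgTopicSendingAttribute", "IncrementalUnitTopicWrapperAttribute"}),
]

chosen_for_msgs = pick_service(preferences, {"MsgTopicSendingAttribute"})
chosen_for_ius = pick_service(preferences, {"IncrementalUnitTopicWrapperAttribute"})
```

Here the first preference wins for plain messages, while a promise that needs IUs falls through to the second entry.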

In the general.plus part of the run_config.yml, you can specify service-dependent configuration. For example, ipaacaR uses ipaacar_address (the broker location), which defaults to "localhost:1883".

run_config.yml
general:
  plus:
    ipaacar_address: "localhost:1883"

In the future, there might be a more general CommunicationServiceConfig.

Internals

A CommunicationService class has an associated CommunicationSetup class that handles the setup of each service instance.

CommunicationServiceInfo contains information about the implemented attributes, categories, and the import path. This class should be referenced in the arch_config.yml.