Composio SDK tools.

class IntegrationParams

No description provided

Properties

integration_id
str

Class property

auth_scheme
str

Class property

expected_params
t.List[ExpectedFieldInput]

Class property


class ProcessorsType

Request and response processors.

Properties

pre
te.NotRequired[t.Dict[_KeyType, _CallableType]]

Class property

post
te.NotRequired[t.Dict[_KeyType, _CallableType]]

Class property

schema
te.NotRequired[t.Dict[_KeyType, _CallableType]]

Class property


class VersionLock

Lock file representing action->version mapping

Methods

def update

No description provided

Parameters

versions
t.Optional[t.Dict[str, str]]

No description provided

override
bool, defaults to False

No description provided

def update(self, versions: t.Optional[t.Dict[str, str]] = None, override: bool = False, **_versions: str):
    """
    Record action versions in the in-memory version mapping.

    :param versions: Optional mapping of action name to version.
    :param override: When ``True`` already recorded actions are replaced;
        when ``False`` they are left untouched.
    :param _versions: Extra ``action=version`` pairs. On a name clash these
        win over the entries of ``versions``.
    """
    requested = dict(versions or {})
    requested.update(_versions)
    for name, pinned in requested.items():
        if override or name not in self._versions:
            self._versions[name] = pinned

def apply

No description provided

Parameters

actions
t.List[Action]Required

No description provided

Returns

returns
t.List[Action]

No description provided

def apply(self, actions: t.List[Action]) -> t.List[Action]:
    """
    Pass every action through ``self._apply`` and collect the results.

    :param actions: Actions to process.
    :return: A new list holding one result per input action, in input order.
    """
    return [self._apply(item) for item in actions]

def get

No description provided

Parameters

action
ActionTypeRequired

No description provided

Returns

returns
t.Optional[str]

No description provided

def get(self, action: ActionType) -> t.Optional[str]:
    """
    Look up the version recorded for an action.

    :param action: Action to look up; it is converted with ``str()`` to
        form the lookup key.
    :return: The recorded version, or ``None`` when nothing is recorded.
    """
    lookup_key = str(action)
    return self._versions.get(lookup_key)

def lock

No description provided

def lock(self) -> None:
    """
    Finalize the lock by delegating to the private ``__store`` helper.

    NOTE(review): ``__store`` is not visible here; presumably it persists
    the action->version mapping to the lock file -- confirm.
    """
    self.__store()

class ProcessorHelper

No description provided

Properties

__processors
ProcessorsType

Class property

__metadata
MetadataType

Class property

Methods

def merge_processors

No description provided

Parameters

processors
ProcessorsTypeRequired

No description provided

def merge_processors(self, processors: ProcessorsType) -> None:
    """
    Merge the given processors into the ones already registered.

    For every processor type present in ``processors`` the incoming mapping
    is merged into the registered mapping of the same type. On a key clash
    the incoming callable replaces the registered one.

    :param processors: Processors to merge, grouped by processor type
        (``pre``, ``post`` and ``schema``).
    """
    # Walk the known processor types rather than the keys that are already
    # registered: iterating over ``self.__processors`` made the "create the
    # missing bucket" branch unreachable, so processors of a type that had no
    # registered bucket yet were silently dropped.
    known_types = t.cast("t.Tuple[ProcessorType, ...]", ('pre', 'post', 'schema'))
    for ptype in known_types:
        if ptype not in processors:
            continue
        self.__processors.setdefault(ptype, {}).update(processors[ptype])

def process_request

No description provided

Parameters

action
ActionRequired

No description provided

request
t.DictRequired

No description provided

Returns

returns
t.Dict

No description provided

def process_request(self, action: Action, request: t.Dict) -> t.Dict:
    """
    Run a request through the ``pre`` processing steps for an action.

    Processing is keyed first by the action's app and then by the action
    itself; the output of the first step is the input of the second.

    :param action: Action the request belongs to.
    :param request: Request data to process.
    :return: The request after both processing steps.
    :raises ProcessorError: If either step returns a ``_Retry`` instance.
    """
    after_app = self._process(key=App(action.app), data=request, type_='pre')
    if isinstance(after_app, _Retry):
        raise ProcessorError('Received RETRY from App preprocessor function. Preprocessors cannot be retried.')

    after_action = self._process(key=action, data=after_app, type_='pre')
    if isinstance(after_action, _Retry):
        raise ProcessorError('Received RETRY from Action preprocessor function. Preprocessors cannot be retried.')

    return after_action

def process_respone

No description provided

Parameters

action
ActionRequired

No description provided

response
t.DictRequired

No description provided

Returns

returns
t.Union[t.Dict, _Retry]

No description provided

def process_respone(self, action: Action, response: t.Dict) -> t.Union[t.Dict, _Retry]:
    """
    Run a response through the ``post`` processing steps for an action.

    Processing is keyed first by the action's app and then by the action
    itself; the output of the first step is the input of the second.

    :param action: Action that produced the response.
    :param response: Response data to process.
    :return: The processed response, or ``RETRY`` as soon as one of the
        steps returns a ``_Retry`` instance.
    """
    current = response
    for key in (App(action.app), action):
        current = self._process(key=key, data=current, type_='post')
        if isinstance(current, _Retry):
            return RETRY
    return current

def process_schema_properties

No description provided

Parameters

action
ActionRequired

No description provided

properties
t.DictRequired

No description provided

Returns

returns
t.Dict

No description provided

def process_schema_properties(self, action: Action, properties: t.Dict) -> t.Dict:
    """
    Run schema properties through the ``schema`` processing steps for an action.

    Processing is keyed first by the action's app and then by the action
    itself; the output of the first step is the input of the second.

    :param action: Action whose schema is being processed.
    :param properties: Schema properties to process.
    :return: The properties after both processing steps.
    :raises ProcessorError: If either step returns a ``_Retry`` instance.
    """
    processed = self._process(key=App(action.app), data=properties, type_='schema')
    if isinstance(processed, _Retry):
        # Message fixed: "pprocessors" typo.
        raise ProcessorError('Received RETRY from App schema processor function. Schema processors cannot be retried.')
    processed = self._process(key=action, data=processed, type_='schema')
    if isinstance(processed, _Retry):
        # Message fixed: this branch handles a schema processor, not a preprocessor.
        raise ProcessorError('Received RETRY from Action schema processor function. Schema processors cannot be retried.')
    return processed

def add_metadata

No description provided

Parameters

action
ActionRequired

No description provided

metadata
t.Optional[t.Dict]

No description provided

Returns

returns
t.Dict

No description provided

def add_metadata(self, action: Action, metadata: t.Optional[t.Dict]) -> t.Dict:
    """
    Combine caller metadata with the metadata stored for an action.

    Entries are layered in this order, later layers overriding earlier ones:
    the caller's ``metadata``, the metadata looked up for the action's app,
    then the metadata looked up for the action itself.

    :param action: Action to collect metadata for.
    :param metadata: Caller supplied metadata; ``None`` is treated as empty.
    :return: A new dictionary with the combined metadata.
    """
    # Work on a copy so the caller's dictionary is never modified in place.
    merged = dict(metadata or {})
    merged.update(self._get_metadata(key=App(action.app)))
    merged.update(self._get_metadata(key=action))
    return merged

class FileIOHelper

No description provided

Methods

def write_output

Write output to a file.

Parameters

action
ActionRequired

No description provided

output
t.DictRequired

No description provided

entity_id
str, defaults to DEFAULT_ENTITY_ID

No description provided

Returns

returns
t.Dict

No description provided

def write_output(self, action: Action, output: t.Dict, entity_id: str = DEFAULT_ENTITY_ID) -> t.Dict:
    """
    Write output to a file.

    The file lives under ``self.outdir`` and is named after the SHA-256 hex
    digest of the action name, the entity id and the current time.

    :param action: Action that produced the output.
    :param output: Data to store; it is serialized with ``json.dumps``.
    :param entity_id: Entity id mixed into the file name.
    :return: A dictionary with a human readable ``message`` and the
        resolved ``file`` path.
    """
    seed = f'{action.name}-{entity_id}-{time.time()}'
    digest = hashlib.sha256(seed.encode()).hexdigest()
    target = self.outdir / digest
    self.logger.debug(f'Writing output to: {target}')
    self._write_file(target, json.dumps(output))
    resolved = target.resolve()
    return {'message': f'output written to {resolved}', 'file': str(resolved)}

def write_downloadable

No description provided

Parameters

prefix
strRequired

No description provided

response_model
SuccessExecuteActionResponseModelRequired

No description provided

Returns

returns
dict

No description provided

def write_downloadable(self, prefix: str, response_model: SuccessExecuteActionResponseModel) -> dict:
    """
    Store file-like entries of a response on disk and reference them by path.

    Every value in ``response_model.data`` is validated as a ``FileType``.
    A value that validates is decoded with URL-safe base64, written under
    ``self.outdir`` and replaced by the local path. A value whose content
    cannot be decoded is replaced by an error text. Any other failure
    leaves the value unchanged.

    :param prefix: Prefix for the local file name.
    :param response_model: Response whose ``data`` is rewritten in place.
    :return: ``response_model.model_dump()`` after the rewrite.
    """
    prior_error = response_model.error
    payload = response_model.data
    undecodable = False
    for field, value in payload.items():
        try:
            parsed = FileType.model_validate(value)
            # Slashes are replaced so the name stays a single path component.
            safe_name = parsed.name.replace('/', '_')
            destination = self.outdir / f"{prefix}_{safe_name}"
            self._write_file(path=destination, content=base64.urlsafe_b64decode(parsed.content))
            payload[field] = str(destination)
        except binascii.Error:
            undecodable = True
            payload[field] = 'Invalid File! Unable to decode.'
        except Exception:
            # Best effort: anything else (e.g. the value is not a file) keeps
            # the original value.
            pass
    # Only report a failure when the response did not already carry an error.
    if undecodable and prior_error is None:
        response_model.error = 'Execution failed'
    response_model.data = payload
    return response_model.model_dump()

class SchemaHelper

No description provided

Methods

def client

No description provided

Returns

returns
Composio

No description provided

@property
def client(self) -> Composio:
    """Composio client obtained by calling the stored ``self._client`` callable."""
    factory = self._client
    return factory()

def get_runtime_action_schemas

No description provided

Parameters

actions
t.List[Action]Required

No description provided

Returns

returns
t.List[ActionModel]

No description provided

def get_runtime_action_schemas(self, actions: t.List[Action]) -> t.List[ActionModel]:
    """
    Build schema models for the runtime actions among ``actions``.

    Non-runtime actions are skipped. Each runtime action is looked up in
    ``action_registry['runtime']`` by slug, and a copy of every resulting
    model is stored in ``self._schema_cache`` under the model name.

    :param actions: Actions to inspect.
    :return: One ``ActionModel`` per runtime action, in input order.
    """
    models: list[ActionModel] = []
    for candidate in (a for a in actions if a.is_runtime):
        registered = action_registry['runtime'][candidate.slug]
        raw_schema = registered.schema()
        raw_schema['name'] = registered.enum
        model = ActionModel(**raw_schema, version=VERSION_LATEST, available_versions=[VERSION_LATEST])
        models.append(model.model_copy(deep=True))
    self._schema_cache.update({model.name: model.model_copy() for model in models})
    return models

def get_local_action_schemas

No description provided

Parameters

apps
t.List[App]Required

No description provided

actions
t.List[Action]Required

No description provided

tags
t.Optional[t.Sequence[TagType]]

No description provided

Returns

returns
t.List[ActionModel]

No description provided

def get_local_action_schemas(self, apps: t.List[App], actions: t.List[Action], tags: t.Optional[t.Sequence[TagType]] = None) -> t.List[ActionModel]:
    """
    Fetch schema models for local apps and local, non-runtime actions.

    A copy of every resulting model is stored in ``self._schema_cache``
    under the model name.

    :param apps: Apps to consider; only local ones are used.
    :param actions: Actions to consider; only local non-runtime ones are used.
    :param tags: Optional tags forwarded to the local client.
    :return: The schema models, or an empty list when nothing local was
        requested.
    """
    local_actions = [a for a in actions if a.is_local and not a.is_runtime]
    local_apps = [a for a in apps if a.is_local]
    if not local_actions and not local_apps:
        return []
    raw_schemas = self._local_client.get_action_schemas(apps=local_apps, actions=local_actions, tags=tags)
    models = [
        ActionModel(**raw, version=VERSION_LATEST, available_versions=[VERSION_LATEST])
        for raw in raw_schemas
    ]
    self._schema_cache.update({model.name: model.model_copy() for model in models})
    return models

def get_remote_actions_schemas

No description provided

Parameters

apps
t.List[App]Required

No description provided

actions
t.List[Action]Required

No description provided

tags
t.Optional[t.Sequence[TagType]]

No description provided

check_connected_account
t.Optional[t.Callable]

No description provided

Returns

returns
t.List[ActionModel]

No description provided

def get_remote_actions_schemas(self, apps: t.List[App], actions: t.List[Action], tags: t.Optional[t.Sequence[TagType]] = None, check_connected_account: t.Optional[t.Callable] = None) -> t.List[ActionModel]:
    """
    Fetch schema models for non-local apps and actions through the client.

    Actions with a version set are fetched one at a time; the remaining
    actions and the apps are fetched in a single call. A copy of every
    resulting model is stored in ``self._schema_cache`` under the model name.

    :param apps: Apps to consider; local ones are ignored.
    :param actions: Actions to consider; local ones are ignored.
    :param tags: Optional tags forwarded to the bulk fetch.
    :param check_connected_account: Optional callable invoked as
        ``check_connected_account(action=<name>)`` for every fetched model.
        When omitted a ``UserWarning`` is emitted instead.
    :return: The schema models, or an empty list when nothing remote was
        requested.
    """
    remote_actions = [a for a in actions if not a.is_local]
    remote_apps = [a for a in apps if not a.is_local]
    if not remote_actions and not remote_apps:
        return []

    pinned: list = []
    unpinned: list = []
    for candidate in remote_actions:
        (pinned if candidate.is_version_set else unpinned).append(candidate)

    schemas = [self.client.actions.get(a) for a in pinned]
    if unpinned or remote_apps:
        schemas += self.client.actions.get(apps=remote_apps, actions=unpinned, tags=tags)
    self._schema_cache.update({schema.name: schema.model_copy() for schema in schemas})

    if check_connected_account is not None:
        for schema in schemas:
            check_connected_account(action=schema.name)
        return schemas

    warnings.warn('Not verifying connected accounts for apps. Actions may fail when the Agent tries to use them.', UserWarning)
    return schemas

def process_schema

No description provided

Parameters

action
ActionModelRequired

No description provided

processor_helper
ProcessorHelperRequired

No description provided

description_char_limit
intRequired

No description provided

action_name_char_limit
t.Optional[int]

No description provided

Returns

returns
ActionModel

No description provided

def process_schema(self, action: ActionModel, processor_helper: ProcessorHelper, description_char_limit: int, action_name_char_limit: t.Optional[int] = None) -> ActionModel:
    """
    Post-process an action schema model in place and return it.

    Steps, in order: the parameter and response schemas are passed through
    ``self._process_schema_recursively`` and re-validated, the description
    is cut to ``description_char_limit``, the parameter properties are run
    through the schema processors, the name is cut to
    ``action_name_char_limit`` (when given), and the three Anthropic action
    slugs are renamed to ``bash``, ``computer`` and ``str_replace_editor``.

    :param action: Schema model to process; it is modified in place.
    :param processor_helper: Helper used to process the parameter properties.
    :param description_char_limit: Maximum length of the description.
    :param action_name_char_limit: Optional maximum length of the name.
    :return: The same ``action`` object.
    """
    parameters_dump = action.parameters.model_dump()
    action.parameters = action.parameters.model_validate(
        obj=self._process_schema_recursively(schema=parameters_dump)
    )
    response_dump = action.response.model_dump()
    action.response = action.response.model_validate(
        obj=self._process_schema_recursively(schema=response_dump)
    )

    if action.description is not None:
        action.description = action.description[:description_char_limit]

    # Processors are looked up with the full, upper-cased name, so this has
    # to happen before the name is truncated below.
    action.parameters.properties = processor_helper.process_schema_properties(
        action=Action(action.name.upper()),
        properties=action.parameters.properties,
    )

    if action_name_char_limit is not None:
        action.name = action.name[:action_name_char_limit]

    aliases = (
        (Action.ANTHROPIC_BASH_COMMAND.slug, 'bash'),
        (Action.ANTHROPIC_COMPUTER.slug, 'computer'),
        (Action.ANTHROPIC_TEXT_EDITOR.slug, 'str_replace_editor'),
    )
    for slug, alias in aliases:
        if action.name == slug:
            action.name = alias
    return action

def substitute_file_uploads

No description provided

Parameters

action
ActionRequired

No description provided

request
t.DictRequired

No description provided

Returns

returns
t.Dict

No description provided

def substitute_file_uploads(self, action: Action, request: t.Dict) -> t.Dict:
    """
    Substitute file uploads in a request, guided by the action's parameter schema.

    :param action: Action the request is meant for.
    :param request: Request data to process.
    :return: Whatever ``self._substitute_file_uploads_recursively`` returns
        for the dumped parameter schema and the request.
    """
    action_schema = self._get_single_action_schema(action=action)
    parameters_schema = action_schema.parameters.model_dump()
    return self._substitute_file_uploads_recursively(
        schema=parameters_schema,
        request=request,
        action=action,
    )

def substitute_file_downloads

No description provided

Parameters

action
ActionRequired

No description provided

request
t.DictRequired

No description provided

file_helper
FileIOHelperRequired

No description provided

Returns

returns
t.Dict

No description provided

def substitute_file_downloads(self, action: Action, request: t.Dict, file_helper: FileIOHelper) -> t.Dict:
    """
    Substitute file downloads in a payload, guided by the action's response schema.

    :param action: Action the payload belongs to.
    :param request: Data to process.
    :param file_helper: File helper forwarded to the recursive substitution.
    :return: Whatever ``self._substitute_file_downloads_recursively``
        returns for the dumped response schema and the payload.
    """
    action_schema = self._get_single_action_schema(action=action)
    response_schema = action_schema.response.model_dump()
    return self._substitute_file_downloads_recursively(
        schema=response_schema,
        request=request,
        action=action,
        file_helper=file_helper,
    )

class CustomAuthHelper

No description provided

Methods

def add

No description provided

Parameters

app
AppTypeRequired

No description provided

parameters
t.List[CustomAuthParameter]Required

No description provided

base_url
t.Optional[str]

No description provided

body
t.Optional[t.Dict]

No description provided

def add(self, app: AppType, parameters: t.List[CustomAuthParameter], base_url: t.Optional[str] = None, body: t.Optional[t.Dict] = None) -> None:
    """
    Register custom auth for an app, replacing any earlier registration.

    :param app: App the custom auth applies to.
    :param parameters: Custom auth parameters.
    :param base_url: Optional base URL stored with the auth object.
    :param body: Optional body stored with the auth object; a falsy value
        is stored as an empty dictionary.
    """
    auth = CustomAuthObject(body=body or {}, base_url=base_url, parameters=parameters)
    self._custom_auth[App(app)] = auth

def get_custom_params_for_local_execution

No description provided

Parameters

action
ActionRequired

No description provided

Returns

returns
t.Dict

No description provided

def get_custom_params_for_local_execution(self, action: Action) -> t.Dict:
    """
    Return the custom auth parameters to use when running an action locally.

    :param action: Action about to be executed.
    :return: An empty dictionary when no custom auth is registered for the
        action's app; otherwise the parameters built by the runtime-action
        or local-action helper, depending on ``action.is_runtime``.
    """
    registered = self._custom_auth.get(App(action.app))
    if registered is None:
        return {}
    if not action.is_runtime:
        return self._get_custom_params_for_local_action(custom_auth=registered, app=action.app)
    return self._get_custom_params_for_runtime_action(custom_auth=registered)

def get_custom_params_for_remote_execution

No description provided

Parameters

action
ActionRequired

No description provided

Returns

returns
t.Optional[CustomAuthObject]

No description provided

def get_custom_params_for_remote_execution(self, action: Action) -> t.Optional[CustomAuthObject]:
    """
    Return the custom auth object registered for the action's app.

    :param action: Action about to be executed.
    :return: The registered ``CustomAuthObject`` or ``None`` when the app
        has no custom auth.
    """
    app_key = App(action.app)
    return self._custom_auth.get(app_key)

def has_custom_auth

No description provided

Parameters

app
AppRequired

No description provided

Returns

returns
bool

No description provided

def has_custom_auth(self, app: App) -> bool:
    """
    Tell whether custom auth is registered for an app.

    :param app: App to check.
    :return: ``True`` when ``app`` is a key of the custom auth registry.
    """
    registry = self._custom_auth
    return app in registry

class ComposioToolSet

Composio toolset.

Properties

_connected_accounts
t.Optional[t.List[ConnectedAccountModel]]

Class property

_remote_client
t.Optional[Composio]

Class property

_workspace
t.Optional[Workspace]

Class property

_runtime
str

Class property

_description_char_limit
int

Class property

_action_name_char_limit
t.Optional[int]

Class property

_log_ingester_client
t.Optional[LogIngester]

Class property

Methods

def api_key

No description provided

Returns

returns
str

No description provided

@property
def api_key(self) -> str:
    """
    API key configured for this toolset.

    :raises ApiKeyNotProvidedError: If no API key is set.
    """
    key = self._api_key
    if key is not None:
        return key
    raise ApiKeyNotProvidedError

def client

No description provided

Returns

returns
Composio

No description provided

@property
def client(self) -> Composio:
    """Composio client for this toolset, as returned by ``self._init_client()``."""
    composio_client = self._init_client()
    return composio_client

def workspace

Workspace for this toolset instance.

Returns

returns
Workspace

No description provided

@property
def workspace(self) -> Workspace:
    """
    Workspace for this toolset instance.

    The workspace is created on first access and cached in
    ``self._workspace``. When a workspace id is set, the workspace is
    fetched by id. Otherwise a new one is created from the configured
    workspace config (``HostWorkspaceConfig()`` by default), after filling
    in a missing API key, base URL and GitHub access token.
    """
    cached = self._workspace
    if cached is not None:
        return cached

    if self._workspace_id is not None:
        self._workspace = WorkspaceFactory.get(id=self._workspace_id)
        return self._workspace

    config = self._workspace_config or HostWorkspaceConfig()
    if config.composio_api_key is None:
        try:
            config.composio_api_key = self.api_key
        except ApiKeyNotProvidedError:
            # A missing key is tolerated here; the caller is only warned.
            warnings.warn('Running without a Composio API key', UserWarning, stacklevel=2)
    if config.composio_base_url is None:
        config.composio_base_url = self._base_url

    # The token lookup is only attempted when an API key is available.
    needs_github_token = config.github_access_token is None and config.composio_api_key is not None
    if needs_github_token:
        config.github_access_token = self._try_get_github_access_token_for_current_entity()

    self._workspace = WorkspaceFactory.new(config=config)
    return self._workspace

def check_connected_account

Check if connected account is required and if required it exists or not.

Parameters

action
ActionTypeRequired

No description provided

entity_id
t.Optional[str]

No description provided

def check_connected_account(self, action: ActionType, entity_id: t.Optional[str] = None) -> None:
    """
    Check if connected account is required and if required it exists or not.

    No check is made for actions that need no auth, for runtime actions, or
    for apps with custom auth registered. The list of connected accounts is
    fetched from the client once and cached in ``self._connected_accounts``.

    :param action: Action to check.
    :param entity_id: When given, only connections whose
        ``clientUniqueUserId`` equals this id are considered.
    :raises ConnectedAccountNotFoundError: If no matching connection exists
        for the action's app.
    """
    resolved = Action(action)
    if resolved.no_auth or resolved.is_runtime:
        return
    if self._custom_auth_helper.has_custom_auth(App(resolved.app)):
        return

    if self._connected_accounts is None:
        self._connected_accounts = t.cast(t.List[ConnectedAccountModel], self.client.connected_accounts.get())

    connected_apps = {
        connection.appUniqueId.upper()
        for connection in self._connected_accounts
        if entity_id is None or connection.clientUniqueUserId == entity_id
    }
    if resolved.app not in connected_apps:
        raise ConnectedAccountNotFoundError(f'No connected account found for app `{resolved.app}`; Run `composio add {resolved.app.lower()}` to fix this')

def set_workspace_id

No description provided

Parameters

workspace_id
strRequired

No description provided

def set_workspace_id(self, workspace_id: str) -> None:
    """
    Set the workspace id for this toolset.

    When a workspace object is already cached it is replaced by the one
    fetched for the new id; otherwise only the id is stored.

    :param workspace_id: Id of the workspace to use.
    """
    self._workspace_id = workspace_id
    if self._workspace is None:
        return
    self._workspace = WorkspaceFactory.get(id=workspace_id)

def execute_action

Execute an action on a given entity.

Parameters

action
ActionTypeRequired

Action to execute

params
dictRequired

The parameters to pass to the action

metadata
t.Optional[t.Dict]

Metadata for executing local action

entity_id
t.Optional[str]

The ID of the entity to execute the action on. Defaults to “default”

connected_account_id
t.Optional[str]

Connection ID for executing the remote action

text
t.Optional[str]

Extra text to use for generating function calling metadata

Returns

returns
t.Dict

Output object from the function call

@_record_action_if_available
def execute_action(self, action: ActionType, params: dict, metadata: t.Optional[t.Dict] = None, entity_id: t.Optional[str] = None, connected_account_id: t.Optional[str] = None, text: t.Optional[str] = None, *, processors: t.Optional[ProcessorsType] = None, _check_requested_actions: bool = False) -> t.Dict:
    """
    Execute an action on a given entity.

    :param action: Action to execute
    :param params: The parameters to pass to the action
    :param metadata: Metadata for executing local action
    :param entity_id: The ID of the entity to execute the action on. Defaults to "default"
    :param connected_account_id: Connection ID for executing the remote action
    :param text: Extra text to use for generating function calling metadata
    :param processors: Extra processors merged into the registered ones
        before the action runs
    :param _check_requested_actions: When ``True``, reject actions whose
        slug is not in the list of requested actions
    :return: Output object from the function call
    :raises InvalidParams: If ``_check_requested_actions`` is set and the
        action was never requested
    """
    action = Action(action)
    if self._version_lock is not None:
        (action,) = self._version_lock.apply(actions=[action])

    if _check_requested_actions and action.slug not in self._requested_actions:
        raise InvalidParams(f'Action {action.slug} is being called, but was never requested by the toolset. Make sure that the actions you are trying to execute are requested in your `get_tools()` call.')

    params = self._serialize_execute_params(param=params)
    if processors is not None:
        self._processor_helpers.merge_processors(processors)

    params = self._schema_helper.substitute_file_uploads(action=action, request=params)
    # Request processors are skipped for runtime actions.
    if not action.is_runtime:
        params = self._processor_helpers.process_request(action=action, request=params)
    metadata = self._processor_helpers.add_metadata(action=action, metadata=metadata)

    # Fall back to the connection id configured for the action's app.
    if not connected_account_id:
        connected_account_id = self._connected_account_ids.get(App(Action(action).app))

    self.logger.debug(f'Executing `{action.slug}@{action.version}` with params={params!r} and metadata={metadata!r} connected_account_id={connected_account_id!r}')

    failed_responses = []
    for _ in range(self.max_retries):
        if action.is_local:
            response = self._execute_local(action=action, params=params, metadata=metadata, entity_id=entity_id)
        else:
            response = self._execute_remote(action=action, params=params, entity_id=entity_id or self.entity_id, connected_account_id=connected_account_id, text=text, session_id=self.session_id, allow_tracing=self.allow_tracing)

        # Response processors are skipped for runtime actions.
        if action.is_runtime:
            processed_response = response
        else:
            processed_response = self._processor_helpers.process_respone(action=action, response=response)

        if not isinstance(processed_response, _Retry):
            response = processed_response
            self.logger.debug(f'Got response={response!r} from action={action!r} with params={params!r}')
            return response

        # A processor asked for a retry: keep the raw response and try again.
        self.logger.debug(f'Got processed_response={processed_response!r} from action={action!r} with params={params!r}, retrying...')
        failed_responses.append(response)

    # Every attempt ended in a retry request.
    return SuccessExecuteActionResponseModel(successfull=False, data={'failed_responses': failed_responses}, error=f'Execution failed after {self.max_retries} retries.').model_dump()

def execute_request

No description provided

Parameters

endpoint
strRequired

No description provided

method
strRequired

No description provided

Returns

returns
t.Dict

No description provided

@t.overload
def execute_request(self, endpoint: str, method: str, *, body: t.Optional[t.Dict] = None, parameters: t.Optional[t.List[CustomAuthParameter]] = None, connection_id: t.Optional[str] = None) -> t.Dict:
    """Typing overload: the connection is identified by ``connection_id``."""
    ...

def execute_request

No description provided

Parameters

endpoint
strRequired

No description provided

method
strRequired

No description provided

Returns

returns
t.Dict

No description provided

@t.overload
def execute_request(self, endpoint: str, method: str, *, body: t.Optional[t.Dict] = None, parameters: t.Optional[t.List[CustomAuthParameter]] = None, app: t.Optional[AppType] = None) -> t.Dict:
    """Typing overload: the connection is looked up through ``app``."""
    ...

def execute_request

Execute a proxy request to a connected account.

Parameters

endpoint
strRequired

API endpoint to call

method
strRequired

HTTP method to use (GET, POST, etc.)

Returns

returns
t.Dict

Response from the proxy request

def execute_request(self, endpoint: str, method: str, *, body: t.Optional[t.Dict] = None, parameters: t.Optional[t.List[CustomAuthParameter]] = None, connection_id: t.Optional[str] = None, app: t.Optional[AppType] = None) -> t.Dict:
    """
    Execute a proxy request to a connected account.

    :param endpoint: API endpoint to call
    :param method: HTTP method to use (GET, POST, etc.)
    :param body: Request body data
    :param parameters: Additional auth parameters
    :param connection_id: ID of the connected account; takes precedence
        over ``app`` when both are given
    :param app: App type to use for connection lookup

    :returns: Response from the proxy request
    :raises: InvalidParams: If neither connection_id nor app is provided
    """
    # Resolve the connection through the current entity only when the
    # caller did not name one explicitly.
    if connection_id is None and app is not None:
        entity = self.get_entity(id=self.entity_id)
        connection_id = entity.get_connection(app=app).id

    if connection_id is None:
        raise InvalidParams('Please provide connection id or app name to execute a request')

    self.logger.debug(f'Executing request to {endpoint} with method={method}, connection_id={connection_id}')
    response = self.client.actions.request(
        connection_id=connection_id,
        body=body,
        method=method,
        endpoint=endpoint,
        parameters=parameters,
    )
    self.logger.debug(f'Got response={response!r}')
    return response

def validate_tools

No description provided

Parameters

apps
t.Optional[t.Sequence[AppType]]

No description provided

actions
t.Optional[t.Sequence[ActionType]]

No description provided

tags
t.Optional[t.Sequence[TagType]]

No description provided

def validate_tools(self, apps: t.Optional[t.Sequence[AppType]] = None, actions: t.Optional[t.Sequence[ActionType]] = None, tags: t.Optional[t.Sequence[TagType]] = None) -> None:
    """
    Ask the workspace to check for missing dependencies of the given tools.

    Nothing happens when ``apps``, ``actions`` and ``tags`` are all empty
    or ``None``.

    :param apps: Apps to validate.
    :param actions: Actions to validate.
    :param tags: Tags to validate.
    """
    if apps or actions or tags:
        self.workspace.check_for_missing_dependencies(apps=apps, actions=actions, tags=tags)

def get_action_schemas

No description provided

Parameters

apps
t.Optional[t.Sequence[AppType]]

No description provided

actions
t.Optional[t.Sequence[ActionType]]

No description provided

tags
t.Optional[t.Sequence[TagType]]

No description provided

Returns

returns
t.List[ActionModel]

No description provided

def get_action_schemas(self, apps: t.Optional[t.Sequence[AppType]] = None, actions: t.Optional[t.Sequence[ActionType]] = None, tags: t.Optional[t.Sequence[TagType]] = None, *, check_connected_accounts: bool = True, _populate_requested: bool = False) -> t.List[ActionModel]:
    """
    Collect and post-process the schemas for the requested apps and actions.

    Runtime, local and remote schemas are gathered in that order and each
    one is passed through the schema helper's ``process_schema``.

    :param apps: Apps whose actions should be included.
    :param actions: Individual actions to include.
    :param tags: Optional tags forwarded to the local and remote lookups.
    :param check_connected_accounts: When ``True`` the remote lookup is
        given ``self.check_connected_account`` to verify connections.
    :param _populate_requested: When ``True`` the resulting schema names
        are appended to ``self._requested_actions``.
    :return: The processed schema models.
    """
    app_enums = t.cast(t.List[App], _map_enums(App, apps or []))
    action_enums = t.cast(t.List[Action], _map_enums(Action, actions or []))
    if self._version_lock is not None:
        action_enums = self._version_lock.apply(actions=action_enums)

    account_checker = self.check_connected_account if check_connected_accounts else None
    schemas: t.List[ActionModel] = []
    schemas.extend(self._schema_helper.get_runtime_action_schemas(actions=action_enums))
    schemas.extend(self._schema_helper.get_local_action_schemas(apps=app_enums, actions=action_enums, tags=tags))
    schemas.extend(self._schema_helper.get_remote_actions_schemas(apps=app_enums, actions=action_enums, tags=tags, check_connected_account=account_checker))

    schemas = [
        self._schema_helper.process_schema(
            action=schema,
            processor_helper=self._processor_helpers,
            description_char_limit=self._description_char_limit,
            action_name_char_limit=self._action_name_char_limit,
        )
        for schema in schemas
    ]

    if _populate_requested:
        self._requested_actions += [schema.name for schema in schemas]
    if self._version_lock is not None:
        self._version_lock.lock()
    return schemas

def create_trigger_listener

Create trigger subscription.

Parameters

timeout
float, defaults to 15.0

No description provided

Returns

returns
TriggerSubscription

No description provided

def create_trigger_listener(self, timeout: float = 15.0) -> TriggerSubscription:
    """
    Create trigger subscription.

    :param timeout: Timeout forwarded to ``self.client.triggers.subscribe``.
    :return: The subscription returned by the client.
    """
    subscription = self.client.triggers.subscribe(timeout=timeout)
    return subscription