utils

filter_non_beta_items

No description provided

Parameters

items
t.Sequence[EnumModels]Required

No description provided

Returns

returns
t.List

No description provided

1def filter_non_beta_items(items: t.Sequence[EnumModels]) -> t.List:
2 filtered_items: t.List[EnumModels] = []
3 for item in items:
4 if not item.name.lower().endswith("beta"):
5 filtered_items.append(item)
6
7 seen: t.Set[str] = set()
8 unique_items: t.List[EnumModels] = []
9 for item in filtered_items:
10 if item.name not in seen:
11 unique_items.append(item)
12 seen.add(item.name)
13
14 return unique_items

update_apps

Update apps.

Parameters

client
ComposioRequired

No description provided

beta
boolDefaults to False

No description provided

Returns

returns
t.List[AppModel]

No description provided

References

1def update_apps(client: Composio, beta: bool = False) -> t.List[AppModel]:
2 """Update apps."""
3 apps = sorted(
4 client.apps.get(),
5 key=lambda x: x.key,
6 )
7 if not beta:
8 apps = filter_non_beta_items(apps)
9
10 _update_apps(apps=apps)
11 return apps

update_actions

Update actions and tags.

Parameters

client
ComposioRequired

No description provided

apps
t.List[AppModel]Required

No description provided

beta
boolDefaults to False

No description provided

References

1def update_actions(
2 client: Composio, apps: t.List[AppModel], beta: bool = False
3) -> None:
4 """Update actions and tags."""
5 actions = sorted(
6 client.actions.get(allow_all=True),
7 key=lambda x: f"{x.appName}_{x.name}",
8 )
9 if not beta:
10 actions = filter_non_beta_items(actions)
11
12 _update_tags(apps=apps, actions=actions)
13 _update_actions(apps=apps, actions=actions)

update_triggers

Update triggers.

Parameters

client
ComposioRequired

No description provided

apps
t.List[AppModel]Required

No description provided

beta
boolDefaults to False

No description provided

References

1def update_triggers(
2 client: Composio, apps: t.List[AppModel], beta: bool = False
3) -> None:
4 """Update triggers."""
5 triggers = sorted(
6 client.triggers.get(),
7 key=lambda x: f"{x.appKey}_{x.name}",
8 )
9 if not beta:
10 triggers = filter_non_beta_items(triggers)
11
12 _update_triggers(apps=apps, triggers=triggers)

check_cache_refresh

Check if the actions have a ‘replaced_by’ field and refresh the cache if not.

Parameters

client
ComposioRequired

No description provided

References

1def check_cache_refresh(client: Composio) -> None:
2 """
3 Check if the actions have a 'replaced_by' field and refresh the cache if not.
4 This is a workaround to invalidate local caches from older Composio versionos
5 that do not have the 'replaced_by' field.
6
7 Before this version, checking if an action is deprecated or not depended on the
8 SDK version, and didn't come from the API. We need to start storing the data
9 from the API and invalidate the cache if the data is not already stored.
10 """
11 if NO_CACHE_REFRESH:
12 return
13
14 if enums.base.ACTIONS_CACHE.exists():
15 first_file = next(enums.base.ACTIONS_CACHE.iterdir(), None)
16 if first_file is not None:
17 first_action = json.loads(first_file.read_text())
18 if "replaced_by" in first_action:
19 logger.debug("Actions cache is up-to-date")
20 return
21
22 logger.info("Actions cache is outdated, refreshing cache...")
23 apps = update_apps(client)
24 update_actions(client, apps)
25 update_triggers(client, apps)