files

get_md5

Compute the hexadecimal MD5 digest of a file, reading it in binary mode in chunks of `_DEFAULT_CHUNK_SIZE` bytes.

Parameters

file
Path (required)

No description provided

def get_md5(file: Path):
    """Return the hexadecimal MD5 digest of the contents of ``file``.

    The file is opened in binary mode and fed to the hash in pieces of
    ``_DEFAULT_CHUNK_SIZE`` bytes.

    Args:
        file: Path of the file to hash.

    Returns:
        The digest as a hexadecimal string.
    """
    digest = hashlib.md5()
    with file.open("rb") as stream:
        # ``read`` returns b"" at end of file, which is the sentinel that
        # terminates the iterator.
        for block in iter(lambda: stream.read(_DEFAULT_CHUNK_SIZE), b""):
            digest.update(block)
    return digest.hexdigest()

upload

Upload a file to a URL with an HTTP PUT request. A response status of 200 or 403 is reported as success.

Parameters

url
str (required)

No description provided

file
Path (required)

No description provided

Returns

returns
bool

`True` if the response status code is 200 or 403, otherwise `False`.

def upload(url: str, file: Path) -> bool:
    """Send the contents of ``file`` to ``url`` with an HTTP PUT request.

    Args:
        url: Destination URL for the PUT request.
        file: Path of the file whose bytes form the request body.

    Returns:
        ``True`` when the response status code is 200 or 403, ``False`` for
        any other status code.
    """
    # NOTE(review): 403 is counted as success alongside 200 -- confirm that
    # this is intentional.
    accepted_statuses = (200, 403)
    with file.open("rb") as payload:
        response = requests.put(url=url, data=payload)
        return response.status_code in accepted_statuses

class FileUploadable

No description provided

Properties

name
str

Class property

mimetype
str

Class property

s3key
str

Class property

Methods

def from_path

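Create a `FileUploadable` from a local file path. The path must exist, be a regular file, and be readable. Upload metadata is requested through `client.actions.create_file_upload`, and the file is uploaded unless that metadata reports it already exists.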

Parameters

file
t.Union[str, Path] (required)

No description provided

client
Composio (required)

No description provided

action
str (required)

No description provided

app
str (required)

No description provided

Returns

returns
te.Self

No description provided

@classmethod
def from_path(cls, file: t.Union[str, Path], client: Composio, action: str, app: str) -> te.Self:
    """Build an instance from a local file, uploading the file if required.

    The path is validated, upload metadata is requested through
    ``client.actions.create_file_upload``, and the file is uploaded to the
    URL in that metadata unless the metadata reports it already exists.

    Args:
        file: Location of the file, as a string or ``Path``.
        client: Client whose ``actions.create_file_upload`` is called.
        action: Action name forwarded to ``create_file_upload``.
        app: App name forwarded to ``create_file_upload``.

    Returns:
        An instance carrying the file name, the guessed mimetype and the
        key from the upload metadata.

    Raises:
        SDKFileNotFoundError: If the path does not exist, is not a file, or
            is not readable.
        ErrorUploadingFile: If the metadata reports the file does not exist
            and ``upload`` returns a falsy value.
    """
    path = Path(file)

    # Validation order matters: each check yields its own error message.
    if not path.exists():
        raise SDKFileNotFoundError(f'File not found: {path}. Please provide a valid file path.')
    if not path.is_file():
        raise SDKFileNotFoundError(f'Not a file: {path}. Please provide a valid file path.')
    if not os.access(path, os.R_OK):
        raise SDKFileNotFoundError(f'File not readable: {path}. Please check the file permissions.')

    mimetype = mimetypes.guess(file=path)
    upload_meta = client.actions.create_file_upload(
        app=app,
        action=action,
        filename=path.name,
        mimetype=mimetype,
        md5=get_md5(file=path),
    )

    # Only upload when the metadata says the file is not already present.
    if not upload_meta.exists:
        uploaded = upload(url=upload_meta.url, file=path)
        if not uploaded:
            raise ErrorUploadingFile(f'Error uploading file: {path}')

    return cls(name=path.name, mimetype=mimetype, s3key=upload_meta.key)

class FileDownloadable

No description provided

Properties

name
str

Class property

mimetype
str

Class property

s3url
str

Class property

Methods

def download

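Download the file at `s3url` into `outdir` (created if missing) under the name `name`, streaming the response body in chunks of `chunk_size` bytes. Raises `ErrorDownloadingFile` if the response status code is not 200.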

Parameters

outdir
Path (required)

No description provided

chunk_size
int (defaults to _DEFAULT_CHUNK_SIZE)

No description provided

Returns

returns
Path

Path of the downloaded file (`outdir / name`).

def download(self, outdir: Path, chunk_size: int=_DEFAULT_CHUNK_SIZE) -> Path:
    """Download the file at ``self.s3url`` into ``outdir``.

    ``outdir`` is created (including parents) when it does not exist, and
    the response body is streamed to ``outdir / self.name`` in pieces of
    ``chunk_size`` bytes.

    Args:
        outdir: Directory that receives the downloaded file.
        chunk_size: Number of bytes requested per streamed chunk.

    Returns:
        Path of the written file.

    Raises:
        ErrorDownloadingFile: If the response status code is not 200.
    """
    outfile = outdir / self.name
    outdir.mkdir(exist_ok=True, parents=True)
    response = requests.get(url=self.s3url, stream=True)
    try:
        if response.status_code != 200:
            raise ErrorDownloadingFile(f'Error downloading file: {self.s3url}')
        with outfile.open('wb') as fd:
            for chunk in response.iter_content(chunk_size=chunk_size):
                fd.write(chunk)
    finally:
        # A streamed response holds its connection until the body is fully
        # consumed or the response is closed; close it on every path,
        # including the error raise and a failed write.
        response.close()
    return outfile