"""
Client for managing existing POI data integration workflows
"""
import http
import json
from typing import Optional

try:
    from urllib.parse import urljoin
except ImportError:
    from urlparse import urljoin

import requests

from .exceptions import SlipoException
from .utils import json_response, file_response
# Version segment substituted for ``{api_version}`` in every endpoint template.
API_VERSION = 'v1'

# Endpoint path templates. ``ProcessClient`` expands the placeholders with
# ``str.format`` and joins the result onto its base URL.
# Note that only the query and status endpoints carry a trailing slash.
API_QUERY = '/api/{api_version}/process/'
API_STATUS = '/api/{api_version}/process/{id}/{version}/'
API_SAVE = '/api/{api_version}/process/{id}/save'
API_DOWNLOAD = '/api/{api_version}/process/{id}/{version}/file/{fileId}'
API_START = '/api/{api_version}/process/{id}/{version}/start'
API_STOP = '/api/{api_version}/process/{id}/{version}/stop'
class ProcessClient(object):
    """ProcessClient provides methods for managing existing POI data integration
    workflows.

    Details about the API responses are available at the `SLIPO`_ site.

    Every public method builds the endpoint URL, issues the HTTP request with
    ``requests`` and returns the raw response object; the ``json_response`` /
    ``file_response`` decorators (imported from ``.utils``) are expected to
    turn that response into the documented result.

    Args:
        base_url (str): Base URL for SLIPO API endpoints, e.g.
            ``https://app.dev.slipo.eu/``. This constructor applies no
            default; the value must always be supplied.
        api_key (str): SLIPO API key. An application key can be generated using
            the SLIPO Workbench application.

    Returns:
        A :py:class:`ProcessClient <slipo.process.ProcessClient>` object.

    .. _SLIPO:
        https://app.dev.slipo.eu/docs/webapp-api/index.html#api-Workflow

    """

    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key

        # Sent with the file download request, which carries no JSON body.
        self.auth_headers = {
            'X-API-Key': api_key,
        }

        # Sent with every other request issued by this client.
        self.content_headers = {
            'X-API-Key': api_key,
            'Content-type': 'application/json',
        }

    def _build_url(self, template, **params):
        """Expand an endpoint template and resolve it against ``base_url``.

        Args:
            template (str): One of the module-level ``API_*`` path templates.
            **params: Values for the template placeholders other than
                ``api_version``, which is always filled with ``API_VERSION``.

        Returns:
            str: The absolute URL of the endpoint.
        """
        endpoint = template.format(api_version=API_VERSION, **params)

        return urljoin(self.base_url, endpoint)

    @json_response
    def query(self, term: Optional[str] = None, pageIndex: int = 0, pageSize: int = 10) -> dict:
        """Query workflow instances.

        Args:
            term (str, optional): A term for filtering workflows. If specified,
                only the workflows whose name contains the term are returned.
            pageIndex (int, optional): Page index for data pagination.
            pageSize (int, optional): Page size for data pagination.

        Returns:
            A :obj:`dict` representing the parsed JSON response.

        Raises:
            SlipoException: If a network or server error has occurred.

        """
        url = self._build_url(API_QUERY)

        # When ``term`` is None the name filter is serialized as JSON ``null``.
        payload = {
            'pagingOptions': {
                'pageIndex': pageIndex,
                'pageSize': pageSize,
            },
            'query': {
                'name': term,
            },
        }

        return requests.post(
            url,
            headers=self.content_headers,
            data=json.dumps(payload)
        )

    @json_response
    def save(self, process_id: int) -> dict:
        """Creates a new version for the specified workflow. The most recent version of
        the workflow is copied.

        Args:
            process_id (int): The process id.

        Returns:
            A :obj:`dict` representing the parsed JSON response.

        Raises:
            SlipoException: If a network or server error has occurred.

        """
        url = self._build_url(API_SAVE, id=process_id)

        # The request body is an empty JSON object.
        return requests.post(
            url,
            headers=self.content_headers,
            data=json.dumps({})
        )

    @json_response
    def start(self, process_id: int, process_version: int) -> dict:
        """Start or resume the execution of a workflow instance.

        Args:
            process_id (int): The process id.
            process_version (int): The process revision.

        Returns:
            A :obj:`dict` representing the parsed JSON response.

        Raises:
            SlipoException: If a network or server error has occurred.

        """
        url = self._build_url(API_START, id=process_id, version=process_version)

        return requests.post(url, headers=self.content_headers)

    @json_response
    def stop(self, process_id: int, process_version: int) -> None:
        """Stop a running workflow execution instance.

        Args:
            process_id (int): The process id.
            process_version (int): The process revision.

        Raises:
            SlipoException: If a network or server error has occurred.

        """
        # NOTE(review): annotated ``-> None`` although it shares the
        # ``json_response`` decorator with ``start``; confirm what callers
        # actually receive before relying on the return value.
        url = self._build_url(API_STOP, id=process_id, version=process_version)

        return requests.post(url, headers=self.content_headers)

    @json_response
    def status(self, process_id: int, process_version: int) -> dict:
        """Check the status of a workflow execution instance.

        Args:
            process_id (int): The process id.
            process_version (int): The process revision.

        Returns:
            A :obj:`dict` representing the parsed JSON response.

        Raises:
            SlipoException: If a network or server error has occurred.

        """
        url = self._build_url(API_STATUS, id=process_id, version=process_version)

        return requests.get(url, headers=self.content_headers)

    @file_response('target')
    def download(self, process_id: int, process_version: int, file_id: int, target: str) -> None:
        """Download an input or output file for a specific workflow execution instance.

        During the execution of a workflow the following file types may be created:

        - ``CONFIGURATION``: Tool configuration
        - ``INPUT``: Input file
        - ``OUTPUT``: Output file
        - ``SAMPLE``: Sample data collected during step execution
        - ``KPI``: Tool specific or aggregated KPI data
        - ``QA``: Tool specific QA data
        - ``LOG``: Logs recorded during step execution

        Args:
            process_id (int): The process id.
            process_version (int): The process revision.
            file_id (int): The file id.
            target (str): The path where to save the file.

        Raises:
            SlipoException: If a network, server error or I/O error has occurred.

        """
        # ``target`` is not used here; its name is passed to the
        # ``file_response`` decorator, which presumably writes the response
        # to that path (decorator defined in ``.utils`` — verify there).
        url = self._build_url(
            API_DOWNLOAD,
            id=process_id,
            version=process_version,
            fileId=file_id,
        )

        return requests.get(url, headers=self.auth_headers)