add session api endpoints

This commit is contained in:
rmanach 2024-12-17 16:41:46 +01:00
parent 3f4557e547
commit 6ce9a2cba5
5 changed files with 422 additions and 1 deletions

View File

@ -0,0 +1,5 @@
WHEREIS_API_BASE_URL=https://api-whereis.thegux.fr
WHEREIS_API_EMAIL=
WHEREIS_API_PASSWORD=
WHEREIS_CERT_VERIFY=true

72
main.py
View File

@ -1,7 +1,9 @@
import logging import logging
import sys import sys
import json
from src import VERSION from src import VERSION, Client, OrderField
from src.exceptions import WhereIsException, UnauthorizedException
stdout_handler = logging.StreamHandler(stream=sys.stdout) stdout_handler = logging.StreamHandler(stream=sys.stdout)
@ -12,5 +14,73 @@ logging.basicConfig(
) )
if __name__ == "__main__": if __name__ == "__main__":
"""
This a sample script to demonstrate how works WhereIs API client.
All you need is to provide some environments variables describe in the `.env.example`.
You can:
- Copy `.env.example` into `.env` and feed env variables or,
- Directly export those mandatories variables
Once its done, you can initialize the Client:
cli = Client.from_env()
and use all the available methods to interact with the WhereIs API.
Sessions:
- create_session
- get_sessions
- get_session
- update_session
- delete_session
"""
logging.info(f"WhereIs client v{VERSION}") logging.info(f"WhereIs client v{VERSION}")
# initialize the client
try:
cli = Client.from_env()
except WhereIsException as e:
logging.error(f"unable to initialize WhereIs API client: {e}")
except Exception as e:
logging.error("unexpected error occurred while initializing client", exc_info=True)
# retrieve all user/public sessions
sessions = cli.get_sessions()
print(json.dumps(sessions, indent=2))
# create and update a session (must have `Streamer` role)
try:
session = cli.create_session(name="RUN-01")
session = cli.update_session(session.get("id"), name="RUN-02", description="My second run")
except WhereIsException as e:
logging.error(f"error occurred while creating/updating a session, status code: {e.error_code}")
print(json.dumps(e.content, indent=2))
# retrieve all user/public sessions with ordering
sessions = cli.get_sessions(ordering=OrderField.AscDateStart)
print(json.dumps(sessions, indent=2))
# retrieve all user/public sessions with search
sessions = cli.get_sessions(search="run", ordering=OrderField.AscDateStart)
print(json.dumps(sessions, indent=2))
# retrieve session by id
session = cli.get_session(session.get("id"))
print(json.dumps(session, indent=2))
# update session users
try:
session = cli.update_session_users(session.get("id"), [])
except WhereIsException as e:
logging.error(f"error occurred while updating users session, status code: {e.error_code}")
print(json.dumps(e.content, indent=2))
# close a session
try:
session = cli.close_session("fqsfsdqf")
except WhereIsException as e:
logging.error(f"error occurred while updating users session, status code: {e.error_code}")
print(json.dumps(e.content, indent=2))
# delete a session
cli.delete_session(session.get("id"))

View File

@ -1 +1,5 @@
from .client import Client, OrderField
__all__ = ["Client", "OrderField"]
VERSION = "0.1.0" VERSION = "0.1.0"

321
src/client.py Normal file
View File

@ -0,0 +1,321 @@
import json
import logging
import os
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from urllib.parse import urljoin
from uuid import UUID
import urllib3
from dotenv import dotenv_values
from requests import Session
from .exceptions import UnauthorizedException, WhereIsException
API_DEFAULT_URL = "https://api-whereis.thegux.fr"
__all__ = ["Client", "OrderField"]
def refresh():
"""
Catch 401 status code (UnauthorizedException)
and refresh the access token and retry.
"""
def decorator(func):
def wrapper(*args, **kwargs):
if len(args) > 0 and isinstance(args[0], Client):
try:
return func(*args, **kwargs)
except UnauthorizedException:
logging.warning("refresh access token...")
args[0]._refresh()
return func(*args, **kwargs)
return wrapper
return decorator
class OrderField(Enum):
"""
Ordering query param available fields.
- AscDateStart: date start ascending order
- DesDateStart: date start descending order
- AscDateEnd: date end ascending order
- DesDateEnd: date end descending order
"""
AscDateStart = "date_start"
DesDateStart = "-date_start"
AscDateEnd = "date_end"
DesDateEnd = "-date_end"
@dataclass(slots=True)
class Client:
"""
WhereIs API Client main class.
Example:
--------
cli = Client.from_env()
sessions = cli.get_sessions()
"""
base_url: str
email: str
password: str
session: Session = field(init=False)
def _login(self) -> WhereIsException | None:
"""Get the access token and store it in the `Session` header"""
data = {
"email": self.email,
"password": self.password,
}
login_url = urljoin(self.base_url, "/auth/token/")
logging.info(f"login: {login_url}")
res = self.session.post(login_url, json=data)
if res.status_code >= 400:
raise WhereIsException(login_url, res)
access_token = res.json()["access"]
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
return None
def _refresh(self) -> WhereIsException | None:
"""Refresh the access token and store it in the `Session` header"""
refresh_url = urljoin(self.base_url, "/auth/refresh/")
logging.info(f"refresh: {refresh_url}")
res = self.session.post(refresh_url)
if res.status_code >= 400:
raise WhereIsException(refresh_url, res)
access_token = res.json()["access"]
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
return None
@classmethod
def from_env(cls) -> "Client":
"""
Initialize the client from env variables (.env & global) and
log in into the application.
If the login fails, and exception is raised.
"""
env_data = {
"WHEREIS_API_EMAIL": os.getenv("WHEREIS_API_EMAIL", ""),
"WHEREIS_API_PASSWORD": os.getenv("WHEREIS_API_PASSWORD", ""),
"WHEREIS_API_BASE_URL": os.getenv("WHEREIS_API_BASE_URL", API_DEFAULT_URL),
}
dotenv_data = dotenv_values()
env_data.update(dotenv_data) # type: ignore
cli = Client(
env_data.get("WHEREIS_API_BASE_URL", ""),
env_data.get("WHEREIS_API_EMAIL", ""),
env_data.get("WHEREIS_API_PASSWORD", ""),
)
cli.session = Session()
cli.session.headers.update({"content-type": "application/json"})
is_cert_verify = env_data.get("WHEREIS_CERT_VERIFY", "") != "false"
if not is_cert_verify:
urllib3.disable_warnings()
cli.session.verify = is_cert_verify
cli._login()
logging.info(f"client successfully initialized for session: {cli.email}")
return cli
@refresh()
def _get_sessions_page(self, url: str) -> dict[str, Any] | WhereIsException:
"""Get paginate sessions."""
res = self.session.get(url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
print(json.dumps(res.json(), indent=2))
if res.status_code >= 400:
raise WhereIsException(url, res)
return res.json()
def get_sessions(
self, search: str | None = None, ordering: OrderField | None = None
) -> list[dict[str, Any]] | WhereIsException:
"""
Get all user and public sessions.
If an error occurred duning the API call an exception is raised.
Params:
-------
search: str, search sessions over username and description
ordering: OrderField, ordering sessions by dates
"""
sessions_url = urljoin(self.base_url, "/sessions/")
has_query_param = False
if search is not None:
if not has_query_param:
sessions_url += "?"
has_query_param = True
sessions_url += f"search={search}"
if ordering is not None:
if not has_query_param:
sessions_url += "?"
else:
sessions_url += "&"
sessions_url += f"ordering={ordering.value}"
logging.info(f"get sessions: {sessions_url}")
lst_sessions = []
while sessions_url is not None:
data = self._get_sessions_page(sessions_url)
sessions_url = data.get("next")
lst_sessions.extend(data.get("results", []))
return lst_sessions
@refresh()
def get_session(self, id_: UUID) -> list[dict[str, Any]] | WhereIsException:
session_url = urljoin(self.base_url, f"/sessions/{id_}/")
logging.info(f"get session: {session_url}")
res = self.session.get(session_url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(session_url, res)
return res.json()
@refresh()
def create_session(
self, name: str, description: str | None = None, is_public: bool = False
) -> dict[str, Any] | WhereIsException:
sessions_url = urljoin(self.base_url, "/sessions/")
logging.info(f"create session: {sessions_url}")
data = {"name": name, "description": description, "is_public": is_public}
if name is None:
data.pop("name")
if description is None:
data.pop("description")
if is_public is None:
data.pop("is_public")
res = self.session.post(
sessions_url,
json=data,
)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(sessions_url, res)
return res.json()
@refresh()
def update_session(
self,
id_: UUID,
name: str | None = None,
description: str | None = None,
is_public: bool | None = None,
) -> dict[str, Any] | WhereIsException:
session_url = urljoin(self.base_url, f"/sessions/{id_}/")
logging.info(f"update session: {session_url}")
data = {"name": name, "description": description, "is_public": is_public}
if name is None:
data.pop("name")
if description is None:
data.pop("description")
if is_public is None:
data.pop("is_public")
res = self.session.patch(session_url, json=data)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(session_url, res)
return res.json()
@refresh()
def delete_session(self, id_: UUID) -> None | WhereIsException:
"""
Close and delete the session. Users can't be added anymore.
NOTE: The GPS positions associated to the session are not deleted !
"""
session_url = urljoin(self.base_url, f"/sessions/{id_}/")
logging.info(f"delete session: {session_url}")
res = self.session.delete(session_url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(session_url, res)
return None
@refresh()
def update_session_users(
self, id_: UUID, users: list[UUID]
) -> dict[str, Any] | WhereIsException:
"""
Update users sessions.
WARN: An empty users list parameter cleans all users.
"""
session_url = urljoin(self.base_url, f"/sessions/{id_}/users/")
logging.info(f"update session users: {session_url}")
res = self.session.post(session_url, json={"users": users})
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(session_url, res)
return res.json()
@refresh()
def close_session(self, id_: UUID) -> dict[str, Any] | WhereIsException:
session_url = urljoin(self.base_url, f"/sessions/{id_}/close/")
logging.info(f"close session: {session_url}")
res = self.session.post(session_url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(session_url, res)
return res.json()

21
src/exceptions.py Normal file
View File

@ -0,0 +1,21 @@
from requests import Response
class WhereIsException(Exception):
"""Handle all WhereIs API errors."""
def __init__(self, url: str, response: Response):
self.url = url
try:
self.content = response.json()
except Exception:
self.content = response.content.decode()
super().__init__(self.content)
self.error_code = response.status_code
def __str__(self):
return f"error calling: {self.url} - {self.error_code} - {self.content}"
class UnauthorizedException(Exception):
pass