add ws token api

This commit is contained in:
rmanach 2024-12-18 10:22:28 +01:00
parent 557a544d47
commit a99621c42b
3 changed files with 78 additions and 4 deletions

29
main.py
View File

@ -16,7 +16,7 @@ logging.basicConfig(
if __name__ == "__main__":
"""
This a sample script to demonstrate how works WhereIs API client.
This a sample script to demonstrate how to deal with WhereIs API client.
All you need is to provide some environments variables describe in the `.env.example`.
You can:
@ -28,6 +28,7 @@ if __name__ == "__main__":
cli = Client.from_env()
and use all the available methods to interact with the WhereIs API.
Sessions:
- create_session
- get_sessions
@ -38,6 +39,10 @@ if __name__ == "__main__":
- close_session
- watch_session_events
- stop_watch_session
WS token:
- get_wstokens
- create_wstoken
- delete_wstoken
"""
logging.info(f"WhereIs client v{VERSION}")
@ -49,6 +54,22 @@ if __name__ == "__main__":
except Exception as e:
logging.error("unexpected error occurred while initializing client", exc_info=True)
# get ws tokens
tokens = cli.get_wstokens()
print(json.dumps(tokens, indent=2))
# delete ws token
if len(tokens) > 0:
token = cli.delete_wstoken(tokens[0].get("id"))
print(json.dumps(tokens, indent=2))
# create ws token
try:
token = cli.create_wstoken()
except WhereIsException as e:
logging.error(f"error occurred while creating a ws token, status code: {e.error_code}")
print(json.dumps(e.content, indent=2))
# retrieve all user/public sessions
sessions = cli.get_sessions()
print(json.dumps(sessions, indent=2))
@ -84,14 +105,14 @@ if __name__ == "__main__":
try:
session = cli.close_session("does-not-exist")
except WhereIsException as e:
logging.error(f"error occurred while updating users session, status code: {e.error_code}")
logging.error(f"error occurred while closing session, status code: {e.error_code}")
print(json.dumps(e.content, indent=2))
# get session events
cli.watch_session_events(session.get("id"))
# doing your stuff...
time.sleep(5)
time.sleep(2)
# close the session
session = cli.close_session(session.get("id"))
@ -100,4 +121,4 @@ if __name__ == "__main__":
cli.delete_session(session.get("id"))
# stop session events watcher
cli.stop_watch_session(session.get("id"))
cli.stop_watch_session(session.get("id"), force=True)

View File

@ -437,3 +437,54 @@ class Client:
sw.stop(force)
del self.sessions_watcher[id_]
logging.info(f"session events (id: {id_}) watcher stopped")
@refresh()
def get_wstokens(self) -> list[dict[str, Any]] | WhereIsException:
wstoken_url = urljoin(self.base_url, "/auth/ws-token/")
logging.info(f"get ws token: {wstoken_url}")
res = self.session.get(wstoken_url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(wstoken_url, res)
return res.json()
@refresh()
def create_wstoken(self) -> dict[str, Any] | WhereIsException:
"""
Create a websocket JWT to authenticate your real time connection.
NOTE: only one, and only one ws token per user is allowed.
If it expired, delete it and create a new one.
"""
wstoken_url = urljoin(self.base_url, "/auth/ws-token/")
logging.info(f"create ws token: {wstoken_url}")
res = self.session.post(wstoken_url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(wstoken_url, res)
return res.json()
@refresh()
def delete_wstoken(self, id_: UUID) -> None | WhereIsException:
wstoken_url = urljoin(self.base_url, f"/auth/ws-token/{id_}/")
logging.info(f"delete ws token: {wstoken_url}")
res = self.session.delete(wstoken_url)
if res.status_code == 401:
raise UnauthorizedException()
if res.status_code >= 400:
raise WhereIsException(wstoken_url, res)
return None

View File

@ -1,5 +1,7 @@
from requests import Response
__all__ = ["WhereIsException"]
class WhereIsException(Exception):
"""Handle all WhereIs API errors."""