Source code for glpilib2.basic_wrapper

#!/usr/bin/env python3
import inspect
import json
import logging
import re

from dataclasses import dataclass
from enum import Enum
from json import JSONDecodeError
from typing import Dict, Optional, Any, List, Tuple, Union, IO

import requests

logging.basicConfig(
    level=logging.WARNING, format="%(asctime)s: %(name)s: %(levelname)s: %(message)s"
)
logger = logging.getLogger("glpi_wrapper")

JSON = Dict[str, Any]


[docs] class SortOrder(Enum): """Handy :class:`~enum.Enum` for ordering queries. Attributes ---------- Ascending Desceding """ Ascending = "ASC" Descending = "DESC" def __str__(self): return self.value
[docs] @dataclass class ResponseRange: """Range of a query. Some API methods produce a response in the header that describe the range of the query. That data is collected and wrapped in this type of object. To obtain it use the :meth:`~RequestHandler.response_range` property of :class:`RequestHandler`. Attributes ---------- start: int The query was retrieved begining on the `start` number. end: int The query was retrieved with no elements after `end` number. count: int Number of items returned. max: int Maximum possible number of items for this item type. """ start: int end: int count: int max: int def __repr__(self): return f"{str(self.start)}-{str(self.start)}/{self.count} Max: {self.max}"
[docs] class GLPIError(Exception): """Base glpilib2 exception""" pass
[docs] class GLPIRequestError(GLPIError): """Request error. In some circumstances GLPI doesn't have a specific error message for a method. This exception is raised in those cases with details about how the request was made and the response that was received. Attributes ---------- error_code: int The HTTP response code error_message: str The error message returned by the API (might be empty) request_headers: dict payload: str | bytes url: str The final response URL method: str The HTTP method of the request response: requests.Response The actual response object is provided for further debugging """ def __init__(self, response: requests.Response, *args): self.error_code = response.status_code self.error_message = response.text self.request_headers = response.request.headers self.payload = response.request.body self.url = response.url self.method = response.request.method self.response = response self.args = args def __repr__(self): url = ".../" + self.url.split("/")[1] msg = f"GLPIError({url=}, method={self.method}, code={self.error_code})=" error = self.error_message[: (80 - len(msg))] return msg + error def __str__(self): url = ".../" + self.url.split("/")[1] msg = f"GLPIError({url=}, method={self.method}, code={self.error_code})=\n" error = self.error_message return msg + error
def add_criteria_to_parameters(criteria, parameters: list, father="criteria"): if isinstance(criteria, dict): for key, value in criteria.items(): path = f"{father}[{key}]" if isinstance(value, list): add_criteria_to_parameters(value, parameters, path) else: parameters.append((path, value)) elif isinstance(criteria, list): for i in range(len(criteria)): add_criteria_to_parameters(criteria[i], parameters, f"{father}[{i}]") else: raise NotImplementedError( f"__add_criteria cannot handle objects of type {type(criteria)}" )
[docs] class RequestHandler: """RequestHandler encapsulates the GLPI API in a handy class. Parameters ---------- host_url: str The URL to the GLPI instance. app_token: str The application token. user_api_token: str The user api token. verify_tls: bool, default True If your glpi server is using TLS with a bad certificate, you will need to set this to false. Methods ------- add_items: Create new items (hardware, software, tickets etc.) change_active_entity: Change active entity change_active_profile: Change the active profile of the current user delete_items: Delete a list of items by id download_document: Return a document as `bytes` download_user_profile_picture: Return a profile picture as `bytes` get_active_entities: Return active entities of current logged user get_active_profile: Return the active profile of the current user get_full_session: Return the current PHP session variable get_glpi_config: Return the current `GLPI_CONFIG` variable get_item: Return single item (hardware, software, ticket etc.) get_many_items: Return a list of items get_my_entities: Return all the entities of the current logged user. get_my_profiles: Return all the profiles associated to the current logged user. get_search_options: List the search options for an item type get_sub_items: Return subitems of an item init_session: Tries to log into glpi kill_session: Ends the session associated with this `RequestHandler` response_range: Returns the `ResponseRange` of the previous API call search_items: Search items according to some criteria session_token: Returns the current session token update_items: Update the attributes of several items upload_document: Uploads a document to GLPI Notes ----- Most methods require an active session to perform. If unsure call :meth:`~RequestHandler.init_session` after the instantiation. `host_url`, `app_token` and `user_api_token` are further described in the README.md. Examples -------- >>> handler = RequestHandler('localhost', '123456', '654321') >>> handler.init_session() >>> handler.get_my_profiles() { 'name': "Super-admin", 'entities': [ ... ] } >>> handler.kill_session() """ @property def response_range(self) -> ResponseRange: """Returns the `ResponseRange` of the previous API call Set when methods that return multiple items are called. """ if self.__response_header is None: raise GLPIError("No request made") elif ( "Content-Range" not in self.__response_header or "Accept-Range" not in self.__response_header ): raise GLPIError("The previous request did not return a range") else: content_range = self.__response_header["Content-Range"] match = re.match( r"(?P<start>\d+)-(?P<end>\d+)/(?P<count>\d+)", content_range ) s = self.__response_header["Accept-Range"].strip().split()[1] accept_range = int(s) return ResponseRange( int(match.group("start")), int(match.group("end")), int(match.group("count")), accept_range, ) @property def session_token(self) -> str: """Returns the current session token Session token is an identification token used for the current connection to the API. """ if self.__session_token is None: raise GLPIError( "Request handler was not initiated! Please call init_session" " if you want to start a new session." ) return self.__session_token def __init__( self, host_url: str, app_token: str, user_api_token: str, verify_tls: bool = True, ): """Creates a new RequestHandler instance. Notes ----- This method, doesn't call init_session(). See The class documentation for more information. """ self.host_url = host_url self.app_token = app_token self.user_api_token = user_api_token self.__session_token = None self.verify_tls = verify_tls self.__session = None self.__response_header = None def _header_dict(self, extra: Dict[str, str]) -> Dict[str, str]: d = { "Content-Type": "application/json", "App-Token": self.app_token, } d.update(extra) return d
[docs] def init_session(self): """Request a :meth:`~RequestHandler.session_token` to be used by the other methods. """ if self.__session_token is not None: raise GLPIError("Session already initialized.") auth = f"user_token {self.user_api_token}" r = self._do_get("initSession", {"Authorization": auth}) self.__session_token = r.json()["session_token"] logger.info("Session initiaded")
def _get_method_url(self, request_type: str) -> str: return f"{self.host_url}/apirest.php/{request_type}" def _do_get( self, action: str, header: Dict[str, str], parameters: Union[Dict[str, Any], List[Tuple[str, Any]]] = None, data: Union[Dict[str, Any], List[Tuple[str, Any]]] = None, ) -> requests.Response: url = self._get_method_url(action) headers = self._header_dict(header) logger.debug(f"Calling method {action}") if self.__session is None: self.__session = requests.Session() response = self.__session.get( url, headers=headers, verify=self.verify_tls, params=parameters, data=data ) self.__response_header = response.headers if response.status_code >= 400: raise GLPIRequestError(response) return response def _get_json( self, method: str, parameters: Union[JSON, List[Tuple[str, Any]]] = None, data: Union[JSON, List[Tuple[str, Any]]] = None, ) -> Union[JSON, List[JSON]]: response = self._do_get( method, {"Session-Token": self.session_token}, parameters, data ) try: return response.json() except JSONDecodeError: if len(response.text.strip()) == 0: message = "GLPI produced a blank response." else: message = f"Expected a JSON got a {response.text}" raise GLPIRequestError(response, message) @staticmethod def __keys_to_int(dict_: Dict): for k, v in list(dict_.items()): try: i = int(k) del dict_[k] dict_[i] = v except ValueError: pass def __get_request_parameters(self, rename: Dict[str, str] = None): """Build request parameters as list of tuples out of the optional values of the calling method. Parameters ---------- rename Renames the resulting parameter using the items of this dict. """ if rename is None: rename = {} currentframe = inspect.currentframe() try: frame = inspect.getouterframes(currentframe)[1] try: method = getattr(self, frame.function) sig = inspect.signature(method) request_parameters = [] for parameter in sig.parameters.values(): parameter_name = parameter.name parameter_value = frame.frame.f_locals[parameter_name] if parameter_name in rename: parameter_name = rename[parameter_name] if ( parameter.default is not inspect.Parameter.empty and parameter_value != parameter.default ): if ( type(parameter_value) == list or type(parameter_value) == tuple ): for item in parameter_value: request_parameters.append((parameter_name + "[]", item)) else: request_parameters.append((parameter_name, parameter_value)) return request_parameters finally: del frame finally: del currentframe
[docs] def kill_session(self, session_id: Optional[str] = None): """Destroy a session identified by a :meth:`~RequestHandler.session_token`. Defaults to the current open session.""" if session_id is None: if self.session_token is None: raise GLPIError( "Request handler was not initiated, nothing to be done." ) else: session_id = self.session_token self.__session_token = None try: self._do_get("killSession", {"Session-Token": session_id}) except GLPIRequestError as err: if err.error_code == 401: try: message = err.response.json() except JSONDecodeError: raise err from None if len(message) > 0 and message[0] == "ERROR_SESSION_TOKEN_INVALID": raise GLPIError("Session expired") from err else: raise err from None else: raise logger.info("Session was terminated successfully.")
def __enter__(self): self.init_session() return self def __exit__(self, exc_type, exc_val, exc_tb): # No great advantage treating exceptions in here self.kill_session() return False # Do raise any exception found within the `with` body
[docs] def get_my_profiles(self) -> List[JSON]: """Return all the profiles associated to the current logged user. Examples -------- >>> handler.get_my_profiles() [ { 'id': 1 'name': "Super-admin", 'entities': [ ... ]... },... ] """ return self._get_json("getMyProfiles")["myprofiles"]
[docs] def get_active_profile(self) -> JSON: """Return the current active profile. Examples -------- >>> handler.get_active_profile() { 'name': "Super-admin", 'entities': [ ... ] } """ return self._get_json("getActiveProfile")["active_profile"]
[docs] def change_active_profile(self, profile_id: int) -> None: """Change active profile to the one indicated by `profile_id`. Use :meth:`~RequestHandler.get_my_profiles` to obtain the possible profiles. Raises ------ GLPIError When the profile can't be found. """ r = self._do_post( "changeActiveProfile", {"profiles_id": profile_id}, on_error_raise=False ) if r.status_code == 404: raise GLPIError("Profile not found")
def _do_method( self, method: str, api_method_url: str, data: Union[JSON, List[Tuple[str, Any]]] = None, headers: Dict[str, str] = None, files=None, on_error_raise=True, ) -> requests.Response: if headers is None: headers = {} headers["Session-Token"] = self.session_token url = self._get_method_url(api_method_url) headers = self._header_dict(headers) if self.__session is None: self.__session = requests.Session() logger.debug( f"Calling method {method} on {api_method_url} with {data=} and {headers=}" ) response = getattr(self.__session, method)( url, headers=headers, verify=self.verify_tls, json=data, files=files ) if on_error_raise: if response.status_code >= 400: raise GLPIRequestError(response) return response def _do_post( self, action: str, data: Union[JSON, List[Tuple[str, Any]]], headers: Dict[str, str] = None, on_error_raise=True, files=None, ) -> requests.Response: return self._do_method( method="post", api_method_url=action, data=data, headers=headers, on_error_raise=on_error_raise, files=files, )
[docs] def get_my_entities(self, recursive: bool = False) -> List[JSON]: """Return all the entities of the current logged user. Also returns entities related to the current active profile. Examples -------- >>> handler.get_my_entities() [ { 'id': 71 'name': "my_entity" }, ... ] """ return self._get_json( "getMyEntities", parameters={"is_recursive": str(recursive).lower()} )["myentities"]
[docs] def get_active_entities(self) -> JSON: """Return active entities of current logged user. Examples -------- >>> handler.get_active_entities() { 'id': 1, 'active_entity_recursive': true, 'active_entities': [ {"id":1}, {"id":71},... ] } """ return self._get_json("getActiveEntities")["active_entity"]
[docs] def change_active_entity(self, entity_id: int): """Change active entity to `entity_id`. Use :meth:`~RequestHandler.get_my_entities` method to know that are the viable entities. Warnings -------- Due to a bug with GLPI 9.5.3 if the API cannot find a valid entity with that id it will just report a "Bad Request". """ r = self._do_post( "changeActiveEntities", {"entities_id": entity_id}, on_error_raise=False ) if r.status_code == 400: raise GLPIError(r.json()[1])
[docs] def get_full_session(self) -> JSON: """Return the current `php`'s `$_SESSION`. Examples -------- >>> handler.get_full_session() { 'valid_id': ..., 'glpi_currenttime': ..., 'glpi_use_mode': ..., ... } """ return self._get_json("getFullSession")["session"]
[docs] def get_glpi_config(self) -> JSON: """Return the current `$CFG_GLPI`. Examples -------- >>> handler.get_glpi_config() { 'languages': ..., 'glpitables': ..., 'unicity_types':..., ... } """ return self._get_json("getGlpiConfig")["cfg_glpi"]
[docs] def get_item( self, item_type: str, id_: int, expand_dropdowns: bool = False, get_hateoas: bool = True, get_sha1: bool = False, with_devices: bool = False, with_disks: bool = False, with_softwares: bool = False, with_connections: bool = False, with_networkports: bool = False, with_infocoms: bool = False, with_contracts: bool = False, with_documents: bool = False, with_tickets: bool = False, with_problems: bool = False, with_changes: bool = False, with_notes: bool = False, with_logs: bool = False, add_key_names: List[str] = None, ) -> JSON: """Return an instance of `item_type` identified by `id` and its associated fields. Documents and User pictures are retrieved with their respective methods. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... id_ : int Unique identifier of the `itemtype`. expand_dropdowns : bool, default False Show dropdown name instead of `id`. get_hateoas : bool, default True Show relations of the item in a `links` attribute. See: https://en.wikipedia.org/wiki/HATEOAS Can't be disabled due to a bug in the API. get_sha1 : bool, default False Get a sha1 signature instead of the full answer. with_devices : bool, default False Only for [Computer, NetworkEquipment, Peripheral, Phone, Printer], retrieve the associated components. with_disks : bool, default False Only for Computer, retrieve the associated file-systems. with_softwares : bool, default False Only for Computer, retrieve the associated software's installations. with_connections : bool, default False Only for Computer, retrieve the associated direct connections (like peripherals and printers). with_networkports : bool, default False Retrieve all network's connections and advanced network's information. with_infocoms : bool, default False Retrieve financial and administrative information. with_contracts : bool, default False Retrieve associated contracts. with_documents : bool, default False Retrieve associated external documents. with_tickets : bool, default False Retrieve associated ITIL tickets. with_problems : bool, default False Retrieve associated ITIL problems. with_changes : bool, default False Retrieve associated ITIL changes. with_notes : bool, default False Retrieve Notes. with_logs : bool, default False Retrieve item history. add_key_names : List[str], default None Retrieve friendly names of the keys "id" keys. Eg.: ['id', 'entities_id', 'groups_id_tech' ...] Examples -------- >>> handler.get_item(item_type='Computer',id_=71) { "id": 71, "entities_id": "Root Entity", "name": "adelaunay-ThinkPad-Edge-E320", "serial": "12345", ... } """ if not get_hateoas: get_hateoas = 0 request_parameters = self.__get_request_parameters( rename={"add_key_names": "add_keys_names"} ) try: return self._get_json(f"{item_type}/{id_}", parameters=request_parameters) except requests.HTTPError as err: raise GLPIError(f"{item_type} with id={id_} was not found") from err
[docs] def get_many_items( self, item_type, expand_dropdowns: bool = False, get_hateoas: bool = True, only_id: bool = False, range_: Tuple[int, int] = None, sort_by: str = None, order: SortOrder = None, filter_by: Dict[str, str] = None, is_deleted: bool = False, add_key_names: List[str] = None, ) -> List[JSON]: """Returns a set of items identified by `item_type`. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... expand_dropdowns: bool, default False Show dropdown name instead of `id`. get_hateoas : bool, default True Show relations of the item in a `links` attribute. See: https://en.wikipedia.org/wiki/HATEOAS Can't be disabled due to a bug in the API. only_id: bool, default False Only `id` and `links` are returned. range_: Tuple(int, int), default (0, 50) The start and end of the pagination. sort_by: str, default None Name of the field to sort by. order: SortOrder, default SortOrder.Ascending Sort the results according to the sort order of the `sort` field. filter_by: Dict[str, str], default None Filters to pass on the query. is_deleted: bool, default False Return deleted elements. add_key_names: List[str], default None Retrieve friendly names of the keys "id" keys. Eg.: ['id', 'entities_id', 'groups_id_tech' ...] Examples -------- >>> handler.get_many_items(item_type='Computer') [ { "id": 34, "entities_id": "Root Entity", "name": "glpi", ... },... ] """ if range_ is not None: range_ = "-".join(str(r) for r in range_) if is_deleted: is_deleted = 1 if not get_hateoas: get_hateoas = 0 request_parameters = self.__get_request_parameters( rename={ "range_": "range", "add_key_names": "add_keys_names", "sort_by": "sort", } ) if filter_by: for name in filter_by: request_parameters.append((f"searchText[{name}]", filter_by[name])) return self._get_json(f"{item_type}/", parameters=request_parameters)
[docs] def get_sub_items( self, item_type: str, item_id: int, sub_item_type: str, expand_dropdowns: bool = False, get_hateoas: bool = True, only_id: bool = False, range_: Tuple[int, int] = None, sort_by: str = None, order: SortOrder = None, add_key_names: List[str] = None, ) -> List[JSON]: """Return subitems of the `sub_item_type` for the identified `item_id`. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... item_id: int Unique identifier of the parent `item_type`. sub_item_type: str The type of subitems you are trying to retrieve. expand_dropdowns: bool, default False Show dropdown name instead of `id`. get_hateoas : bool, default True Show relations of the item in a `links` attribute. See: https://en.wikipedia.org/wiki/HATEOAS Can't be disabled due to a bug in the API. only_id: bool, default False Only `id` and `links` are returned. range_: Tuple(int, int), default (0, 50) The start and end of the pagination. sort_by: str, default None Name of the field to sort by. order: SortOrder, default SortOrder.Ascending Sort the results according to the sort order of the `sort` field. add_key_names: List[str], default None Retrieve friendly names of the keys "id" keys. Eg.: ['id', 'entities_id', 'groups_id_tech' ...] Examples -------- >>> handler.get_sub_items(item_type='User', item_id=2, sub_item_type='Logs') [ { "id": 22117, "itemtype": "User", ... }, ... ] """ if range_ is not None: range_ = "-".join(str(r) for r in range_) if not get_hateoas: get_hateoas = 0 request_parameters = self.__get_request_parameters( rename={ "range_": "range", "add_key_names": "add_keys_names", "sort_by": "sort", } ) return self._get_json( f"{item_type}/{item_id}/{sub_item_type}", parameters=request_parameters )
[docs] def get_search_options( self, item_type: str, raw: bool = False, pretty=False ) -> JSON: """List the search options of the provided `itemtype`. This method provides the options you need for self.search_items. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... raw: bool, default False Return `searchoption` uncleaned (as provided by `core`). pretty: bool, default False Will attempt to return the search option organized in a tree based on `uid`. Examples -------- >>> handler.get_search_items('Computer') { "common": "Characteristics", "1": { 'name': 'Name' 'table': 'glpi_computers' 'field': 'name' 'linkfield': 'name' 'datatype': 'itemlink' 'uid': 'Computer.name' },... } >>> handler.get_search_items('Computer', pretty=True) {'Computer': {'Appliance_Item': {'Appliance': {'ApplianceType': {'name': {'available_searchtypes': ['contains', ...], ... 'uid': 'Computer.Appliance_Item.Appliance.ApplianceType.name' },... },... },... },... } """ def recurse_parts( part_name: str, part_data: Dict[str, Any], rest: List[str], end, id_: int ): if part_name not in part_data: part_data[part_name] = {} if len(rest) == 0: part_data[part_name] = end end["id"] = id_ return name = rest.pop(0) recurse_parts(name, part_data[part_name], rest, end, id_) request_parameters = self.__get_request_parameters() json = self._get_json( f"listSearchOptions/{item_type}", parameters=request_parameters ) if not pretty: return json else: result = {} for k, v in json.items(): if k.isdecimal(): parts = v["uid"].split(".") head = parts.pop(0) recurse_parts(head, result, parts, v, int(k)) return result
[docs] def search_items( self, item_type: str, filters: List[Dict[str, Any]] = None, sort_by_id: int = None, order: SortOrder = None, range_: Tuple[int, int] = None, force_display: List[int] = None, raw_data: bool = False, with_indexes: bool = False, uid_cols: bool = False, give_items: bool = False, ) -> JSON: """Search items according to some criteria. Expose the GLPI searchEngine and combine filters to retrieve a list of elements of the specified `item_type`. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'...[3] filters: List[Dict[str, Any]], default None A list of json-like objects the represents the query filters and their their relationships. Every filter object must have at least one `link` field (if it's not the first object). link: str A logical operator of AND, OR, AND NOT `Filter objects` keys are objects that narrow down the query. Their keys are: field: int The id of the `search_option`. meta: boolean is this criterion a meta one ? itemtype: str For meta=true criterion, define the `itemtype` to use. searchtype: str The comparison operation of the filter. It's one of, contains[1], equals[2], notequals[2], lessthan, morethan, under, notunder. value: The value that the `field` is compared against. `Sub-filter` objects can be seen as a pair of () separating a query its single unique key is: criteria: List[Dict[str, Any]] A list of `Filter objects` An example of `filters` parameter can be found in the examples section. sort_by_id: int, default None `id` of the `search_option` to sort by. order: SortOrder, default SortOrder.Ascending Sort the results according to the sort order of the `sort` field. range_: Tuple(int, int), default (0, 50) The start and end of the pagination. force_display: List[int], default None If set only the columns listed will be present in the query result. **Note**: The API documentation says that some columns will always be displayed, those are `{1: id, 2: name, 80: Entity}`. However that's not the observed behaviour. Only `{1: id}` seems to always be displayed. raw_data: bool, default False If set debug information about the query is returned in a `rawdata` field. The results contain the SQL, the search filters as interpreted by the `SearchEngine`, columns to retrive and many more attributes. with_indexes: bool, default False Instead of the resulting `data` field being a list of items found, it is a `dict` with `id` as key. Ordering cannot work with this argument. uid_cols: bool, default False Replaces the numeric `id` from the resulting `data` object with human readable names as identified by the `uid` field returned by `get_search_options`. give_items: bool, default False Returns a HTML link to the item on the portal in the first field for each returned item, inside a 'data_html' field in the returned object. Notes ----- 1. contains will use a wildcard search per default. You can restrict at the beginning using the `^` character, and/or at the end using the `$` character. 2. `equals` and `notequals` are designed to be used with dropdowns. Do not expect those operators to search for a strictly equal value (see 1. above). 3. You can use 'AllAssets' as the `item_type` to retrieve a combination of all asset's items, regardless of type. Examples -------- >>> handler.search_items("Monitor", range=(0,2)) { "totalcount": "2", "range": "0:2", "data": [ { "searchoptions_id": "value", ... },... ],... } With `give_items`. >>> handler.search_items("Monitor", give_items=True) { "data": [...], "data_html": [ { "1": "<a id='Monitor_1_1' href='/front/monitor.form.php?id=1'>monitor1</a>", "19": "2021-02-20 21:38", "23": "manufacturer1", ... }... ],... } The next example demonstrates the usage of fiters with a nested filter. >>> nested_filter = [ ... { ... "field": 34, ... "searchtype": "equals", ... "value": 1 ... }, { ... "link": "OR", ... "field": 35, ... "searchtype": "equals", ... "value": 1 ... } ... ] >>> filters = [ ... { ... "link": "AND', ... "field": 31, ... "searchtype": "equals", ... "value": 1 ... }, { ... "link": "AND", ... "meta": True, ... "itemtype": "User", ... "field": 1, ... "searchtype": "equals", ... "value": 1 ... }, { ... "link": "AND", ... "criteria" : nested_filter ... } ... ] >>> handler.search_items("Monitor", force_display=[1], filters=filters) { "totalcount": ..., "range": ..., "data": [ { "1": "W2242",... },... ],... } The above filter variable is roughly equivalent to the expression. >>> [m for m in Monitors if ( ... field[31] == 1 ... and User.field[1] == 1 ... and ( ... field[34] == 1 ... or field[35] == 1))] """ if range_ is not None: range_ = "-".join(str(r) for r in range_) criteria = filters if filters else [] filters = None request_parameters = self.__get_request_parameters( rename={ "sort_by_id": "sort", "range_": "range", "force_display": "forcedisplay", "raw_data": "rawdata", "with_indexes": "withindexes", "give_items": "giveItems", } ) add_criteria_to_parameters(criteria, request_parameters) json = self._get_json(f"search/{item_type}", parameters=request_parameters) if not with_indexes: for d in json.get("data", []): self.__keys_to_int(d) for d in json.get("data_html", []): self.__keys_to_int(d) return json
[docs] def add_items( self, item_type: str, data: Union[JSON, List[JSON]] ) -> Union[JSON, List[JSON]]: """Add one or several items Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... data: Union[Dict, List[Dict]] A dict with fields of `itemtype` to be inserted. You can add several items in one action by passing a list of dict instead. Returns ------- Union[Dict, List[Dict]] In case a single object was passed will return a single dict, otherwise a list of dicts. In case of success the dicts might contain only the `id` of the inserted object, otherwise it will contain a `message` field with the error message. Examples -------- >>> handler.add_items("Software", {"name": "my software", "location": 1}) {'id': 4, 'message': 'Item Successfully Added: item name'} For several objects. >>> handler.add_items( ... "Software", ... [ ... {"name": "second software added", "location": 1}, ... {"name": "third software added", "uuid": "2313"}, ... ]) [ {"id":8, "message": ""}, {"id":false, "message": "You don't have permission to perform this action."}, {"id":9, "message": ""} ] Notes ----- So far I've been unable to trigger an error with add_items for a single item in a list. add_items fail silently whenever you provide an invalid attribute ex: an attribute that references an object that doesn't exist, a duplicated id field or even when you try to set a field that doesn't exist. """ response = self._do_post(f"{item_type}", data={"input": data}) return response.json()
[docs] def update_items(self, item_type: str, data: List[JSON]) -> List[JSON]: """Update the attributes of several items. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... data: List[Dict[str, Any]] A list of dict with fields of `item_type` to be updated. One of those fields must be `id`. Field names must be lowercase. Returns ------- List[Dict] Each field has the form `{'id': 2, 'message': ''}` Notes ----- As with the `add_items` method, this method is highly permissive and will not return an error if the item with that `id` doesn't exist, or the field is incorrect. In fact, it might create new objects in case the `id` doesn't exist. """ response = self._do_method("patch", f"{item_type}", data={"input": data}) return response.json()
[docs] def delete_items( self, item_type: str, ids: List[int], purge=False, log=True ) -> List[JSON]: """Delete a list of existing objects. Parameters ---------- item_type: str Type of the item. Eg: 'Computer', 'Ticket', 'Software'... ids: List[int] List of ids of the objects that will be deleted. purge: bool, default False If set the item will skip the trash bin (if applicable), being immediately deleted. log: bool, default True If set the deletion operation will be logged. Returns ------- List[Dict] Returns a list of objects in the form of `{'id': True, 'message': ''}` In case of failure the `id` field value will be false and `message` will be set. """ data = {"input": {"id": id_ for id_ in ids}} if purge: data["force_purge"] = True if not log: data["force_purge"] = True response = self._do_method("delete", f"{item_type}", data=data) return response.json()
[docs] def upload_document(self, file: IO, name: str = None, file_name: str = None): """Uploads a document to GLPI Parameters ---------- name: str, default None The human readable name. If unset it will be the same as `file_name`. file: IO A file like object file_name: str, default None If you want the file uploaded to have a name different from the `file` you can set this variable. Ex: 'my_filename.png' If unset it will try to obtain a name from the `file` to use as the `file_name`. """ manifest = json.dumps({"input": {"name": name, "_filename": [file_name]}}) url = self._get_method_url("Document/") headers = self._header_dict({"Session-Token": self.session_token}) if self.__session is None: self.__session = requests.Session() if file_name is None: file_name = file.name del headers["Content-Type"] r = self.__session.post( url, headers=headers, verify=self.verify_tls, files={"filename[0]": (file_name, file)}, data={"uploadManifest": manifest}, ) r.raise_for_status() return r.json()
[docs] def download_document(self, id_: int): """Return a Document identified by `id` as `bytes`""" response = self._do_method( "get", f"Document/{id_}", headers={"Accept": "application/octet-stream"}, on_error_raise=False, ) return response.content
[docs] def download_user_profile_picture(self, id_: int): """Return the profile picture of a `User` identified by `id` as `bytes`""" response = self._do_method( "get", f"User/{id_}/Picture", on_error_raise=False, ) if response.status_code == 204: raise GLPIError("User doesn't have a profile picture") return response.content