Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.
93 changes: 61 additions & 32 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
from influxdb.resultset import ResultSet
from .exceptions import InfluxDBClientError
from .exceptions import InfluxDBServerError


class InfluxDBClient(object):
Expand Down Expand Up @@ -69,6 +68,10 @@ class InfluxDBClient(object):
as a single file containing the private key and the certificate, or as
a tuple of both files’ paths, defaults to None
:type cert: str
:param use_msgpack: A bool indicating to use msgpack to retrieve query
results from InfluxDB. If False, the fallback will be JSON. This flag
sets the Accept header of the request. Defaults to True
:type use_msgpack: bool

:raises ValueError: if cert is provided but ssl is disabled (set to False)
"""
Expand All @@ -89,6 +92,7 @@ def __init__(self,
pool_size=10,
path='',
cert=None,
use_msgpack=True
):
"""Construct a new InfluxDBClient object."""
self.__host = host
Expand All @@ -110,7 +114,9 @@ def __init__(self,
)

if use_udp:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
self._udp_socket = None

if not path:
self.__path = ''
Expand Down Expand Up @@ -145,10 +151,16 @@ def __init__(self,
self._port,
self._path)

self._headers = {
'Content-Type': 'application/json',
'Accept': 'application/x-msgpack'
}
if use_msgpack:
self._headers = {
'Content-Type': 'application/json',
'Accept': 'application/x-msgpack'
}
else:
self._headers = {
'Content-Type': 'application/json',
'Accept': 'text/plain'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why text/plain and not application/json ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would have to ask the original implementor - this is the way it was implemented before moving the default to msgpack.

}

@property
def _baseurl(self):
Expand Down Expand Up @@ -243,14 +255,14 @@ def request(self, url, method='GET', params=None, data=None,
:param method: the HTTP method for the request, defaults to GET
:type method: str
:param params: additional parameters for the request, defaults to None
:type params: dict
:type params: dict, optional
:param data: the data of the request, defaults to None
:type data: str
:type data: str, optional
:param expected_response_code: the expected response code of
the request, defaults to 200
:type expected_response_code: int
:param headers: headers to add to the request
:type headers: dict
:type headers: dict, optional
:returns: the response from the request
:rtype: :class:`requests.Response`
:raises InfluxDBServerError: if the response code is any server error
Expand Down Expand Up @@ -285,6 +297,7 @@ def request(self, url, method='GET', params=None, data=None,
verify=self._verify_ssl,
timeout=self._timeout
)
response._msgpack = None
break
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
Expand All @@ -297,30 +310,39 @@ def request(self, url, method='GET', params=None, data=None,
if not retry:
raise

type_header = response.headers and response.headers.get("Content-Type")
if type_header == "application/x-msgpack" and response.content:
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)
else:
response._msgpack = None
if self._is_msg_pack_response(response):
if response.content:
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)

def reformat_error(response):
if response._msgpack:
return json.dumps(response._msgpack, separators=(',', ':'))
else:
return response.content

# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
if response.status_code == expected_response_code:
return response
else:
err_msg = reformat_error(response)
err_msg = self._reformat_msgpack_error(response)
raise InfluxDBClientError(err_msg, response.status_code)

@staticmethod
def _is_msg_pack_response(response):
if response is None:
return False

if response.headers is None:
return False

if "Content-Type" not in response.headers:
return False

content_type = response.headers["Content-Type"]
return content_type == "application/x-msgpack"

def _reformat_msgpack_error(self, _response):
if _response._msgpack is not None:
return json.dumps(_response._msgpack, separators=(',', ':'))
else:
return _response.content

def write(self, data, params=None, expected_response_code=204,
protocol='json'):
"""Write data to InfluxDB.
Expand Down Expand Up @@ -697,7 +719,7 @@ def create_retention_policy(self, name, duration, replication,
The minimum retention period is 1 hour.
:type duration: str
:param replication: the replication of the retention policy
:type replication: str
:type replication: int
:param database: the database for which the retention policy is
created. Defaults to current client's database
:type database: str
Expand All @@ -717,7 +739,7 @@ def create_retention_policy(self, name, duration, replication,
"CREATE RETENTION POLICY {0} ON {1} " \
"DURATION {2} REPLICATION {3} SHARD DURATION {4}".format(
quote_ident(name), quote_ident(database or self._database),
duration, replication, shard_duration)
duration, int(replication), shard_duration)

if default is True:
query_string += " DEFAULT"
Expand Down Expand Up @@ -1071,7 +1093,7 @@ def drop_continuous_query(self, name, database=None):
self.query(query_string)

def send_packet(self, packet, protocol='json', time_precision=None):
"""Send an UDP packet.
"""Send an UDP packet. Only valid when use_udp is True.

:param packet: the packet to be sent
:type packet: (if protocol is 'json') dict
Expand All @@ -1081,11 +1103,18 @@ def send_packet(self, packet, protocol='json', time_precision=None):
:param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None
:type time_precision: str
"""
if not self._use_udp:
raise RuntimeError("Unable to send packet : use_udp set to False")

if protocol == 'json':
data = make_lines(packet, time_precision).encode('utf-8')
elif protocol == 'line':
data = ('\n'.join(packet) + '\n').encode('utf-8')
self.udp_socket.sendto(data, (self._host, self._udp_port))
else:
raise InfluxDBClientError("Invalid protocol name : "
"expected json or line")

self._udp_socket.sendto(data, (self._host, self._udp_port))

def close(self):
"""Close http session."""
Expand Down
4 changes: 2 additions & 2 deletions influxdb/resultset.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ def __iter__(self):
yield list(self.__getitem__(key))

@staticmethod
def _tag_matches(tags, filter):
def _tag_matches(tags, _filter):
"""Check if all key/values in filter match in tags."""
for tag_name, tag_value in filter.items():
for tag_name, tag_value in _filter.items():
# using _sentinel as I'm not sure that "None"
# could be used, because it could be a valid
# series_tags value : when a series has no such tag
Expand Down
Loading