1111import requests .exceptions
1212from sys import version_info
1313
14+ from influxdb .line_protocol import make_lines
1415from influxdb .resultset import ResultSet
1516from .exceptions import InfluxDBClientError
1617from .exceptions import InfluxDBServerError
@@ -96,7 +97,8 @@ def __init__(self,
9697
9798 self ._headers = {
9899 'Content-type' : 'application/json' ,
99- 'Accept' : 'text/plain' }
100+ 'Accept' : 'text/plain'
101+ }
100102
101103 @staticmethod
102104 def from_DSN (dsn , ** kwargs ):
@@ -182,7 +184,7 @@ def switch_user(self, username, password):
182184 self ._password = password
183185
184186 def request (self , url , method = 'GET' , params = None , data = None ,
185- expected_response_code = 200 ):
187+ expected_response_code = 200 , headers = None ):
186188 """Make a HTTP request to the InfluxDB API.
187189
188190 :param url: the path of the HTTP request, e.g. write, query, etc.
@@ -203,17 +205,13 @@ def request(self, url, method='GET', params=None, data=None,
203205 """
204206 url = "{0}/{1}" .format (self ._baseurl , url )
205207
208+ if headers is None :
209+ headers = self ._headers
210+
206211 if params is None :
207212 params = {}
208213
209- auth = {
210- 'u' : self ._username ,
211- 'p' : self ._password
212- }
213-
214- params .update (auth )
215-
216- if data is not None and not isinstance (data , str ):
214+ if isinstance (data , (dict , list )):
217215 data = json .dumps (data )
218216
219217 # Try to send the request a maximum of three times. (see #103)
@@ -223,9 +221,10 @@ def request(self, url, method='GET', params=None, data=None,
223221 response = self ._session .request (
224222 method = method ,
225223 url = url ,
224+ auth = (self ._username , self ._password ),
226225 params = params ,
227226 data = data ,
228- headers = self . _headers ,
227+ headers = headers ,
229228 verify = self ._verify_ssl ,
230229 timeout = self ._timeout
231230 )
@@ -254,18 +253,24 @@ def write(self, data, params=None, expected_response_code=204):
254253 :returns: True, if the write operation is successful
255254 :rtype: bool
256255 """
256+
257+ headers = self ._headers
258+ headers ['Content-type' ] = 'application/octet-stream'
259+
257260 self .request (
258261 url = "write" ,
259262 method = 'POST' ,
260263 params = params ,
261- data = data ,
262- expected_response_code = expected_response_code
264+ data = make_lines (data ).encode ('utf-8' ),
265+ expected_response_code = expected_response_code ,
266+ headers = headers
263267 )
264268 return True
265269
266270 def query (self ,
267271 query ,
268272 params = {},
273+ epoch = None ,
269274 expected_response_code = 200 ,
270275 database = None ,
271276 raise_errors = True ):
@@ -294,6 +299,9 @@ def query(self,
294299 params ['q' ] = query
295300 params ['db' ] = database or self ._database
296301
302+ if epoch is not None :
303+ params ['epoch' ] = epoch
304+
297305 response = self .request (
298306 url = "query" ,
299307 method = 'GET' ,
@@ -391,22 +399,25 @@ def _write_points(self,
391399 'points' : points
392400 }
393401
394- if time_precision :
395- data ['precision ' ] = time_precision
402+ if tags is not None :
403+ data ['tags ' ] = tags
396404
397- if retention_policy :
398- data ['retentionPolicy' ] = retention_policy
405+ params = {
406+ 'db' : database or self ._database
407+ }
399408
400- if tags :
401- data [ 'tags ' ] = tags
409+ if time_precision is not None :
410+ params [ 'precision ' ] = time_precision
402411
403- data ['database' ] = database or self ._database
412+ if retention_policy is not None :
413+ params ['rp' ] = retention_policy
404414
405415 if self .use_udp :
406416 self .send_packet (data )
407417 else :
408418 self .write (
409419 data = data ,
420+ params = params ,
410421 expected_response_code = 204
411422 )
412423
@@ -578,15 +589,20 @@ def get_list_users(self):
578589 """
579590 return list (self .query ("SHOW USERS" ).get_points ())
580591
581- def create_user (self , username , password ):
592+ def create_user (self , username , password , admin = False ):
582593 """Create a new user in InfluxDB
583594
584595 :param username: the new username to create
585596 :type username: str
586597 :param password: the password for the new user
587598 :type password: str
599+ :param admin: whether the user should have cluster administration
600+ privileges or not
601+ :type admin: boolean
588602 """
589603 text = "CREATE USER {} WITH PASSWORD '{}'" .format (username , password )
604+ if admin :
605+ text += ' WITH ALL PRIVILEGES'
590606 self .query (text )
591607
592608 def drop_user (self , username ):
@@ -609,29 +625,27 @@ def set_user_password(self, username, password):
609625 text = "SET PASSWORD FOR {} = '{}'" .format (username , password )
610626 self .query (text )
611627
612- def delete_series (self , id , database = None ):
613- """Delete series from a database.
628+ def delete_series (self , database = None , measurement = None , tags = None ):
629+ """Delete series from a database. Series can be filtered by
630+ measurement and tags.
614631
615- :param id: the id of the series to be deleted
616- :type id: int
632+ :param measurement: Delete all series from a measurement
633+ :type id: string
634+ :param tags: Delete all series that match given tags
635+ :type id: dict
617636 :param database: the database from which the series should be
618637 deleted, defaults to client's current database
619638 :type database: str
620639 """
621640 database = database or self ._database
622- self .query ('DROP SERIES %s' % id , database = database )
623-
624- def grant_admin_privileges (self , username ):
625- """Grant cluster administration privileges to an user.
626-
627- :param username: the username to grant privileges to
628- :type username: str
641+ query_str = 'DROP SERIES'
642+ if measurement :
643+ query_str += ' FROM "{}"' .format (measurement )
629644
630- .. note:: Only a cluster administrator can create/ drop databases
631- and manage users.
632- """
633- text = "GRANT ALL PRIVILEGES TO {}" .format (username )
634- self .query (text )
645+ if tags :
646+ query_str += ' WHERE ' + ' and ' .join (["{}='{}'" .format (k , v )
647+ for k , v in tags .items ()])
648+ self .query (query_str , database = database )
635649
636650 def revoke_admin_privileges (self , username ):
637651 """Revoke cluster administration privileges from an user.
@@ -683,9 +697,8 @@ def send_packet(self, packet):
683697 :param packet: the packet to be sent
684698 :type packet: dict
685699 """
686- data = json .dumps (packet )
687- byte = data .encode ('utf-8' )
688- self .udp_socket .sendto (byte , (self ._host , self .udp_port ))
700+ data = make_lines (packet ).encode ('utf-8' )
701+ self .udp_socket .sendto (data , (self ._host , self .udp_port ))
689702
690703
691704class InfluxDBClusterClient (object ):
0 commit comments