Skip to content

Commit f0a85a4

Browse files
committed
PYTHON-952 - Basic command monitoring
This commit adds support for publishing events when commands (ismaster, findAndModify, listCollections, etc.) start and succeed or fail. See the new module, pymongo.monitoring, for details. This commit only adds basic infrastructure and support for monitoring generic commands. Support for monitoring queries, getMores, killCursors, and write operations are forthcoming.
1 parent 04b1b8a commit f0a85a4

File tree

6 files changed

+493
-8
lines changed

6 files changed

+493
-8
lines changed

doc/api/pymongo/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Sub-modules:
3838
bulk
3939
errors
4040
message
41+
monitoring
4142
mongo_client
4243
mongo_replica_set_client
4344
operations

doc/api/pymongo/monitoring.rst

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
:mod:`monitoring` -- Tools for monitoring driver events.
2+
========================================================
3+
4+
.. automodule:: pymongo.monitoring
5+
:synopsis: Tools for monitoring driver events.
6+
7+
.. data:: COMMAND
8+
9+
The event type of user commands.
10+
.. autofunction:: subscribe(subscriber, events=COMMAND)
11+
.. autofunction:: get_subscribers(event=COMMAND)
12+
.. autoclass:: Subscriber
13+
:members:
14+
.. autoclass:: CommandStartedEvent
15+
:members:
16+
:inherited-members:
17+
.. autoclass:: CommandSucceededEvent
18+
:members:
19+
:inherited-members:
20+
.. autoclass:: CommandFailedEvent
21+
:members:
22+
:inherited-members:

pymongo/monitoring.py

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
# Copyright 2015 MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you
4+
# may not use this file except in compliance with the License. You
5+
# may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
# implied. See the License for the specific language governing
13+
# permissions and limitations under the License.
14+
15+
"""Tools to monitor driver events.
16+
17+
Use :func:`subscribe` to register subscribers for specific events. Only
18+
events of type :data:`COMMAND` are currently supported. Subscribers must be
19+
a subclass of :class:`Subscriber` and implement :meth:`~Subscriber.started`,
20+
:meth:`~Subscriber.succeeded`, and :meth:`~Subscriber.failed`.
21+
22+
For example, a simple logging subscriber might be implemented like this::
23+
24+
import logging
25+
26+
from pymongo import monitoring
27+
28+
class LoggingSubscriber(monitoring.Subscriber):
29+
30+
def started(self, event):
31+
logging.info("Command {0.command_name} with request id "
32+
"{0.request_id} started on server "
33+
"{0.connection_id}".format(event))
34+
35+
def succeeded(self, event):
36+
logging.info("Command {0.command_name} with request id "
37+
"{0.request_id} on server {0.connection_id} "
38+
"succeeded in {0.duration_micros} "
39+
"microseconds".format(event))
40+
41+
def failed(self, event):
42+
logging.info("Command {0.command_name} with request id "
43+
"{0.request_id} on server {0.connection_id} "
44+
"failed in {0.duration_micros} "
45+
"microseconds".format(event))
46+
47+
monitoring.subscribe(LoggingSubscriber(), monitoring.COMMAND)
48+
"""
49+
50+
import sys
51+
import traceback
52+
53+
_SUBSCRIBERS = []
54+
55+
COMMAND = 0
56+
57+
58+
class Subscriber(object):
59+
"""Abstract base class for all subscribers."""
60+
61+
def started(self, event):
62+
"""Abstract method to handle CommandStartedEvent.
63+
64+
:Parameters:
65+
- `event`: An instance of :class:`CommandStartedEvent`
66+
"""
67+
raise NotImplementedError
68+
69+
def succeeded(self, event):
70+
"""Abstract method to handle CommandSucceededEvent.
71+
72+
:Parameters:
73+
- `event`: An instance of :class:`CommandSucceededEvent`
74+
"""
75+
raise NotImplementedError
76+
77+
def failed(self, event):
78+
"""Abstract method to handle CommandFailedEvent.
79+
80+
:Parameters:
81+
- `event`: An instance of :class:`CommandFailedEvent`
82+
"""
83+
raise NotImplementedError
84+
85+
86+
def _to_micros(dur):
87+
"""Convert duration 'dur' to microseconds."""
88+
if hasattr(dur, 'total_seconds'):
89+
return int(dur.total_seconds() * 10e6)
90+
# Python 2.6
91+
return dur.microseconds + (dur.seconds + dur.days * 24 * 3600) * 10e6
92+
93+
94+
def _validate_events(events):
95+
"""Validate that 'event' is an int."""
96+
if not isinstance(events, int) or events != COMMAND:
97+
raise ValueError("only events of type monitoring.COMMAND "
98+
"are currently supported")
99+
100+
101+
def subscribe(subscriber, events=COMMAND):
102+
"""Register a subscriber for events.
103+
104+
This version of PyMongo only publishes events of type :data:`COMMAND`.
105+
106+
:Parameters:
107+
- `subscriber`: A subclass of abstract class :class:`Subscriber`.
108+
- `events`: Optional integer to set event subscriptions
109+
"""
110+
_validate_events(events)
111+
if not isinstance(subscriber, Subscriber):
112+
raise TypeError("subscriber must be a subclass "
113+
"of pymongo.monitoring.Subscriber")
114+
_SUBSCRIBERS.append(subscriber)
115+
116+
117+
def get_subscribers(event=COMMAND):
118+
"""Get the list of subscribers for `event`.
119+
120+
:Parameters:
121+
- `event`: Return subscribers for this event type.
122+
"""
123+
_validate_events(event)
124+
return _SUBSCRIBERS[:]
125+
126+
127+
def enabled():
128+
return bool(_SUBSCRIBERS)
129+
130+
131+
def _handle_exception():
132+
"""Print exceptions raised by subscribers to stderr."""
133+
# Heavily influenced by logging.Handler.handleError.
134+
135+
# See note here:
136+
# https://docs.python.org/3.4/library/sys.html#sys.__stderr__
137+
if sys.stderr:
138+
einfo = sys.exc_info()
139+
try:
140+
traceback.print_exception(einfo[0], einfo[1], einfo[2],
141+
None, sys.stderr)
142+
except IOError:
143+
pass
144+
finally:
145+
del einfo
146+
147+
148+
def publish_command_start(command, database_name, request_id, connection_id):
149+
"""Publish a CommandStartedEvent to all command event subscribers.
150+
151+
:Parameters:
152+
- `command`: The command document.
153+
- `database_name`: The name of the database this command was run against.
154+
- `request_id`: The request id for this operation.
155+
- `connection_id`: The address (host, port) of the server this command
156+
was sent to.
157+
"""
158+
event = CommandStartedEvent(
159+
command, database_name, request_id, connection_id)
160+
for subscriber in get_subscribers(COMMAND):
161+
try:
162+
subscriber.started(event)
163+
except Exception:
164+
_handle_exception()
165+
166+
167+
def publish_command_success(
168+
duration, reply, command_name, request_id, connection_id):
169+
"""Publish a CommandSucceededEvent to all command event subscribers.
170+
171+
:Parameters:
172+
- `duration`: The command duration as a datetime.timedelta.
173+
- `reply`: The server reply document.
174+
- `command_name`: The command name.
175+
- `request_id`: The request id for this operation.
176+
- `connection_id`: The address (host, port) of the server this command
177+
was sent to.
178+
"""
179+
event = CommandSucceededEvent(
180+
duration, reply, command_name, request_id, connection_id)
181+
for subscriber in get_subscribers(COMMAND):
182+
try:
183+
subscriber.succeeded(event)
184+
except Exception:
185+
_handle_exception()
186+
187+
188+
def publish_command_failure(
189+
duration, failure, command_name, request_id, connection_id):
190+
"""Publish a CommandFailedEvent to all command event subscribers.
191+
192+
:Parameters:
193+
- `duration`: The command duration as a datetime.timedelta.
194+
- `failure`: The server reply document.
195+
- `command_name`: The command name.
196+
- `request_id`: The request id for this operation.
197+
- `connection_id`: The address (host, port) of the server this command
198+
was sent to.
199+
"""
200+
event = CommandFailedEvent(
201+
duration, failure, command_name, request_id, connection_id)
202+
for subscriber in get_subscribers(COMMAND):
203+
try:
204+
subscriber.failed(event)
205+
except Exception:
206+
_handle_exception()
207+
208+
209+
class _CommandEvent(object):
210+
"""Base class for command events."""
211+
212+
__slots__ = ("__cmd_name", "__rqst_id", "__conn_id")
213+
214+
def __init__(self, command_name, request_id, connection_id):
215+
self.__cmd_name = command_name
216+
self.__rqst_id = request_id
217+
self.__conn_id = connection_id
218+
219+
@property
220+
def command_name(self):
221+
"""The command name."""
222+
return self.__cmd_name
223+
224+
@property
225+
def request_id(self):
226+
"""The request id for this operation."""
227+
return self.__rqst_id
228+
229+
@property
230+
def connection_id(self):
231+
"""The address (host, port) of the server this command was sent to."""
232+
return self.__conn_id
233+
234+
235+
class CommandStartedEvent(_CommandEvent):
236+
"""Event published when a command starts.
237+
238+
:Parameters:
239+
- `command`: The command document.
240+
- `database_name`: The name of the database this command was run against.
241+
- `request_id`: The request id for this operation.
242+
- `connection_id`: The address (host, port) of the server this command
243+
was sent to.
244+
"""
245+
__slots__ = ("__cmd", "__db")
246+
247+
def __init__(self, command, database_name, request_id, connection_id):
248+
if not command:
249+
raise ValueError("%r is not a valid command" % (command,))
250+
# Command name must be first key.
251+
command_name = next(iter(command))
252+
super(CommandStartedEvent, self).__init__(command_name,
253+
request_id,
254+
connection_id)
255+
self.__cmd = command
256+
self.__db = database_name
257+
258+
@property
259+
def command(self):
260+
"""The command document."""
261+
return self.__cmd
262+
263+
@property
264+
def database_name(self):
265+
"""The name of the database this command was run against."""
266+
return self.__db
267+
268+
269+
class CommandSucceededEvent(_CommandEvent):
270+
"""Event published when a command succeeds.
271+
272+
:Parameters:
273+
- `duration`: The command duration as a datetime.timedelta.
274+
- `reply`: The server reply document.
275+
- `command_name`: The command name.
276+
- `request_id`: The request id for this operation.
277+
- `connection_id`: The address (host, port) of the server this command
278+
was sent to.
279+
"""
280+
__slots__ = ("__duration_micros", "__reply")
281+
282+
def __init__(self, duration, reply, *args):
283+
super(CommandSucceededEvent, self).__init__(*args)
284+
self.__duration_micros = _to_micros(duration)
285+
self.__reply = reply
286+
287+
@property
288+
def duration_micros(self):
289+
"""The duration of this operation in microseconds."""
290+
return self.__duration_micros
291+
292+
@property
293+
def reply(self):
294+
"""The server failure document for this operation."""
295+
return self.__reply
296+
297+
298+
class CommandFailedEvent(_CommandEvent):
299+
"""Event published when a command fails.
300+
301+
:Parameters:
302+
- `duration`: The command duration as a datetime.timedelta.
303+
- `failure`: The server reply document.
304+
- `command_name`: The command name.
305+
- `request_id`: The request id for this operation.
306+
- `connection_id`: The address (host, port) of the server this command
307+
was sent to.
308+
"""
309+
__slots__ = ("__duration_micros", "__failure")
310+
311+
def __init__(self, duration, failure, *args):
312+
super(CommandFailedEvent, self).__init__(*args)
313+
self.__duration_micros = _to_micros(duration)
314+
self.__failure = failure
315+
316+
@property
317+
def duration_micros(self):
318+
"""The duration of this operation in microseconds."""
319+
return self.__duration_micros
320+
321+
@property
322+
def failure(self):
323+
"""The server failure document for this operation."""
324+
return self.__failure

0 commit comments

Comments
 (0)