Merge lp:~pwlars/core-selftest-agent/agent-queues into lp:core-selftest-agent
| Status: | Merged |
|---|---|
| Approved by: | Paul Larson |
| Approved revision: | 8 |
| Merged at revision: | 6 |
| Proposed branch: | lp:~pwlars/core-selftest-agent/agent-queues |
| Merge into: | lp:core-selftest-agent |
| Diff against target: | 264 lines (+187/-5), 7 files modified: core_selftest_agent/__init__.py (+52/-0), core_selftest_agent/constants.py (+11/-4), core_selftest_agent/tests/__init__.py (+16/-0), core_selftest_agent/tests/test_worker.py (+53/-0), core_selftest_agent/worker.py (+52/-0), requirements.txt (+1/-0), setup.py (+2/-1) |
| To merge this branch: | bzr merge lp:~pwlars/core-selftest-agent/agent-queues |
| Related bugs: | |

| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Joe Talbott (community) | | | Approve |
| Paul Larson (community) | | | Approve |
| Francis Ginther | | | Approve |
Commit message
Add workers and start interacting with the rabbit queues
Description of the change
Still a bit of a work in progress, but I'd like to go ahead and get it out for review and merge it in steps before it grows any more. This just adds some bits to make it start actually interacting with the queues and exchanges.
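For reviewers less familiar with the exchange side of this change, here is a minimal sketch of fanout delivery. It is a toy in-memory model written for illustration only, not kombu or RabbitMQ: `FanoutExchange`, `bind`, `publish` and the second queue name are hypothetical, while the exchange and queue names for the agent are taken from constants.py in this branch.

```python
from collections import deque


class FanoutExchange:
    """Toy stand-in for a fanout exchange: every bound queue gets a copy."""

    def __init__(self, name):
        self.name = name
        self.queues = {}

    def bind(self, queue_name):
        """Create (or return) the queue bound under queue_name."""
        return self.queues.setdefault(queue_name, deque())

    def publish(self, payload):
        """Deliver an independent copy of payload to every bound queue."""
        for queue in self.queues.values():
            queue.append(dict(payload))


# Exchange and agent queue names as defined in constants.py.
exchange = FanoutExchange("candidates.exchange")
selftest = exchange.bind("core.selftest.candidates")
other = exchange.bind("some.other.consumer")  # hypothetical second consumer

exchange.publish({"test": "value"})

# Each bound queue holds its own copy of the message.
received_by_selftest = selftest.popleft()
received_by_other = other.popleft()
```

This is the reason constants.py gives the selftest agents their own INPUT_QUEUE name: consumers that share one queue split its messages between them, whereas separately named queues bound to the same fanout exchange each see every message.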
Francis Ginther (fginther) wrote:
Just some minor comments, can address them later if necessary.
Paul Larson (pwlars) wrote:
Ok, thanks for the review. I made the adjustments mentioned in the previous review, please ack.
Francis Ginther (fginther) wrote:
Approve, all for landing this so that we can start iterating.
Ubuntu CI Bot (uci-bot) wrote:
The attempt to merge lp:~pwlars/core-selftest-agent/agent-queues into lp:core-selftest-agent failed. Below is the output from the failed tests.
Using base prefix '/usr'
New python executable in /tmp/venv-
Also creating executable in /tmp/venv-
Installing setuptools, pip...done.
Running virtualenv with interpreter /usr/bin/python3
Ignoring indexes: https:/
Downloading/
Running setup.py (path:/
Downloading/
Installing collected packages: python-logstash, uservice-utils
Running setup.py install for python-logstash
Successfully installed python-logstash uservice-utils
Cleaning up...
Downloading/
Downloading flake8-
Downloading/
Downloading/
Downloading pyflakes-
Downloading/
Downloading mccabe-
Downloading/
Downloading pep8-1.
Downloading/
Downloading extras-0.0.3.tar.gz
Running setup.py (path:/
Downloading/
Downloading/
Downloading python-
Running setup.py (path:/
Downloading/
Downloading traceback2-
Downloading/
Downloading six-1.9.
Downloading/
Downloading argparse-
Downloading/
Downloading linecache2-
Installing collected packages: flake8, testtools, pyflakes, mccabe, pep8, extras, unittest2, python-mimeparse, traceback2, six, argparse, linecache2
*** Error compiling '/tmp/venv-
File "/tmp/venv-
raise exc_class, exc_obj, exc_tb
...
Paul Larson (pwlars) wrote:
Added missing kombu requirement
Joe Talbott (joetalbott) wrote:
Looks good, thanks.
Ubuntu CI Bot (uci-bot) wrote:
There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.
Ubuntu CI Bot (uci-bot) wrote:
The attempt to merge lp:~pwlars/core-selftest-agent/agent-queues into lp:core-selftest-agent failed. Below is the output from the failed tests.
Using base prefix '/usr'
New python executable in /tmp/venv-
Also creating executable in /tmp/venv-
Installing setuptools, pip...done.
Running virtualenv with interpreter /usr/bin/python3
Ignoring indexes: https:/
Downloading/
Could not find any downloads that satisfy the requirement kombu==3.0.24 (from -r requirements.txt (line 1))
Cleaning up...
No distributions at all found for kombu==3.0.24 (from -r requirements.txt (line 1))
Storing debug log for failure in /home/tarmac/
INFO: Executing: bzr branch lp:~canonical-ci-engineering/core-selftest-agent/pip-cache /tmp/tmp4kpzow9
INFO: Executing: virtualenv -p python3 /tmp/venv-
INFO: Executing: /tmp/venv-
ERROR: Calling `['/tmp/
Branched 1 revision.
Preview Diff
| 1 | === modified file 'core_selftest_agent/__init__.py' |
| 2 | --- core_selftest_agent/__init__.py 2015-05-21 16:39:28 +0000 |
| 3 | +++ core_selftest_agent/__init__.py 2015-05-26 15:50:25 +0000 |
| 4 | @@ -17,6 +17,7 @@ |
| 5 | |
| 6 | import argparse |
| 7 | import configparser |
| 8 | +import kombu |
| 9 | import logging |
| 10 | import os |
| 11 | |
| 12 | @@ -26,6 +27,44 @@ |
| 13 | ) |
| 14 | |
| 15 | from core_selftest_agent import constants |
| 16 | +from core_selftest_agent.worker import CoreSelftestAgentWorker |
| 17 | + |
| 18 | + |
| 19 | +class CoreImageResultPublisher(object): |
| 20 | + """A callable that can notify a results exchange that the test is running |
| 21 | + |
| 22 | + This will be a fanout exchange so multiple consumers can pick up these |
| 23 | + messages and decide whether to take action or ignore them. |
| 24 | + """ |
| 25 | + |
| 26 | + def __init__(self, connection): |
| 27 | + self.connection = connection |
| 28 | + exchange = kombu.Exchange(constants.RESULTS_EXCHANGE, |
| 29 | + type='fanout', durable=False) |
| 30 | + self.queue = kombu.Queue(constants.RESULTS_QUEUE, exchange) |
| 31 | + |
| 32 | + def __call__(self, payload): |
| 33 | + """Take 'payload' and enqueue it on the rabbit exchange.""" |
| 34 | + queue = self.connection.SimpleQueue(self.queue) |
| 35 | + queue.put(payload) |
| 36 | + queue.close() |
| 37 | + |
| 38 | + |
| 39 | +class CoreImageBuilder(object): |
| 40 | + """A callable that knows how to queue messages for the image builder |
| 41 | + |
| 42 | + For now we're using a simple queue, but this can easily be extended to use |
| 43 | + a full-blown topic exchange in the future. |
| 44 | + """ |
| 45 | + |
| 46 | + def __init__(self, connection): |
| 47 | + self.connection = connection |
| 48 | + |
| 49 | + def __call__(self, payload): |
| 50 | + """Take 'payload' and enqueue it on the rabbit queue.""" |
| 51 | + queue = self.connection.SimpleQueue(constants.IMAGE_BUILD_QUEUE) |
| 52 | + queue.put(payload) |
| 53 | + queue.close() |
| 54 | |
| 55 | |
| 56 | def read_config(): |
| 57 | @@ -62,3 +101,16 @@ |
| 58 | log_path, |
| 59 | config['logstash'] if 'logstash' in config else None |
| 60 | ) |
| 61 | + |
| 62 | + amqp_uris = config.get('amqp', 'uris').split() |
| 63 | + try: |
| 64 | + with kombu.Connection(amqp_uris) as connection: |
| 65 | + monitor = CoreSelftestAgentWorker( |
| 66 | + connection, |
| 67 | + constants, |
| 68 | + CoreImageBuilder(connection), |
| 69 | + CoreImageResultPublisher(connection), |
| 70 | + ) |
| 71 | + monitor.run() |
| 72 | + except KeyboardInterrupt: |
| 73 | + print("Bye!") |
| 74 | |
| 75 | === modified file 'core_selftest_agent/constants.py' |
| 76 | --- core_selftest_agent/constants.py 2015-05-21 18:47:40 +0000 |
| 77 | +++ core_selftest_agent/constants.py 2015-05-26 15:50:25 +0000 |
| 78 | @@ -20,11 +20,18 @@ |
| 79 | |
| 80 | # Queue names are constant, rather than being defined in the config. |
| 81 | |
| 82 | -# The queue we listen to for new payloads to check: |
| 83 | -INPUT_QUEUE = "pm.candidates" |
| 84 | +# We listen to a fanout exchange for candidate packages to test |
| 85 | +INPUT_EXCHANGE = "candidates.exchange" |
| 86 | + |
| 87 | +# The queue needs to be unique to selftest agents |
| 88 | +INPUT_QUEUE = "core.selftest.candidates" |
| 89 | |
| 90 | # The queue we send events on to request image build and test |
| 91 | -OUTPUT_QUEUE = "core.package.v1" |
| 92 | +IMAGE_BUILD_QUEUE = "snappy-proposed-migration.package.v1" |
| 93 | + |
| 94 | +# We need to notify the results exchange that the test is in progress |
| 95 | +RESULTS_EXCHANGE = "results.exchange" |
| 96 | +RESULTS_QUEUE = "core.selftest.results" |
| 97 | |
| 98 | # The queue we put fatally error'd payloads into: |
| 99 | DEAD_LETTER_QUEUE = "core.deadletters.v1" |
| 100 | @@ -33,7 +40,7 @@ |
| 101 | |
| 102 | SERVICE_NAME = "core-selftest-agent" |
| 103 | |
| 104 | -HOSTNAME = socket.gethostname().replace('-machine', '') |
| 105 | +HOSTNAME = socket.gethostname() |
| 106 | |
| 107 | LOGGING_EXTRA = { |
| 108 | 'solution': SOLUTION_NAME, |
| 109 | |
| 110 | === added file 'core_selftest_agent/tests/__init__.py' |
| 111 | --- core_selftest_agent/tests/__init__.py 1970-01-01 00:00:00 +0000 |
| 112 | +++ core_selftest_agent/tests/__init__.py 2015-05-26 15:50:25 +0000 |
| 113 | @@ -0,0 +1,16 @@ |
| 114 | +# core-selftest-agent |
| 115 | +# Copyright (C) 2015 Canonical |
| 116 | +# |
| 117 | +# This program is free software: you can redistribute it and/or modify |
| 118 | +# it under the terms of the GNU General Public License as published by |
| 119 | +# the Free Software Foundation, either version 3 of the License, or |
| 120 | +# (at your option) any later version. |
| 121 | +# |
| 122 | +# This program is distributed in the hope that it will be useful, |
| 123 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 124 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 125 | +# GNU General Public License for more details. |
| 126 | +# |
| 127 | +# You should have received a copy of the GNU General Public License |
| 128 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 129 | +# |
| 130 | |
| 131 | === added file 'core_selftest_agent/tests/test_worker.py' |
| 132 | --- core_selftest_agent/tests/test_worker.py 1970-01-01 00:00:00 +0000 |
| 133 | +++ core_selftest_agent/tests/test_worker.py 2015-05-26 15:50:25 +0000 |
| 134 | @@ -0,0 +1,53 @@ |
| 135 | +# core-selftest-agent |
| 136 | +# Copyright (C) 2015 Canonical |
| 137 | +# |
| 138 | +# This program is free software: you can redistribute it and/or modify |
| 139 | +# it under the terms of the GNU General Public License as published by |
| 140 | +# the Free Software Foundation, either version 3 of the License, or |
| 141 | +# (at your option) any later version. |
| 142 | +# |
| 143 | +# This program is distributed in the hope that it will be useful, |
| 144 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 145 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 146 | +# GNU General Public License for more details. |
| 147 | +# |
| 148 | +# You should have received a copy of the GNU General Public License |
| 149 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 150 | +# |
| 151 | + |
| 152 | +import kombu |
| 153 | +from testtools import TestCase |
| 154 | + |
| 155 | +from core_selftest_agent import constants |
| 156 | +from core_selftest_agent.worker import CoreSelftestAgentWorker |
| 157 | + |
| 158 | + |
| 159 | +class WorkerTests(TestCase): |
| 160 | + def test_can_read_message(self): |
| 161 | + conn = kombu.Connection('memory:///') |
| 162 | + queue_message(conn, {'test': 'value'}) |
| 163 | + |
| 164 | + build_worker = LoggingConsumer() |
| 165 | + result_worker = LoggingConsumer() |
| 166 | + q = CoreSelftestAgentWorker(conn, constants, |
| 167 | + build_worker, result_worker) |
| 168 | + next(q.consume(limit=1, timeout=5.0)) |
| 169 | + self.assertEqual(build_worker.messages_seen, [dict(test='value')]) |
| 170 | + self.assertEqual(result_worker.messages_seen, [dict(test='value')]) |
| 171 | + |
| 172 | + |
| 173 | +class LoggingConsumer(object): |
| 174 | + """A consumer callback object that records all received payloads.""" |
| 175 | + def __init__(self): |
| 176 | + self.messages_seen = [] |
| 177 | + |
| 178 | + def __call__(self, message): |
| 179 | + self.messages_seen.append(message) |
| 180 | + |
| 181 | + |
| 182 | +def queue_message(conn, message): |
| 183 | + exchange = kombu.Exchange(constants.INPUT_EXCHANGE, |
| 184 | + type='fanout', durable=False) |
| 185 | + queue = kombu.Queue(constants.INPUT_QUEUE, exchange) |
| 186 | + with conn.SimpleQueue(queue) as q: |
| 187 | + q.put(message) |
| 188 | |
| 189 | === added file 'core_selftest_agent/worker.py' |
| 190 | --- core_selftest_agent/worker.py 1970-01-01 00:00:00 +0000 |
| 191 | +++ core_selftest_agent/worker.py 2015-05-26 15:50:25 +0000 |
| 192 | @@ -0,0 +1,52 @@ |
| 193 | +# core-selftest-agent |
| 194 | +# Copyright (C) 2015 Canonical |
| 195 | +# |
| 196 | +# This program is free software: you can redistribute it and/or modify |
| 197 | +# it under the terms of the GNU General Public License as published by |
| 198 | +# the Free Software Foundation, either version 3 of the License, or |
| 199 | +# (at your option) any later version. |
| 200 | +# |
| 201 | +# This program is distributed in the hope that it will be useful, |
| 202 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 203 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 204 | +# GNU General Public License for more details. |
| 205 | +# |
| 206 | +# You should have received a copy of the GNU General Public License |
| 207 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 208 | +# |
| 209 | + |
| 210 | +import kombu |
| 211 | + |
| 212 | +from kombu.mixins import ConsumerMixin |
| 213 | + |
| 214 | + |
| 215 | +class CoreSelftestAgentWorker(ConsumerMixin): |
| 216 | + |
| 217 | + def __init__(self, connection, config, build_worker, result_worker): |
| 218 | + self.config = config |
| 219 | + self.connection = connection |
| 220 | + self.build_worker = build_worker |
| 221 | + self.result_worker = result_worker |
| 222 | + |
| 223 | + exchange = kombu.Exchange(self.config.INPUT_EXCHANGE, |
| 224 | + type='fanout', durable=False) |
| 225 | + self.queue = kombu.Queue(self.config.INPUT_QUEUE, exchange) |
| 226 | + |
| 227 | + def get_consumers(self, Consumer, channel): |
| 228 | + """Return consumer instances for all configured queues.""" |
| 229 | + queues = [self.queue] |
| 230 | + return [kombu.Consumer( |
| 231 | + channel, queues=queues, callbacks=[self.process])] |
| 232 | + |
| 233 | + def process(self, body, message): |
| 234 | + """Process incoming messages about candidate packages. |
| 235 | + |
| 236 | + TODO: Check if we care about this package, if so, send a message |
| 237 | + to the image builder to create the image and start the tests, and |
| 238 | + a message to the results exchange to mark it as in progress. |
| 239 | + |
| 240 | + For now, just blindly forward every message to both workers. |
| 241 | + """ |
| 242 | + |
| 243 | + self.build_worker(body) |
| 244 | + self.result_worker(body) |
| 245 | |
| 246 | === modified file 'requirements.txt' |
| 247 | --- requirements.txt 2015-05-21 18:47:29 +0000 |
| 248 | +++ requirements.txt 2015-05-26 15:50:25 +0000 |
| 249 | @@ -1,2 +1,3 @@ |
| 250 | +kombu==3.0.24 |
| 251 | python-logstash==0.4.2 |
| 252 | uservice-utils==1.0.2.1 |
| 253 | |
| 254 | === modified file 'setup.py' |
| 255 | --- setup.py 2015-05-20 17:10:57 +0000 |
| 256 | +++ setup.py 2015-05-26 15:50:25 +0000 |
| 257 | @@ -36,5 +36,6 @@ |
| 258 | url='https://launchpad.net/core-selftest-agent', |
| 259 | license='GPLv3', |
| 260 | packages=find_packages(), |
| 261 | - scripts=['core-selftest-agent.py'] |
| 262 | + scripts=['core-selftest-agent.py'], |
| 263 | + test_suite='core_selftest_agent.tests', |
| 264 | ) |
One very minor comment in-line. Approving now and you can do what you will with the comment.
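As shown in the preview diff, `process` in worker.py forwards the body to both callables but does not call `message.ack()`. The proposal does not say whether that is intentional. If acknowledging after forwarding is the desired behaviour, a minimal sketch might look like the one below. `AckingProcessor` and `FakeMessage` are hypothetical names used only for illustration. What comes from kombu is the `(body, message)` callback signature and the `ack()` method on the message. `LoggingConsumer` mirrors the helper in test_worker.py.

```python
class AckingProcessor:
    """Hypothetical variant of the worker's process() callback that acks."""

    def __init__(self, build_worker, result_worker):
        self.build_worker = build_worker
        self.result_worker = result_worker

    def process(self, body, message):
        # Forward the payload to both callables, as the branch does today.
        self.build_worker(body)
        self.result_worker(body)
        # Assumption: acknowledge only after both callables have returned,
        # so an exception above leaves the message unacknowledged.
        message.ack()


class FakeMessage:
    """Test double exposing only ack(), standing in for a kombu message."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


class LoggingConsumer:
    """Records every payload it is called with (as in test_worker.py)."""

    def __init__(self):
        self.messages_seen = []

    def __call__(self, payload):
        self.messages_seen.append(payload)


build_worker = LoggingConsumer()
result_worker = LoggingConsumer()
message = FakeMessage()
AckingProcessor(build_worker, result_worker).process({"test": "value"}, message)
```

Acking last is only one possible choice. Acking first would avoid redelivery when a downstream callable fails, at the cost of losing the candidate message.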