Skip to content

Commit c5c1973

Browse files
committed
puka: six.
1 parent 75863fb commit c5c1973

File tree

3 files changed

+76
-0
lines changed

3 files changed

+76
-0
lines changed

python-puka/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,8 @@ You may need to install `pip` first:
4949

5050
python receive_logs_topic.py
5151
python emit_log_topic.py
52+
53+
[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html):
54+
55+
python rpc_server.py
56+
python rpc_client.py

python-puka/rpc_client.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#!/usr/bin/env python
2+
import puka
3+
import uuid
4+
5+
class FibonacciRpcClient(object):
6+
def __init__(self):
7+
self.client = client = puka.Client("amqp://localhost/")
8+
promise = client.connect()
9+
client.wait(promise)
10+
11+
promise = client.queue_declare(exclusive=True)
12+
self.callback_queue = client.wait(promise)['queue']
13+
14+
self.consume_promise = client.basic_consume(queue=self.callback_queue,
15+
no_ack=True)
16+
17+
def call(self, n):
18+
correlation_id = str(uuid.uuid4())
19+
# We don't need to wait on promise from publish, let it happen async.
20+
self.client.basic_publish(exchange='',
21+
routing_key='rpc_queue',
22+
headers={'reply_to': self.callback_queue,
23+
'correlation_id': correlation_id},
24+
body=str(n))
25+
while True:
26+
msg_result = self.client.wait(self.consume_promise)
27+
if msg_result['headers']['correlation_id'] == correlation_id:
28+
return int(msg_result['body'])
29+
30+
31+
fibonacci_rpc = FibonacciRpcClient()
32+
33+
print " [x] Requesting fib(30)"
34+
response = fibonacci_rpc.call(30)
35+
print " [.] Got %r" % (response,)

python-puka/rpc_server.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#!/usr/bin/env python
2+
import puka
3+
4+
client = puka.Client("amqp://localhost/")
5+
promise = client.connect()
6+
client.wait(promise)
7+
8+
promise = client.queue_declare(queue='rpc_queue')
9+
client.wait(promise)
10+
11+
# The worlds worst algorithm:
12+
def fib(n):
13+
if n == 0:
14+
return 0
15+
elif n == 1:
16+
return 1
17+
else:
18+
return fib(n-1) + fib(n-2)
19+
20+
21+
print " [x] Awaiting RPC requests"
22+
consume_promise = client.basic_consume(queue='rpc_queue', prefetch_count=1)
23+
while True:
24+
msg_result = client.wait(consume_promise)
25+
n = int(msg_result['body'])
26+
27+
print " [.] fib(%s)" % (n,)
28+
response = fib(n)
29+
30+
# This publish doesn't need to be synchronous.
31+
client.basic_publish(exchange='',
32+
routing_key=msg_result['headers']['reply_to'],
33+
headers={'correlation_id':
34+
msg_result['headers']['correlation_id']},
35+
body=str(response))
36+
client.basic_ack(msg_result)

0 commit comments

Comments
 (0)