Skip to content

Commit ddb2002

Browse files
committed
Add missing tests
1 parent a821158 commit ddb2002

File tree

2 files changed

+44
-8
lines changed

2 files changed

+44
-8
lines changed

airflow/modules/rss_news/rss_news_exporter.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,25 @@
55

66
class NewsExporter:
77
def __init__(self, bootstrap_servers):
8-
self.producer = KafkaProducer(
9-
bootstrap_servers=bootstrap_servers,
10-
value_serializer=lambda x: self._encode_news(x)
8+
self._producer = self._connect_producer(
9+
bootstrap_servers
1110
)
1211

13-
def _encode_news(self, value):
14-
return json.dumps(value).encode("utf-8")
12+
def _connect_producer(self, bootstrap_servers):
13+
def encode_news(value):
14+
return json.dumps(value).encode("utf-8")
15+
16+
producer = KafkaProducer(
17+
bootstrap_servers=bootstrap_servers,
18+
value_serializer=lambda x: encode_news(x)
19+
)
20+
return producer
1521

1622
def __enter__(self):
1723
return self
1824

1925
def export_news_to_broker(self, topic, record, sleep_time=0.01):
20-
response = self.producer.send(
26+
response = self._producer.send(
2127
topic,
2228
value=record
2329
)
@@ -27,4 +33,4 @@ def export_news_to_broker(self, topic, record, sleep_time=0.01):
2733
)
2834

2935
def __exit__(self, type, value, traceback):
30-
self.producer.close()
36+
self._producer.close()

airflow/modules/tests/rss_news/test_rss_news_exporter.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1-
from unittest.mock import patch
1+
from unittest.mock import patch, Mock
22
import pytest
3+
from rss_news import NewsExporter
4+
5+
6+
@patch("rss_news.rss_news_exporter.KafkaProducer")
7+
def test_connect_producer(mock_producer):
8+
9+
exporter = NewsExporter(["test_broker:9092"])
10+
11+
assert exporter._producer is not None
312

413

514
@patch("rss_news.NewsExporter")
@@ -19,3 +28,24 @@ def test_export_news_to_broker(exporter):
1928
exporter.export_news_to_broker.assert_called_once_with(
2029
topic, news
2130
)
31+
32+
33+
@patch("rss_news.rss_news_exporter.KafkaProducer")
34+
def test_export_news_to_broker_context_manager(mock_producer):
35+
mock_producer.send.return_value = Mock()
36+
topic = "test_topic"
37+
news = {
38+
"_id": "test_id",
39+
"title": "test_title",
40+
"link": "www.test.com",
41+
"date": "2020-01-01 00:00:00",
42+
"description": "Test",
43+
"author": "Test",
44+
"language": "pl"
45+
}
46+
47+
with NewsExporter(["test_broker:9092"]) as exporter:
48+
exporter.export_news_to_broker(topic, news)
49+
exporter._producer.send.assert_called_once_with(
50+
topic, value=news
51+
)

0 commit comments

Comments
 (0)