Commit 7a62492

Merge pull request #44 from damklis/develop (Develop)

2 parents 5981a5e + e5434a8

File tree: 11 files changed (+150, -80 lines)

.coveragerc

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 [run]
 omit =
     *test*.py
-    *main.py
+    *operator.py
 
 [report]
 exclude_lines =
airflow/dags/custom_operators/__init__.py (new file; path inferred from the imports)

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+from custom_operators.proxypool_operator import ProxyPoolOperator
+from custom_operators.rss_news_operator import RSSNewsOperator
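The package-level re-exports above are what let the DAG change further down import both operators from the package root. A minimal illustration of the effect (not part of the diff):

# With the re-exports in custom_operators/__init__.py in place,
# callers can use the short package-level import:
from custom_operators import ProxyPoolOperator, RSSNewsOperator

# ...instead of spelling out the submodules:
# from custom_operators.proxypool_operator import ProxyPoolOperator
# from custom_operators.rss_news_operator import RSSNewsOperator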
airflow/dags/custom_operators/proxypool_operator.py (new file; path inferred)

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+import json
+from concurrent.futures import ThreadPoolExecutor
+from retry import RetryOnException as retry
+from proxypool import (
+    ProxyPoolValidator,
+    ProxyPoolScraper,
+    RedisProxyPoolClient
+)
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class ProxyPoolOperator(BaseOperator):
+
+    @apply_defaults
+    def __init__(
+            self,
+            proxy_webpage,
+            number_of_proxies,
+            testing_url,
+            max_workers,
+            redis_config,
+            redis_key,
+            *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.proxy_webpage = proxy_webpage
+        self.testing_url = testing_url
+        self.number_of_proxies = number_of_proxies
+        self.max_workers = max_workers
+        self.redis_config = redis_config
+        self.redis_key = redis_key
+
+    @retry(5)
+    def execute(self, context):
+        proxy_scraper = ProxyPoolScraper(self.proxy_webpage)
+        proxy_validator = ProxyPoolValidator(self.testing_url)
+        proxy_stream = proxy_scraper.get_proxy_stream(self.number_of_proxies)
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            results = executor.map(
+                proxy_validator.validate_proxy, proxy_stream
+            )
+            valid_proxies = filter(lambda x: x.is_valid is True, results)
+            sorted_valid_proxies = sorted(
+                valid_proxies, key=lambda x: x.health, reverse=True
+            )
+
+        with RedisProxyPoolClient(self.redis_key, self.redis_config) as client:
+            client.override_existing_proxies(
+                [
+                    json.dumps(record.proxy)
+                    for record in sorted_valid_proxies[:5]
+                ]
+            )
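The selection logic in execute() assumes that ProxyPoolValidator.validate_proxy returns records exposing is_valid, health, and proxy attributes. A self-contained sketch of just that filter-and-sort step, using a hypothetical stand-in record type (field names inferred from the operator above, not taken from the proxypool package):

import json
from dataclasses import dataclass


@dataclass
class ProxyCheckResult:
    # Hypothetical stand-in for the objects yielded by
    # ProxyPoolValidator.validate_proxy; the field names are
    # inferred from the filter/sorted calls in execute().
    proxy: dict
    is_valid: bool
    health: float


results = [
    ProxyCheckResult({"http": "http://1.2.3.4:8080"}, True, 0.9),
    ProxyCheckResult({"http": "http://5.6.7.8:3128"}, False, 0.0),
    ProxyCheckResult({"http": "http://9.9.9.9:80"}, True, 0.5),
]

# Same selection as ProxyPoolOperator.execute: keep only valid
# proxies, order by health (best first), serialize the top five.
valid_proxies = filter(lambda x: x.is_valid is True, results)
sorted_valid_proxies = sorted(valid_proxies, key=lambda x: x.health, reverse=True)
payload = [json.dumps(record.proxy) for record in sorted_valid_proxies[:5]]
print(payload)  # ['{"http": "http://1.2.3.4:8080"}', '{"http": "http://9.9.9.9:80"}']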
airflow/dags/custom_operators/rss_news_operator.py (new file; path inferred)

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+from log import log
+from retry import RetryOnException as retry
+from proxypool import RedisProxyPoolClient
+from rss_news import (
+    NewsProducer,
+    NewsExporter,
+    NewsValidator
+)
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+@log
+class RSSNewsOperator(BaseOperator):
+
+    @apply_defaults
+    def __init__(
+            self,
+            validator_config,
+            rss_feed,
+            language,
+            redis_config,
+            redis_key,
+            bootstrap_servers,
+            topic,
+            *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.validator_config = validator_config
+        self.rss_feed = rss_feed
+        self.language = language
+        self.redis_config = redis_config
+        self.redis_key = redis_key
+        self.bootstrap_servers = bootstrap_servers
+        self.topic = topic
+
+    @retry(5)
+    def execute(self, context):
+        validator = NewsValidator(self.validator_config)
+        producer = NewsProducer(self.rss_feed, self.language)
+        redis = RedisProxyPoolClient(self.redis_key, self.redis_config)
+
+        with NewsExporter(self.bootstrap_servers) as exporter:
+            proxy = redis.get_proxy()
+            self.logger.info(proxy)
+            try:
+                for news in producer.get_news_stream(proxy):
+                    self.logger.info(news)
+                    validator.validate_news(news)
+                    exporter.export_news_to_broker(
+                        self.topic,
+                        news.as_dict()
+                    )
+            except Exception as err:
+                redis.lpop_proxy()
+                self.logger.error(f"Exception: {err}")
+                raise err
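Both operators guard execute() with @retry(5) from the project's retry module, whose implementation is not part of this commit. A plausible sketch of what the name and usage suggest (re-run the wrapped callable up to five times, re-raising the last exception), offered only as an assumption:

import functools


def RetryOnException(retries):
    # Hypothetical reconstruction of the decorator used as @retry(5);
    # the project's real implementation may log, sleep, or back off.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    last_error = err
            raise last_error
        return wrapper
    return decorator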

airflow/dags/rss_news_dag.py

Lines changed: 24 additions & 11 deletions

@@ -4,8 +4,10 @@
 from airflow.operators.python_operator import PythonOperator
 
 from dags_config import Config as config
-from rss_news import export_news_to_broker
-from proxypool import update_proxypool
+from custom_operators import (
+    ProxyPoolOperator,
+    RSSNewsOperator
+)
 
 
 def extract_feed_name(url):

@@ -19,12 +21,15 @@ def dummy_callable(action):
 
 def export_events(config, rss_feed, language, dag):
     feed_name = extract_feed_name(rss_feed)
-    return PythonOperator(
+    return RSSNewsOperator(
         task_id=f"exporting_{feed_name}_news_to_broker",
-        python_callable=export_news_to_broker,
-        op_kwargs={
-            "config": config, "rss_feed": rss_feed, "language": language
-        },
+        validator_config=config.VALIDATOR_CONFIG,
+        rss_feed=rss_feed,
+        language=language,
+        redis_config=config.REDIS_CONFIG,
+        redis_key=config.REDIS_KEY,
+        bootstrap_servers=config.BOOTSTRAP_SERVERS,
+        topic=config.TOPIC,
         dag=dag
     )
 

@@ -46,10 +51,14 @@ def create_dag(dag_id, interval, config, language, rss_feeds):
         dag=dag
     )
 
-    proxypool = PythonOperator(
+    proxypool = ProxyPoolOperator(
         task_id="updating_proxypoool",
-        python_callable=update_proxypool,
-        op_kwargs={"config": config},
+        proxy_webpage=config.PROXY_WEBPAGE,
+        number_of_proxies=config.NUMBER_OF_PROXIES,
+        testing_url=config.TESTING_URL,
+        max_workers=config.NUMBER_OF_PROXIES,
+        redis_config=config.REDIS_CONFIG,
+        redis_key=config.REDIS_KEY,
         dag=dag
     )
 

@@ -76,5 +85,9 @@ def create_dag(dag_id, interval, config, language, rss_feeds):
     interval = f"{n*4}-59/10 * * * *"
 
     globals()[dag_id] = create_dag(
-        dag_id, interval, config, language, rss_feeds
+        dag_id,
+        interval,
+        config,
+        language,
+        rss_feeds
     )
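Taken together, the DAG swaps both PythonOperator tasks for the new custom operators. A condensed sketch of the resulting wiring; the keyword names match the diff, while the literal values, URLs, and task ordering are illustrative assumptions (the real DAG reads everything from dags_config.Config):

from datetime import datetime

from airflow import DAG
from custom_operators import ProxyPoolOperator, RSSNewsOperator

REDIS_CONFIG = {"host": "redis", "port": 6379}  # assumed placeholder

with DAG("rss_news_example", start_date=datetime(2020, 1, 1),
         schedule_interval="@hourly") as dag:
    proxypool = ProxyPoolOperator(
        task_id="updating_proxypool",
        proxy_webpage="https://free-proxy-list.net",  # assumed
        number_of_proxies=10,
        testing_url="https://example.com",            # assumed
        max_workers=10,
        redis_config=REDIS_CONFIG,
        redis_key="proxies",
    )
    export = RSSNewsOperator(
        task_id="exporting_example_news_to_broker",
        validator_config={},                          # assumed
        rss_feed="https://example.com/rss",           # assumed
        language="en",
        redis_config=REDIS_CONFIG,
        redis_key="proxies",
        bootstrap_servers=["kafka:9092"],
        topic="rss_news",
    )
    proxypool >> export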
airflow/modules/proxypool/__init__.py (path inferred from the deleted main.py below)

Lines changed: 0 additions & 1 deletion

@@ -1,4 +1,3 @@
 from proxypool.redis_proxypool_client import RedisProxyPoolClient
-from proxypool.main import update_proxypool
 from proxypool.proxypool_scraper import ProxyPoolScraper, ProxyRecord
 from proxypool.proxypool_validator import ProxyPoolValidator
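RedisProxyPoolClient, re-exported above, is used by both new operators as a context manager exposing override_existing_proxies, get_proxy, and lpop_proxy. A hypothetical sketch of that interface, inferred purely from those call sites (the project's actual client may differ; redis-py is an assumed dependency):

import json

import redis  # assumed dependency


class RedisProxyPoolClient:
    # Sketch inferred from usage in the two operators above;
    # not the project's actual implementation.
    def __init__(self, key, redis_config):
        self.key = key
        self.redis = redis.Redis(**redis_config)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # propagate any exception

    def override_existing_proxies(self, proxies):
        # Atomically replace the stored pool with the new list.
        pipeline = self.redis.pipeline()
        pipeline.delete(self.key)
        pipeline.lpush(self.key, *proxies)
        pipeline.execute()

    def get_proxy(self):
        # Peek at the current best proxy without removing it.
        proxy = self.redis.lindex(self.key, 0)
        return json.loads(proxy) if proxy else None

    def lpop_proxy(self):
        # Drop the failing proxy from the head of the list.
        self.redis.lpop(self.key)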

airflow/modules/proxypool/main.py

Lines changed: 0 additions & 25 deletions
This file was deleted.
airflow/modules/rss_news/__init__.py (path inferred from the deleted main.py below)

Lines changed: 1 addition & 2 deletions

@@ -1,4 +1,3 @@
-from rss_news.main import export_news_to_broker
 from rss_news.rss_news_producer import NewsProducer, NewsFormatter, News
 from rss_news.rss_news_exporter import NewsExporter
-from rss_news.rss_news_validator import NewsValidator
+from rss_news.rss_news_validator import NewsValidator

(The last line is re-added unchanged; only its end-of-file newline differs.)

airflow/modules/rss_news/main.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

minio/create_default_bucket.sh

Lines changed: 1 addition & 1 deletion

@@ -3,4 +3,4 @@
 echo "Creating default bucket $DEFAULT_BUCKET"
 
 mkdir -p /data/$DEFAULT_BUCKET \
-    && /usr/bin/minio server /data
+    && /usr/bin/minio server /data

(The last line is re-added unchanged; only its end-of-file newline differs.)
