Skip to content

Commit 78947a8

Browse files
authored
Merge pull request #16 from hellmarbecker/main
Add SASL authentication as an option
2 parents fd59907 + 4e62b6e commit 78947a8

File tree

2 files changed

+48
-12
lines changed

2 files changed

+48
-12
lines changed

README.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pip install -r requirements.txt
2424
## Usage
2525

2626
The Python code can be run in bash with the following,
27-
in SSL security protocol:
27+
in ``SSL`` security protocol:
2828
```bash
2929
python main.py \
3030
--security-protocol ssl \
@@ -35,7 +35,21 @@ python main.py \
3535
--nr-messages 0 \
3636
--max-waiting-time 0
3737
```
38-
in PLAINTEXT security protocol:
38+
in ``SASL_SSL`` security protocol:
39+
```bash
40+
python main.py \
41+
--security-protocol SASL_SSL \
42+
--sasl-mechanism SCRAM-SHA-256 \
43+
--username <USERNAME> \
44+
--password <PASSWORD> \
45+
--cert-folder ~/kafkaCerts/ \
46+
--host kafka-<name>.aivencloud.com \
47+
--port 13041 \
48+
--topic-name pizza-orders \
49+
--nr-messages 0 \
50+
--max-waiting-time 0
51+
```
52+
in ``PLAINTEXT`` security protocol:
3953
```bash
4054
python main.py \
4155
--security-protocol plaintext \
@@ -46,7 +60,7 @@ python main.py \
4660
--max-waiting-time 0
4761
```
4862
Where
49-
* `security-protocol`: Security protocol for Kafka. PLAINTEXT or SSL are supported.
63+
* `security-protocol`: Security protocol for Kafka. ``PLAINTEXT``, ``SSL`` or ``SASL_SSL`` are supported.
5064
* `cert-folder`: points to the folder containing the Apache Kafka CA certificate, Access certificate and Access key (see [blog post](https://aiven.io/blog/create-your-own-data-stream-for-kafka-with-python-and-faker?utm_source=github&utm_medium=organic&utm_campaign=blog_art&utm_content=post) for more)
5165
* `host`: the Apache Kafka host
5266
* `port`: the Apache Kafka port

main.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
MAX_ADDITIONAL_TOPPINGS_IN_PIZZA = 5
1818

1919

20-
2120
# Creating a Faker instance and seeding to have the same results every time we execute the script
2221
fake = Faker()
2322
Faker.seed(4321)
2423

2524

2625
# function produce_msgs starts producing messages with Faker
2726
def produce_msgs(security_protocol='SSL',
27+
sasl_mechanism='SCRAM-SHA-256',
2828
cert_folder = '~/kafka-pizza/',
29+
username = '',
30+
password = '',
2931
hostname = 'hostname',
3032
port = '1234',
3133
topic_name = 'pizza-orders',
@@ -49,6 +51,17 @@ def produce_msgs(security_protocol='SSL',
4951
value_serializer=lambda v: json.dumps(v).encode('ascii'),
5052
key_serializer=lambda v: json.dumps(v).encode('ascii')
5153
)
54+
elif security_protocol.upper() == 'SASL_SSL':
55+
producer = KafkaProducer(
56+
bootstrap_servers=hostname + ':' + port,
57+
security_protocol='SASL_SSL',
58+
sasl_mechanism=sasl_mechanism,
59+
ssl_cafile=cert_folder+'/ca.pem',
60+
sasl_plain_username=username,
61+
sasl_plain_password=password,
62+
value_serializer=lambda v: json.dumps(v).encode('ascii'),
63+
key_serializer=lambda v: json.dumps(v).encode('ascii')
64+
)
5265
else:
5366
sys.exit("This security protocol is not supported!")
5467

@@ -96,8 +109,11 @@ def produce_msgs(security_protocol='SSL',
96109

97110
def main():
98111
parser = argparse.ArgumentParser()
99-
parser.add_argument('--security-protocol', help='Security protocol for Kafka (PLAINTEXT, SSL)', required=True)
100-
parser.add_argument('--cert-folder', help='Path to folder containing required Kafka certificates. Required --security-protocol equal SSL', required=False)
112+
parser.add_argument('--security-protocol', help='Security protocol for Kafka (PLAINTEXT, SSL, SASL_SSL)', required=True)
113+
parser.add_argument('--sasl-mechanism', help='SASL mechanism for Kafka (PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512)', required=False)
114+
parser.add_argument('--cert-folder', help='Path to folder containing required Kafka certificates. Required --security-protocol equal SSL or SASL_SSL', required=False)
115+
parser.add_argument('--username', help='Username. Required if security-protocol is SASL_SSL', required=False)
116+
parser.add_argument('--password', help='Password. Required if security-protocol is SASL_SSL', required=False)
101117
parser.add_argument('--host', help='Kafka Host (obtained from Aiven console)', required=True)
102118
parser.add_argument('--port', help='Kafka Port (obtained from Aiven console)', required=True)
103119
parser.add_argument('--topic-name', help='Topic Name', required=True)
@@ -106,19 +122,25 @@ def main():
106122
parser.add_argument('--subject', help='What type of content to produce (possible choices are [pizza, userbehaviour, stock, realstock, metric] pizza is the default', required=False)
107123
args = parser.parse_args()
108124
p_security_protocol = args.security_protocol
109-
p_cert_folder =args.cert_folder
110-
p_hostname =args.host
111-
p_port =args.port
112-
p_topic_name=args.topic_name
113-
p_subject=args.subject
125+
p_cert_folder = args.cert_folder
126+
p_username = args.username
127+
p_password = args.password
128+
p_sasl_mechanism = args.sasl_mechanism
129+
p_hostname = args.host
130+
p_port = args.port
131+
p_topic_name= args.topic_name
132+
p_subject = args.subject
114133
produce_msgs(security_protocol=p_security_protocol,
115134
cert_folder=p_cert_folder,
135+
username=p_username,
136+
password=p_password,
116137
hostname=p_hostname,
117138
port=p_port,
118139
topic_name=p_topic_name,
119140
nr_messages=int(args.nr_messages),
120141
max_waiting_time_in_sec=float(args.max_waiting_time),
121-
subject=p_subject
142+
subject=p_subject,
143+
sasl_mechanism=p_sasl_mechanism,
122144
)
123145
print(args.nr_messages)
124146

0 commit comments

Comments
 (0)