Raspberry Pi is a cheap single-board computer, which is often adopted by developers as an economical IoT solution, such as scanning barcode and Qr code. One thing you have to know is the CPU clock speed affects the decoding speed and accuracy. Dynamsoft Barcode Reader SDK supports Raspberry Pi. Its leading strengths are multiple and low-quality barcode decoding capabilities, which heavily rely on the CPU clock rate. Although the latest Raspberry Pi model has a 1.5GHz quad-core CPU, it is still a big challenge to trade off the speed and accuracy by customizing the algorithm parameters. In this article, I will show you how to scan barcode and Qr code in Python asynchronously on Raspberry Pi, as well as how to breakthrough the CPU bottleneck using server-side decoding via socket.
Prerequisites
- Python 3.6 or above
- Dynamsoft Barcode Reader v9.4
- License Key
- Python Package
-
pip install dbr
: the official Python package of Dynamsoft Barcode Reader, which provides full API and relevant documentation. -
pip install barcode-qr-code-sdk
: a community version based on Dynamsoft C/C++ Barcode SDK, providing async decoding API for easy usage.
-
Building Python Barcode Scanner on Raspberry Pi
Implementing a barcode scanner in Python involves the following steps:
- Use OpenCV to capture the video stream from the camera.
- Use Dynamsoft Barcode Reader to decode the barcode from the image.
Barcode reading is a CPU-intensive task. When running synchronous API on a high clock rate CPU, we may not be aware of the latency on desktop computers. However, the CPU clock rate of Raspberry Pi is much lower. To avoid FPS (frames per second) dropping, it is necessary to carry out barcode detection algorithm in a separate thread.
Python's GIL (Global Interpreter Lock) limits the thread concurrency performance, especially for CPU-intensive tasks. The multiprocessing
module is a better choice for the barcode scanning scenario. However, it is not easy to share the memory between processes. The Python package released by Dynamsoft Barcode Reader SDK provides three asynchronous decoding methods start_video_mode(), append_video_frame and stop_video_mode to overcome the GIL limitation. They maintains a C/C++ thread pool and a buffer queue.
To simplify the native-threaded API, the community version adds an alternative method called decodeMatAsync()
, which decodes the latest image buffer and send decoding results via a registered callback function.
import barcodeQrSDK import numpy as np import cv2 import json g_results = None def callback(results, elapsed_time): global g_results g_results = (results, elapsed_time) def run(): # set license barcodeQrSDK.initLicense("DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") # initialize barcode scanner scanner = barcodeQrSDK.createInstance() params = scanner.getParameters() # register callback function to native thread scanner.addAsyncListener(callback) cap = cv2.VideoCapture(0) while True: ret, image = cap.read() if image is not None: scanner.decodeMatAsync(image) if g_results != None: print('Elapsed time: ' + str(g_results[1]) + 'ms') cv2.putText(image, 'Elapsed time: ' + str(g_results[1]) + 'ms', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) for result in g_results[0]: x1 = result.x1 y1 = result.y1 x2 = result.x2 y2 = result.y2 x3 = result.x3 y3 = result.y3 x4 = result.x4 y4 = result.y4 cv2.drawContours(image, [np.int0([(x1, y1), (x2, y2), (x3, y3), (x4, y4)])], 0, (0, 255, 0), 2) cv2.putText(image, result.text, (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) cv2.imshow('Barcode QR Code Scanner', image) ch = cv2.waitKey(1) if ch == 27: break scanner.clearAsyncListener() if __name__ == '__main__': run()
Moving Barcode Detection Work to Server Side
The performance of running above Python barcode scanner on Raspberry Pi 4 looks not bad. However, if you want to run the program on some older Raspberry Pi models or other cheaper single-board computers with lower CPU clock rate, the performance will be much worse. To relieve the CPU burden, we can move the heavy computation to a powerful server. Here we use Python socket programming. You can get started with the article - Socket Programming in Python.
How to Compress Camera Frames for Socket Transmission
WebP is a modern image format that provides superior lossless and lossy compression for images. It is supported by OpenCV. The following code shows how to encode and decode an image with WebP using OpenCV API:
import cv2 as cv import numpy as np cap = cv.VideoCapture(0) rval, frame = cap.read() # Send webp = cv.imencode('.webp', frame, [cv.IMWRITE_WEBP_QUALITY, 90])[1] bytes_sent = webp.tobytes() # Receive frame = cv.imdecode(np.frombuffer(bytes_sent, np.uint8), cv.IMREAD_COLOR)
A Simple Socket Class for Sending and Receiving Data
We create a SimpleSocket
class with socket
and selector
modules. The selector
is used to implement non-blocking I/O.
import socket import selectors class SimpleSocket(): def __init__(self) -> None: self.sel = selectors.DefaultSelector() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.callback = None def registerEventCb(self, callback): self.callback = callback def startClient(self, address, port): self.sock.setblocking(False) self.sock.connect_ex((address, port)) events = selectors.EVENT_READ | selectors.EVENT_WRITE self.sel.register(self.sock, events, data=self.callback) def startServer(self, port, number_of_clients=1): self.sock.bind(('', port)) self.sock.listen(number_of_clients) print('waiting for a connection at port %s' % port) self.sock.setblocking(False) self.sel.register(self.sock, selectors.EVENT_READ, data=None)
The callback parameter is a tuple of read and write callback functions.
In an infinite loop, we call select()
to wait for I/O events. It monitors connection, read, and write events.
def monitorEvents(self): events = self.sel.select(timeout=None) for key, mask in events: if key.data is None: self.acceptConn(key.fileobj, self.callback) else: self.serveConn(key, mask) def acceptConn(self, sock, callback): connection, addr = sock.accept() print('Connected to %s' % addr[0]) connection.setblocking(False) events = selectors.EVENT_READ | selectors.EVENT_WRITE self.sel.register(connection, events, data=callback) def serveConn(self, key, mask): sock = key.fileobj callback = key.data if callback != None and len(callback) == 2: if mask & selectors.EVENT_READ: data_type, data = self.receiveData(sock) callback[0](data_type, data) if mask & selectors.EVENT_WRITE: data_type, data = callback[1]() if data_type != None and data != None: self.sendData(sock, data_type, data)
For C/S communication, we need to define a simple protocol.
''' +-+-+-+-+-------+-+-------------+-------------------------------+ |Type (1 byte) | Payload length (4 bytes) | |0: text, 1: json 2: webp | | +-------------------------------+-------------------------------+ | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+ '''
The first byte is the data type that represents text, json, or webp. The next 4 bytes are the payload length. The rest of the data is the payload data.
According to the protocol, we can implement the receiveData()
and sendData()
functions.
def sendData(self, sock, data_type, data): msg = data_type + len(data).to_bytes(4, 'big') + data return self.send(sock, msg) def receiveData(self, sock): data_type = self.receive(sock, 1) if data_type == b'': return b'', b'' data_length = self.receive(sock, 4) if data_length == b'': return b'', b'' data = self.receive(sock, int.from_bytes(data_length, 'big')) return data_type, data def send(self, sock, msg): try: totalsent = 0 while totalsent < len(msg): sent = sock.send(msg[totalsent:]) if sent == 0: # connection closed return False totalsent = totalsent + sent except Exception as e: print(e) return False return True def receive(self, sock, size): try: chunks = [] bytes_recd = 0 while bytes_recd < size: chunk = sock.recv(min(size, 1024)) if chunk == b'': # connection closed return b'' chunks.append(chunk) bytes_recd = bytes_recd + len(chunk) except Exception as e: print(e) return b'' return b''.join(chunks)
Implementing Server-side Barcode Scanning Solution
The steps to implement the client.
- Create a
client.py
file. -
Set camera resolution to 640x480 and create a loop to capture frames.
import cv2 as cv from simplesocket import SimpleSocket, DataType import json import numpy as np g_local_results = None g_remote_results = None isDisconnected = False msgQueue = [] isReady = True cap = cv.VideoCapture(0) if cap.isOpened() == False: print("Unable to read camera feed") exit() cap.set(cv.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv.CAP_PROP_FRAME_HEIGHT, 480) def run(): while True: rval, frame = cap.read() cv.imshow('client', frame) if cv.waitKey(10) == 27: break if __name__ == '__main__': run()
-
Initialize the socket client and register the callback functions.
def callback(results, elapsed_time): global g_local_results print("Local decoding time: " + str(elapsed_time) + " ms") g_local_results = (results, elapsed_time) def readCb(data_type, data): global isDisconnected, g_remote_results, isReady if data == b'': isDisconnected = True if data_type == DataType.TEXT: text = data.decode('utf-8') print(text) if data_type == DataType.JSON: obj = json.loads(data) g_remote_results = (obj['results'], obj['time']) isReady = True # Data for sending def writeCb(): if len(msgQueue) > 0: data_type, data = msgQueue.pop(0) return data_type, data return None, None def run(): global isDisconnected, g_local_results, g_remote_results, isReady client = SimpleSocket() client.registerEventCb((readCb, writeCb)) client.startClient('192.168.8.72', 8080)
-
Keep reading frames from the camera and send them to the server.
while True: client.monitorEvents() if (isDisconnected): break rval, frame = cap.read() # Send data to server if isReady: isReady = False webp = cv.imencode('.webp', frame, [cv.IMWRITE_WEBP_QUALITY, 90])[1] msgQueue.append((DataType.WEBP, webp.tobytes())) cv.imshow('client', frame) if cv.waitKey(10) == 27: break
-
Display the results when the results are returned from the server.
if g_remote_results != None: print("Remote decoding time: " + str(int(g_remote_results[1])) + " ms") for result in g_remote_results[0]: text = result['text'] x1 = result['x1'] y1 = result['y1'] x2 = result['x2'] y2 = result['y2'] x3 = result['x3'] y3 = result['y3'] x4 = result['x4'] y4 = result['y4'] cv.putText(frame, text, (x1, y1), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) cv.drawContours(frame, [np.int0([(x1, y1), (x2, y2), (x3, y3), (x4, y4)])], 0, (0, 255, 0), 2) cv.putText(frame, "Remote decoding time: " + str(int(g_remote_results[1])) + " ms", (10, 60), cv.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)
Considering the CPU performance is good and there is no UI required on the server side, we can use synchronous API to recognize barcode and Qr code after receiving the frame from the client. The returned results are encoded as JSON string.
import cv2 as cv import numpy as np from simplesocket import SimpleSocket, DataType import json import barcodeQrSDK g_results = None isDisconnected = False msgQueue = [] # Initialize Dynamsoft Barcode Reader barcodeQrSDK.initLicense("DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") reader = barcodeQrSDK.createInstance() # Process received data def readCb(data_type, data): global isDisconnected, g_results, msgQueue if data == b'': isDisconnected = True if data_type == DataType.TEXT: text = data.decode('utf-8') print(text) if data_type == DataType.JSON: obj = json.loads(data) print(obj) if data_type == DataType.WEBP: try: frame = cv.imdecode(np.frombuffer(data, np.uint8), cv.IMREAD_COLOR) if frame is not None: results, elpased_time = reader.decodeMat(frame) g_results = (results, elpased_time) if g_results != None: jsonData = {'results': [], 'time': g_results[1]} for result in g_results[0]: format = result.format text = result.text x1 = result.x1 y1 = result.y1 x2 = result.x2 y2 = result.y2 x3 = result.x3 y3 = result.y3 x4 = result.x4 y4 = result.y4 data = {'format': format, 'text': text, 'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2, 'x3': x3, 'y3': y3, 'x4': x4, 'y4': y4} jsonData['results'].append(data) msgQueue.append((DataType.JSON, json.dumps(jsonData).encode('utf-8'))) except Exception as e: isDisconnected = True def writeCb(): if len(msgQueue) > 0: data_type, data = msgQueue.pop(0) return data_type, data return None, None def run(): global isDisconnected server = SimpleSocket() server.registerEventCb((readCb, writeCb)) server.startServer(8080, 1) try: while True: server.monitorEvents() if (isDisconnected): break except KeyboardInterrupt: print("Caught keyboard interrupt, exiting") finally: server.shutdown() if __name__ == '__main__': run()
Testing the Server-side Barcode Scanning Solution
When running the client.py
and server.py
, you need to change the IP address and port. If the network transmission is stable, the server-side barcode scanning solution can achieve an ideal performance.
Source Code
https://github.com/yushulx/python-barcode-qrcode-sdk/tree/main/examples/socket
Top comments (1)
Some comments may only be visible to logged-in visitors. Sign in to view all comments.